2023-06-18  阅读(340)
原文作者:代码有毒 mrcode 原文地址:https://mrcode.blog.csdn.net/article/details/81502532

异步处理rest服务

  • 使用Callable异步处理rest服务
  • 使用DeferredResult异步处理rest服务
  • 异步处理配置

异步处理就是主线程使用委托副线程去处理业务,然后主线程去接纳其他的请求。提高性能

Callable

202306181906543851.png

    @RestController
    public class AsyncController {
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        @RequestMapping("/order")
        public Callable<String> order() {
            logger.info("主线程开始");
            Callable<String> result = () -> {
                logger.info("副线程开始");
                TimeUnit.SECONDS.sleep(1);
                logger.info("副线程返回");
                return "success";
            };
            logger.info("主线程返回");
            return result;
        }
    }

输出

    2018-08-02 17:43:24.406  INFO 15644 --- [nio-8080-exec-2] c.e.demo.web.async.AsyncController       : 主线程开始
    2018-08-02 17:43:24.407  INFO 15644 --- [nio-8080-exec-2] c.e.demo.web.async.AsyncController       : 主线程返回
    2018-08-02 17:43:24.414  INFO 15644 --- [      MvcAsync1] c.e.demo.web.async.AsyncController       : 副线程开始
    2018-08-02 17:43:25.414  INFO 15644 --- [      MvcAsync1] c.e.demo.web.async.AsyncController       : 副线程返回

很奇怪。这个是spring mvc提供的支持吗?如果是不在spring框架中使用。自己写个测试例子,是在同一个线程中执行的

DeferredResult

在实际场景中可能会非常复杂,假如如下入这样;下单服务的一个流程

202306181906599342.png

线程1和线程2是隔离的。

业务实现逻辑如下:

  1. 请求下单
  2. 发送消息到消息队列中
  3. 另外一个应用(假如是订单系统)处理下单,并返回处理结果

编写下单请求api

    @RestController
    public class AsyncController {
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        @Autowired
        private MockQueue mockQueue;
        @Autowired
        private DeferredResultHolder deferredResultHolder;
    
        @RequestMapping("/order")
        public DeferredResult<String> order() {
            logger.info("主线程开始");
            // 发送消息到消息队列,请求订单生成
            String orderNumber = RandomStringUtils.randomNumeric(8);
            mockQueue.setPlaceOrder(orderNumber);
            DeferredResult<String> deferredResult = new DeferredResult<>();
            // holder 只是用来存储 DeferredResult
            // 方便监听器线程拿到 DeferredResult
            deferredResultHolder.getMap().put(orderNumber, deferredResult);
            logger.info("主线程返回");
            return deferredResult;
        }
    }

编写模拟队列

    @Component
    public class MockQueue {
        private Logger logger = LoggerFactory.getLogger(getClass());
        private String placeOrder; // 请求下单
        private String completeOrder;  // 下单完成
    
        public String getPlaceOrder() {
            return placeOrder;
        }
    
        public void setPlaceOrder(String placeOrder) {
            // 模拟另外一个线程去执行耗时操作
            new Thread(() -> {
                logger.info("接到下单请求");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.completeOrder = "下单请求处理完毕";
                this.placeOrder = placeOrder;
                logger.info(completeOrder + " : " + placeOrder);
            }).start();
        }
    
        public String getCompleteOrder() {
            return completeOrder;
        }
    
        public void setCompleteOrder(String completeOrder) {
            this.completeOrder = completeOrder;
        }
    }

编写一个装载 DeferredResult 的缓存类;用来存储已经产生的DeferredResult

    @Component
    public class DeferredResultHolder {
        private Map<String, DeferredResult<String>> map = new HashMap<>();
    
        public Map<String, DeferredResult<String>> getMap() {
            return map;
        }
    
        public void setMap(Map<String, DeferredResult<String>> map) {
            this.map = map;
        }
    }

启动一个线程专门来监听消息队列处理结果,并且调用 deferredResult 返回结果

    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.TimeUnit;
    
    // ContextRefreshedEvent 当应用程序上下文初始化或刷新时引发的事件。
    @Component
    public class QueueLinstener implements ApplicationListener<ContextRefreshedEvent> {
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        @Autowired
        private MockQueue mockQueue;
        @Autowired
        private DeferredResultHolder deferredResultHolder;
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            // 在线程中执行否则会阻塞监听器回调的线程
            new Thread(() -> {
                while (true) {
                    // 当有订单完成时
                    if (StringUtils.isNotBlank(mockQueue.getCompleteOrder())) {
                        String orderNumber = mockQueue.getPlaceOrder();
                        logger.info("返回订单处理结果:" + orderNumber);
                        deferredResultHolder.getMap().get(orderNumber).setResult("place order success");
                        mockQueue.setCompleteOrder(null);
                        deferredResultHolder.getMap().remove(orderNumber);
                    } else {
                        try {
                            TimeUnit.MILLISECONDS.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        }
    }

访问api输出的结果如下:

    2018-08-02 21:53:45.919  INFO 16944 --- [nio-8080-exec-2] c.e.demo.web.async.AsyncController       : 主线程开始
    2018-08-02 21:53:45.920  INFO 16944 --- [nio-8080-exec-2] c.e.demo.web.async.AsyncController       : 主线程返回
    2018-08-02 21:53:45.921  INFO 16944 --- [       Thread-8] com.example.demo.web.async.MockQueue     : 接到下单请求
    2018-08-02 21:53:46.922  INFO 16944 --- [       Thread-8] com.example.demo.web.async.MockQueue     : 下单请求处理完毕 : 34633453
    2018-08-02 21:53:47.269  INFO 16944 --- [       Thread-2] c.example.demo.web.async.QueueLinstener  : 返回订单处理结果:34633453

可以看到请求线程已经结束了,由其他线程进行处理并返回结果的;
这个套路就如同 socket里面reactor模型,主线程只接收请求,其他线程去执行逻辑;
但是有一个问题就是,容器里面肯定有一个线程接收请求,然后启用子线程去分发处理;然后这里还有搞多线程去处理,这个是什么套路呢?对性能有提高吗?

总结下

DeferredResult 在效果上实现了一个让主线程可以立即返回,但是连接不断开,可以通过DeferredResult设置返回结果来让连接返回并断开的功能

异步功能配置

在WebConfig中重写异步支持配置;从上面的例子中也看到了,主线程返回了。所以同步拦截器可能就不那么好用了;可以在这里配置

    public class WebConfig implements WebMvcConfigurer {
        @Autowired
        private TimeInterceptor timeInterceptor;
    
        @Override
        public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
            // 异步支持
    //        configurer.registerCallableInterceptors(); // callable 拦截器
    //        configurer.registerDeferredResultInterceptors(); // deferredResult拦截器
    //        configurer.setTaskExecutor(); // 应该是自定义线程池
    //        configurer.setDefaultTimeout() // 超时设置
        }
阅读全文
  • 点赞