当前位置:首页 > 技术分析 > 正文内容

Java多线程?用CompletableFuture就够了

ruisui883个月前 (02-05)技术分析19

前言

在项目开发中,经常会遇到一个问题:在一个后端接口里,往往会进行多项耗时任务(相互之间独立,没有依赖)的操作,如:

  • 需要从不同的外部接口获取不同的数据,做融合;
  • 请求外部接口数据的同时,还需要读取数据库;
  • 等等

如果在一个请求的主线程里,串行做这些任务操作,会导致响应时间的线性叠加,极有可能导致不符合要求,如图1:

图1

那么,对这些耗时任务进行并行操作,从而使得:响应时间 约等于 耗时最大的任务处理时间,这样可以大大降低系统的响应时间,如图2:

图2

Future

Future类型,其实就是一个未来任务的返回对象,或者说是子线程的返回对象(通过线程池方式分配子线程)

ExecutorService executor = Executors.newFixedThreadPool(4); 
// 定义任务:
Callable task = new Task();
// 提交任务并获得Future:
Future future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞
复制代码

可以看到,通过线程池的方式创建子线程后,executor.submit()返回的是一个Future对象,通过future.get()方法来获得该子任务的运行结果。需要注意的是,这个操作是阻塞的,也就是说,如果这个子任务没有运行结束,主线程会一直block在改行,直到子任务完成。

一个Future接口表示一个未来可能会返回的结果,它定义的方法有:

  • get():获取结果(可能会等待)
  • get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
  • cancel(boolean mayInterruptIfRunning):取消当前任务;
  • isDone():判断任务是否已完成。

CompletableFuture

当需要判断图2中的所有task是否完成时,如果采用Future,则需要:

  • 调用future.get()获取运行结果,
  • 或者轮询future.isDone()方法直到返回true

无论哪种方法,都是在主线程里调用,且会阻塞主线程。

以上痛点,从Java 8开始引入了CompletableFuture方法。主要新增的功能有:

  • thenAccept(): 当task正常完成后,回调调用.thenAccept()方法
  • exceptionally(): 当task出现异常时,回调调用.exceptionally()方法
  • anyOf(): 当所有的task中,只要有一个task完成,则主线程继续往下走,可以使用.anyOf()方法
  • allOf(): 所有的task均完成后,则主线程继续往下走
  • supplyAsync(): 异步执行,有返回值
  • runAsync(): 异步执行,无返回值

针对图2,需要所有task都完成后,再执行后续操作,就可以用allOf()方法:

CompletableFuture.allOf(task1, task2, ..., taskn).join();

注意:CompletableFuture的命名规则:

  • xxx():表示该方法将继续在已有的线程中执行;
  • xxxAsync():表示将异步在线程池中执行,即可以异步执行。

基于CompletableFuture+线程池的实现

线程池配置类

@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {
    @Bean
    public Executor asyncExecutor() {
        log.info("start async executor");
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
//        配置核心线程数
        threadPoolTaskExecutor.setCorePoolSize(ThreadPoolConstant.CORE_POOL_SIZE);
//        配置最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(ThreadPoolConstant.MAX_POOL_SIZE);
//        配置队列大小
        threadPoolTaskExecutor.setQueueCapacity(ThreadPoolConstant.QUEUE_CAPACITY);
//        配置线程池中线程的名称前缀
        threadPoolTaskExecutor.setThreadNamePrefix(ThreadPoolConstant.THREAD_NAME_PREFIX);
//   HelloWorldServiceImpl     rejection-policy: 当pool已经达到max size时,如何处理新任务:
//        CallerRunsPolicy: 不在新线程中执行任务,而是由调用者所在的线程来执行;
//        AbortPolicy: 拒绝执行新任务,并抛出RejectedExecutionException异常;
//        DiscardPolicy:丢弃当前将要加入队列的任务;
//        DiscardOldestPolicy:丢弃任务队列中最旧的任务;
        threadPoolTaskExecutor.setRejectedExecutionHandler(
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}

异步服务与服务实现

public interface AsyncService {
    @Async("asyncExecutor")
    CompletableFuture getResponseFromCp(QueryTrainInfoDetailReqDTOWithType queryTrainInfoDetailReqDTOWithType, int queryType);
}
@Service
public class AsyncServiceImpl implements AsyncService {
    @Autowired
    CustomProps customProps;
    @Autowired
    RestTemplate restTemplate;
    @Override
    public CompletableFuture getResponseFromCp(QueryTrainInfoDetailReqDTOWithType queryTrainInfoDetailReqDTOWithType, int queryType) {
        return CompletableFuture
                .completedFuture(
                        FactoryUtil
                                .createFactory(customProps, null, restTemplate)
                                .obtainData(queryTrainInfoDetailReqDTOWithType.setQueryType(queryType), String.class)
                );
    }
}

业务代码中调用异步服务接口

...
    @Autowired
    AsyncService asyncService;

    @Override
    public ReturnData qTrainInfoDetail(QueryTrainInfoDetailReqDTO queryTrainInfoDetailReqDTO) {
        QueryTrainInfoDetailReqDTOWithType queryTrainInfoDetailReqDTOWithType = new QueryTrainInfoDetailReqDTOWithType().setQueryTrainInfoDetailReqDTO(queryTrainInfoDetailReqDTO);
        CompletableFuture fromCpFirstReq = asyncService.getResponseFromCp(queryTrainInfoDetailReqDTOWithType, 1);
        CompletableFuture fromCpSecondReq = asyncService.getResponseFromCp(queryTrainInfoDetailReqDTOWithType, 2);
        CompletableFuture.allOf(fromCpFirstReq, fromCpSecondReq).join(); //阻塞直到当第一次请求和第二次请求都完成
    }
...

扫描二维码推送至手机访问。

版权声明:本文由ruisui88发布,如需转载请注明出处。

本文链接:http://www.ruisui88.com/post/1702.html

标签: 技术博客
分享给朋友:

“Java多线程?用CompletableFuture就够了” 的相关文章

基于archlinux的发行版有哪些?

Arch Linux 是一个 Linux 发行版,采用滚动更新的模型,这意味着 Arch Linux 不会定期发布新版本,而是持续接收更新和升级,保持系统与最新软件版本的同步。Arch Linux 以其极简主义、简单性和用户定制为中心的特点而闻名,专注于让用户对其系统配置具有完全控制权。然而,它也以...

Gitlab概览

Gitlab是开源的基于Git的仓库管理系统,也可以管理软件开发的整个生命周期,是项目管理和代码托管平台,支撑着整个DevOps的生命周期。Gitlab很容易选为GitHub,作为公司私有库管理的工具。我们可以用Gitlab Workflow来协同整个团队的软件开发管理过程。软件开发阶段Gitlab...

Windows 下 Git 拉 Gitlab 代码

读者提问:『阿常你好,Windows 下 Git 拉 Gitlab 代码的操作步骤可以分享一下吗?』阿常回答:好的,总共分为五个步骤。一、Windows 下安装 Git官网下载链接:https://git-scm.com/download/winStandalone Installer(安装版)注意...

身体越柔软越好?刻苦拉伸可能反而不健康 | 果断练

坐下伸直膝盖,双手用力向前伸,再用力……比昨天前进了一厘米,又进步了! 这么努力地拉伸,每个人都有自己的目标,也许是身体健康、线条柔美、放松肌肉、体测满分,也可能为了随时劈个叉,享受一片惊呼。 不过,身体柔软,可以享受到灵活的福利,也可能付出不稳定的代价,并不是越刻苦拉伸越好。太硬或者太软,都不安全...

vue 开发规范

项目运行指南(#项目运行指南)开发本地环境(#开发本地环境)开发相关插件/工具(#开发相关插件工具)开发规范(#开发规范)vue(#vue)【数据流向】(#数据流向)【慎用全局注册】(#慎用全局注册)【组件名称】(#组件名称)【组件中的 CSS】(#组件中的-css)【统一标签顺序】(#统一标签顺序...

Vue中的路由配置常用属性

router:路由页面跳转的核心库;引入路由:import VueRouter from 'vue-router'; 注册路由:const router = new VueRouter({ })mode:模式路由有hash history两种模式:hash模式URL中包含#,#后边是...