10.CompletableFuture异步编程

在JDK8之前,若要进行异步编程,只能通过JDK5中提供的FutureCallable实现。 若要想知道什么时候异步任务执行完成只能去轮询future.isDone()或者future.get()阻塞等待, 而且Future存在局限性,它很难描述多个异步任务之间的依赖关系。

在JDK8中引入了CompletableFuture,它本身实现了Future接口,还结合了ExecutorServiceCompletionStage可以完成一些Future不能做到的事情。

CompletionStage接口

CompletionStage分为有返回值(CompletionStage<T>)与无返回值(CompletionStage<Void>)两种,类似Runnable和Callable。

接口方法中频繁出现了一些动词runacceptapplyhandlecombinecompose,形容词asynceitherboth,副词thenwhen,主要方法名基本由这些词构成。 通过这些方法实现了任务的串行并行或聚合与聚合

方法参数大量使用函数式接口SupplierConsumerBiConsumerFunctionBiFunction

词语 描述
run 执行一个动作
accept 对返回值执行一个动作
apply 对任务返回值执行一次转换
handle 对任务返回值或异常进行转换,使当前任务结果为转换后的值
combine 将两个任务的返回值进行结合,使当前任务返回值为结合后的值
compose 将当前任务返回值组合成一个新任务返回
async 异步执行,一般带有async方法会有一个重载方法,多一个自定义线程池参数,默认使用ForkJoin common线程池
either 多个任务,任意一个任务完成时继续执行
both 两个任务完成时继续执行
then 当前任务完成后继续执行其他任务
when 当任务完成时,对返回值或异常执行一个动作

CompletableFuture API

  1. 创建CompletableFuture的方式

    方法 描述
    constructor 使用构造方法直接new
    staticrunAsync 无返回值任务
    staticsupplyAsync 有返回值任务
    staticallOf 无返回值任务,当所有任务完成时返回
    staticanyOf 有返回值任务,当惹你任务完成时返回
    staticcompletedFuture 有返回值任务,以给定值为任务返回值
  2. 除CompletionStage和Future之外的方法

    方法 描述
    complete 若任务未完成,以给定值完成任务
    completeExceptionally 若任务未完成,以给定异常完成任务
    getNow 若任务未完成以给定值立即返回,否则返回任务值
    join 类似Future的get方法,但是若有异常抛出,建议获取阻塞值使用这个方法
    obtrudeValue 强制以给定值完成任务
    obtrudeException 强制以给定异常完成任务
    getNumberOfDependents 返回当前任务待完成任务的数量,设计用于系统监控

CompletableFuture示例

  1. 异步执行无返回值
     public void run() {
         CompletableFuture<Void> future = CompletableFuture.runAsync(() ->
                 System.out.printf("%s says, hello world \n", Thread.currentThread().getName())
             )
             .thenRunAsync(() -> System.out.printf("%s says, hello world again\n", Thread.currentThread().getName()));
    
         future.join();
     }
    
  2. 异步执行有返回值
     public void supply() {
         CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
             .thenApplyAsync(s -> {
                 try {
                     TimeUnit.SECONDS.sleep(1);
                 } catch (InterruptedException ignore) {
                 }
                 System.out.printf("%s says, %s\n", Thread.currentThread().getName(), s);
                 return s + " World";
             })
             .thenApplyAsync(s -> {
                 System.out.printf("%s says, %s\n", Thread.currentThread().getName(), s);
                 return s + "!";
             })
             .whenComplete((s, e) -> {
                 if (Objects.nonNull(e)) {
                     System.out.println(e.getMessage());
                 }
    
                 System.out.printf("%s says, %s\n", Thread.currentThread().getName(), s);
             });
    
         System.out.println(future.join());
     }
    
  3. 多个任务异步执行
     public void allOf() {
         // 异步执行所有任务
         CompletableFuture.allOf(
                 CompletableFuture.runAsync(() ->
                     System.out.printf("%s says, hello world \n", Thread.currentThread().getName())
                 ),
                 CompletableFuture.runAsync(() ->
                     System.out.printf("%s says, hello world again\n", Thread.currentThread().getName())
                 ),
                 CompletableFuture.runAsync(() -> {
                     try {
                         TimeUnit.SECONDS.sleep(2);
                     } catch (InterruptedException ignore) {
                     }
                     System.out.printf("%s says, hello world \n", Thread.currentThread().getName());
                 })
             )
             // 当所有任务完成时
             .join();
     }
    
  4. 多个任务组合异步执行
     public void combine() {
         // 异步执行带返回值的任务
         Integer join = CompletableFuture.completedFuture(1)
             .thenCombineAsync(CompletableFuture.supplyAsync(() -> {
                 try {
                     TimeUnit.SECONDS.sleep(3);
                 } catch (InterruptedException ignore) {
                 }
                 System.out.printf("calc 2 is thread %s\n", Thread.currentThread().getName());
                 return 2;
             }), Integer::sum)
             .thenCombineAsync(CompletableFuture.supplyAsync(() -> {
                 try {
                     TimeUnit.SECONDS.sleep(2);
                 } catch (InterruptedException ignore) {
                 }
                 System.out.printf("calc 3 is thread %s\n", Thread.currentThread().getName());
                 return 3;
             }), Integer::sum)
             .join();
         System.out.println(join);
     }