CompletableFuture详解 CompletableFuture是JDK 1.8引入的实现类,该类实现了Future
和CompletionStage
两个接口。该类的实例作为一个异步任务,可以在自己异步执行完成之后触发一些其他的异步任务,从而达到异步回调的效果。
CompletableFuture的UML类关系
CompletionStage接口 顾名思义,Stage是阶段的意思。CompletionStage代表某个同步或者异步计算的一个阶段,或者一系列异步任务中的一个子任务(或者阶段性任务)。
每个CompletionStage子任务所包装的可以是一个Function、Consumer或者Runnable函数式接口实例。这三个常用的函数式接口的特点如下:
Function
Function接口的特点是:有输入、有输出 。包装了Function实例的CompletionStage子任务需要一个输入参数,并会产生一个输出结果到下一步。
Runnable
Runnable接口的特点是:无输入、无输出 。包装了Runnable实例的CompletionStage子任务既不需要任何输入参数,又不会产生任何输出。
Consumer
Consumer接口的特点是:有输入、无输出 。包装了Consumer实例的CompletionStage子任务需要一个输入参数,但不会产生任何输出。
多个CompletionStage
构成了一条任务流水线,一个环节执行完成了可以将结果移交给下一个环节(子任务)。多个CompletionStage
子任务之间可以使用链式调用,下面是一个简单的例子:
1 2 3 oneStage.thenApply(x->square(x)) .thenAccept(y->System.out.println(y)) .thenRun(()->System.out.println())
对以上例子中的CompletionStage子任务说明如下:
oneStage
是一个CompletionStage
子任务,这是一个前提。
x -> square(x)
是一个Function类型的Lambda表达式,被thenApply()
方法包装成了一个CompletionStage子任务,该子任务需要接收一个参数x,然后输出一个结果——x的平方值。
“y -> System.out.println(y)
是一个Consumer类型的Lambda表达式,被thenAccept()
方法包装成了一个CompletionStage子任务,该子任务需要消耗上一个子任务的输出值,但是此子任务并没有输出。
() -> System.out.println()
是一个Runnable类型的Lambda表达式,被thenRun()
方法包装成了一个CompletionStage子任务,既不消耗上一个子任务的输出,又不产生结果。
CompletionStage
代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另一个阶段。虽然一个子任务可以触发其他子任务,但是并不能保证后续子任务的执行顺序。
runAsync和supplyAsync CompletionStage子任务的创建是通过CompletableFuture完成的。CompletableFuture类提供了非常强大的Future的扩展功能来帮助我们简化异步编程的复杂性,提供了函数式编程的能力来帮助我们通过回调的方式处理计算结果,也提供了转换和组合CompletionStage()
的方法。
CompletableFuture
定义了一组方法用于创建CompletionStage
子任务(或者阶段性任务),基础的方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } public static CompletableFuture<Void> runAsync (Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
在使用CompletableFuture创建CompletionStage子任务时,如果没有指定Executor线程池,在默认情况下CompletionStage会使用公共的ForkJoinPool
线程池 。
下面是两个创建CompletionStage子任务简单示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void runAsyncDemo () throws Exception { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { sleepSeconds(1 ); System.out.println("run end ..." ); }); future.get(2 , TimeUnit.SECONDS); } public static void supplyAsyncDemo () throws Exception { CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> { long start = System.currentTimeMillis(); sleepSeconds(1 ); System.out.println("supply end ..." ); return System.currentTimeMillis() - start; }); long time = future.get(2 , TimeUnit.SECONDS); System.out.println("异步执行耗时(秒) = " + time/1000 ); }
设置子任务回调钩子 可以为CompletionStage子任务设置特定的回调钩子,当计算结果完成或者抛出异常的时候,执行这些特定的回调钩子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public CompletableFuture<T> whenComplete ( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(null , action); } public CompletableFuture<T> whenCompleteAsync ( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(asyncPool, action); } public CompletableFuture<T> whenCompleteAsync ( BiConsumer<? super T, ? super Throwable> action, Executor executor) { return uniWhenCompleteStage(screenExecutor(executor), action); } public CompletableFuture<T> exceptionally ( Function<Throwable, ? extends T> fn) { return uniExceptionallyStage(fn); }
下面是一个CompletionStage子任务设置完成钩子和异常钩子的简单示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class CompletableFutureDemo1 { public static void whenCompleteDemo () throws Exception { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println(Thread.currentThread().getName()+":抛出异常" ); throw new RuntimeException (Thread.currentThread().getName()+":发生异常" ); }); future.whenComplete(new BiConsumer <Void, Throwable>() { @Override public void accept (Void unused, Throwable throwable) { System.out.println(Thread.currentThread().getName()+":执行完成!" ); } }); future.exceptionally(new Function <Throwable, Void>() { @Override public Void apply (Throwable throwable) { System.out.println(Thread.currentThread().getName()+":执行失败!" + throwable.getMessage()); return null ; } }); future.get(); } public static void main (String[] args) throws Exception { whenCompleteDemo(); } }
异常回调钩子 调用cancel()
方法取消CompletableFuture
时,任务被视为异常完成,completeExceptionally()方
法所设置的异常回调钩子也会被执行。
如果没有设置异常回调钩子,发生内部异常时会有两种情况发生:
在调用get()和get(long,TimeUnit)方法启动任务时,如果遇到内部异常,get()方法就会抛出ExecutionException(执行异常)。
在调用join()和getNow(T)启动任务时(大多数情况下都是如此),如果遇到内部异常,join()和getNow(T)方法就会抛出CompletionException。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class CompletableFutureCancelExample { public static void main (String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(5 ); return "Task completed" ; } catch (InterruptedException e) { System.out.println("Task was interrupted" ); return "Task interrupted" ; } }); try { future.get(2 , TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { System.out.println("Timeout occurred, cancelling the future" ); future.cancel(true ); } if (future.isCancelled()) { System.out.println("The future has been cancelled." ); } else { try { System.out.println(future.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } }
调用handle()方法统一处理异常和结果 除了分别通过whenComplete
、exceptionally
设置完成钩子、异常钩子之外,还可以调用handle()
方法统一处理结果和异常。
handle()方法有三个重载版本,声明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public <U> CompletableFuture<U> handle ( BiFunction<? super T, Throwable, ? extends U> fn) { return uniHandleStage(null , fn); } public <U> CompletableFuture<U> handleAsync ( BiFunction<? super T, Throwable, ? extends U> fn) { return uniHandleStage(asyncPool, fn); } public <U> CompletableFuture<U> handleAsync ( BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) { return uniHandleStage(screenExecutor(executor), fn); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureExceptionHandling { public static void main (String[] args) { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5 ) { throw new RuntimeException ("Something went wrong" ); } return 42 ; }); future.whenComplete((result, exception) -> { if (exception == null ) { System.out.println("Task completed successfully with result: " + result); } else { System.out.println("Task completed with exception: " + exception.getMessage()); } }); CompletableFuture<Integer> futureWithExceptionHandler = future.exceptionally(ex -> { System.out.println("Exception occurred: " + ex.getMessage()); return -1 ; }); CompletableFuture<Integer> futureWithHandler = future.handle((result, exception) -> { if (exception!= null ) { System.out.println("Handling exception: " + exception.getMessage()); return -1 ; } else { return result; } }); try { Integer result = futureWithHandler.get(); System.out.println("Final result: " + result); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } } Task completed with exception: java.lang.RuntimeException: Something went wrong Exception occurred: java.lang.RuntimeException: Something went wrong Handling exception: java.lang.RuntimeException: Something went wrong Final result: -1
线程池的使用 默认情况下,通过静态方法runAsync()
、supplyAsync()
创建的CompletableFuture任务会使用公共的ForkJoinPool
线程池,默认的线 程数是CPU的核数。当然,它的线程数可以通过以下JVM参数设置:-Djava.util.concurrent.ForkJoinPool.common.parallelism
问题是,如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的IO操作,就会导致线程池中的所有线程都阻塞在IO操作上,造成线程饥饿,进而影响整个系统的性能。所以,强烈建议大家根据不同的业务类型创建不同的线程池,以避免互相干扰。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class CompletableFutureWithThreadPool { public static void main (String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(5 ); CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2 ); return "Task 1 completed" ; } catch (InterruptedException e) { return "Task 1 interrupted" ; } }, executorService); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3 ); return "Task 2 completed" ; } catch (InterruptedException e) { return "Task 2 interrupted" ; } }, executorService); CompletableFuture<Void> combinedFuture = future1.thenCombine(future2, (result1, result2) -> { System.out.println(result1); System.out.println(result2); return null ; }); try { combinedFuture.get(); } catch (Exception e) { e.printStackTrace(); } finally { executorService.shutdown(); } } }
异步任务的串行执行 如果两个异步任务需要串行(一个任务依赖另一个任务)执行,可以通过CompletionStage
接口的thenApply()
、thenAccept()
、thenRun()
和thenCompose()
四个方法来实现。
thenApply
:
它允许你将一个同步操作应用到 CompletableFuture
的结果上,并将结果包装在一个新的 CompletableFuture
中。
适用于对异步任务的结果进行简单的转换或计算,并且需要继续以 CompletableFuture
的形式传递结果。
thenAccept
:
当你只关心上一个任务的结果而不需要产生新的结果时使用。
通常用于执行与结果相关的副作用操作,如将结果存储到数据库、发送通知等。
thenRun
:
当你不关心上一个任务的结果,只需要在任务完成后执行一些操作时使用。
适用于执行一些独立的清理工作或日志记录。
thenCompose
:
用于将多个 CompletableFuture
串联起来,避免嵌套的 CompletableFuture
。
可以将前一个任务的结果作为输入来创建新的 CompletableFuture
,实现链式调用。
thenApply()方法 thenApply
接收上一个任务的结果,对结果进行处理并返回一个新结果,可用于对结果进行转换或进一步计算。 一个 Function<T, U>
,其中 T 是上一个 CompletableFuture 的结果类型,U 是要转换的新结果类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public <U> CompletableFuture<U> thenApply ( Function<? super T,? extends U> fn) { return uniApplyStage(null , fn); } public <U> CompletableFuture<U> thenApplyAsync ( Function<? super T,? extends U> fn) { return uniApplyStage(asyncPool, fn); } public <U> CompletableFuture<U> thenApplyAsync ( Function<? super T,? extends U> fn, Executor executor) { return uniApplyStage(screenExecutor(executor), fn); }
thenApply的三个重载版本有一个共同的参数fn,该参数表示要串行执行的第二个异步任务,它的类型为Function。fn的类型声明涉及两个泛型参数,具体如下:
泛型参数 T:上一个任务所返回结果的类型。
泛型参数 U:当前任务的返回值类型。
作为示例,调用thenApply分两步计算(10+10)*2,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class ThenApplyExample { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> initialFuture = CompletableFuture.completedFuture(10 ); CompletableFuture<Integer> stepOneFuture = initialFuture.thenApply(result -> result + 10 ); CompletableFuture<Integer> finalFuture = stepOneFuture.thenApply(result -> result * 2 ); int finalResult = finalFuture.get(); System.out.println("The final result is: " + finalResult); } }
thenRun()方法 thenRun()
方法与thenApply()
方法不同的是,不关心任务的处理结果 。只要前一个任务执行完成,就开始执行后一个串行任务。不接收上一个任务的结果,仅执行一个操作,常用于执行无参的副作用操作,例如记录日志或执行一些清理工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 public CompletableFuture<Void> thenRun (Runnable action) { return uniRunStage(null , action); } public CompletableFuture<Void> thenRunAsync (Runnable action) { return uniRunStage(asyncPool, action); } public CompletableFuture<Void> thenRunAsync (Runnable action, Executor executor) { return uniRunStage(screenExecutor(executor), action); }
从方法的声明可以看出,thenRun()
方法同thenApply()方法类似,不同的是前一个任务处理完成后,thenRun()并不会把计算的结果传给后一个任务,而且后一个任务也没有结果输出。 thenRun系列方法中的action参数是Runnable
类型的,所以thenRun()
既不能接收参数又不支持返回值。
thenAccept()方法 thenAccept()
方法对thenRun()
、thenApply()
的特点进行了折中,调用此方法时后一个任务可以接收(或消费)前一个任务的处理结果,但是后一个任务没有结果输出 。
1 2 3 4 5 6 7 8 9 10 11 12 public CompletableFuture<Void> thenAccept (Consumer<? super T> action) { return uniAcceptStage(null , action); } public CompletableFuture<Void> thenAcceptAsync (Consumer<? super T> action) { return uniAcceptStage(asyncPool, action); } public CompletableFuture<Void> thenAcceptAsync (Consumer<? super T> action, Executor executor) { return uniAcceptStage(screenExecutor(executor), action); }
thenAccept系列方法的回调参数为action,它的类型为Consumer<? super T>
接口,其中 T
是上一个 CompletableFuture
的结果类型。
thenCompose()方法 thenCompose()
方法在功能上与thenApply()
、thenAccept()
、thenRun(
)一样,可以对两个任务进行串行的调度操作,第一个任务操作完成时,将它的结果作为参数传递给第二个任务。
接收上一个任务的结果,将结果作为输入创建一个新的 CompletableFuture
,用于将多个 CompletableFuture
链式组合。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public <U> CompletableFuture<U> thenCompose ( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(null , fn); } public <U> CompletableFuture<U> thenComposeAsync ( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(asyncPool, fn); } public <U> CompletableFuture<U> thenComposeAsync ( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) { return uniComposeStage(screenExecutor(executor), fn); }
thenCompose()
方法要求第二个任务的返回值是一个CompletionStage异步实例。因此,可以调用CompletableFuture.supplyAsync()
方法将第二个任务所要调用的普通异步方法包装成一个CompletionStage异步实例。
作为演示,调用thenCompose分两步计算(10+10)*2,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class ThenComposeExample { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> initialFuture = CompletableFuture.completedFuture(10 ); CompletableFuture<String> finalFuture = initialFuture.thenCompose(result -> { int intermediateResult = result + 10 ; return CompletableFuture.supplyAsync(() -> { int finalResult = intermediateResult * 2 ; return "The final result is: " + finalResult; }); }); String result = finalFuture.get(); System.out.println(result); } }
异步任务的合并执行 如果某个任务同时依赖另外两个异步任务的执行结果,就需要对另外两个异步任务进行合并。以泡茶喝为例,“泡茶喝”任务需要对“烧水”任务与“清洗”任务进行合并。对两个异步任务的合并可以通过CompletionStage接口的thenCombine()
、runAfterBoth()
、thenAcceptBoth()
三个方法来实现。这三个方法的不同之处主要在于其核心参数fn、action、consumer的类型不同,分别为Function<T,R>
、Runnable
、Consumer<? super T>
类型。
thenCombine()
:
可以将两个独立的 CompletableFuture
的结果进行合并,产生一个新的结果,该结果存储在新的 CompletableFuture
中。
适用于需要对两个结果进行计算或组合的场景。
runAfterBoth()
:
主要用于在两个任务完成后执行一个操作,不关心任务的结果,仅关心任务是否完成。
适合于执行一些与结果无关的任务,如发送通知或更新状态。
thenAcceptBoth()
:
用于对两个 CompletableFuture
的结果进行处理,不产生新的结果,只执行副作用操作。
可以用于执行一些依赖于两个任务结果的操作,如更新 UI 或存储结果。
thenCombine()方法 thenCombine()
会在两个CompletionStage任务都执行完成后,把两个任务的结果一起交给thenCombine()来处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public <U,V> CompletableFuture<V> thenCombine ( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(null , other, fn); } public <U,V> CompletableFuture<V> thenCombineAsync ( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(asyncPool, other, fn); } public <U,V> CompletableFuture<V> thenCombineAsync ( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) { return biApplyStage(screenExecutor(executor), other, fn); }
thenCombine()
方法的调用者为第一步的CompletionStage实例,该方法的第一个参数为第二步的CompletionStage实例,该方法的返回值为第三步的CompletionStage实例。在逻辑上,thenCombine()
方法的功能是将第一步、第二步的结果合并到第三步上。
thenCombine系列方法有两个核心参数:
other
参数:表示待合并的第二步任务的CompletionStage实例。
fn
参数:表示第一个任务和第二个任务执行完成后,第三步需要执行的逻辑。fn参数的类型为BiFunction<? super T,? super U,? extends V>
,该类型的声明涉及三个泛型参数,具体如下:
泛型参数 T:表示第一个任务所返回结果的类型。
泛型参数 U:表示第二个任务所返回结果的类型。
泛型参数 V:表示第三个任务所返回结果的类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureCombinations { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "Hello" ); CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "World" ); CompletableFuture<String> combinedFuture = futureA.thenCombine(futureB, (a, b) -> a + " " + b); System.out.println(combinedFuture.get()); } }
runAfterBoth()方法 runAfterBoth()
方法跟thenCombine()
方法不一样的是,runAfterBoth()
方法不关心每一步任务的输入参数和处理结果 。runAfterBoth()
方法有三个重载版本,声明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public CompletableFuture<Void> runAfterBoth (CompletionStage<?> other, Runnable action) { return biRunStage(null , other, action); } public CompletableFuture<Void> runAfterBothAsync (CompletionStage<?> other, Runnable action) { return biRunStage(asyncPool, other, action); } public CompletableFuture<Void> runAfterBothAsync (CompletionStage<?> other, Runnable action, Executor executor) { return biRunStage(screenExecutor(executor), other, action); }
runAfterBoth()
方法的调用者为第一步任务的CompletionStage实例,runAfterBoth()方法的第一个参数为第二步任务的CompletionStage实例,runAfterBoth()方法的返回值为第三步的 CompletionStage实例。在逻辑上,第一步任务和第二步任务是并行执行的,thenCombine()方法的功能是将第一步、第二步的结果合并到第三步任务上 。与thenCombine系列方法不同,runAfterBoth系列方法的第二个参 数action为Runnable
类型,表示它的第一步任务、第二步任务、第三步任务既没有输入值,又没有输出值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureCombinations { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "Hello" ); CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "World" ); CompletableFuture<Void> runAfterFuture = futureA.runAfterBoth(futureB, () -> System.out.println("Both futures completed running" )); runAfterFuture.get(); } }
thenAcceptBoth()方法 thenAcceptBoth()
方法对runAfterBoth()
方法和thenCombine()
方法的特点进行了折中,调用该方法,第三个任务可以接收其合并过来的第一个任务、第二个任务的处理结果,但是第三个任务(合并任务)却不能返回结果 。
thenAcceptBoth()方法有三个重载版本,三个版本的声明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public <U> CompletableFuture<Void> thenAcceptBoth ( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) { return biAcceptStage(null , other, action); } public <U> CompletableFuture<Void> thenAcceptBothAsync ( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) { return biAcceptStage(asyncPool, other, action); } public <U> CompletableFuture<Void> thenAcceptBothAsync ( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) { return biAcceptStage(screenExecutor(executor), other, action); }
thenAcceptBoth系列方法的第二个参数为需要合并的第二步任务的CompletionStage实例。第三个参数为第三个任务的回调函数,该参数名称为action,它的类型为BiConsumer<? super T,? super U>
接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureCombinations { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "Hello" ); CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "World" ); CompletableFuture<Void> acceptBothFuture = futureA.thenAcceptBoth(futureB, (a, b) -> System.out.println("Accepted both: " + a + " " + b)); acceptBothFuture.get(); } }
allOf()等待所有的任务结束 CompletionStage接口的allOf()
会等待所有的任务结束,以合并所有的任务。thenCombine()只能合并两个任务,**如果需要合并多个异步任务,那么可以调用allOf()
**。
1 2 3 4 5 6 7 8 9 public static void main (String[] args) { CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("模拟异步任务1" )); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> System.out.println("模拟异步任务2" )); CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> System.out.println("模拟异步任务3" )); CompletableFuture<Void> future4 = CompletableFuture.runAsync(() -> System.out.println("模拟异步任务4" )); CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3, future4); all.join(); }
异步任务的选择执行 CompletableFuture对异步任务的选择执行不是按照某种条件进行选择的,而是按照执行速度进行选择的:前面两个并行任务,谁的结果返回速度快,谁的结果将作为第三步任务的输入。
对两个异步任务的选择可以通过CompletionStage接口的applyToEither()
、runAfterEither()
和acceptEither()
三个方法来实现。
applyToEither()
:
该方法用于处理两个 CompletableFuture
中先完成的那个结果。
在示例中,future1.applyToEither(future2, result -> "The first completed result is: " + result);
表示当 future1
或 future2
中的任意一个完成时,将其结果作为输入,在前面添加 "The first completed result is: "
并存储在 resultFuture
中。
可以根据具体需求修改 Function
来对先完成的结果进行不同的处理或转换。
runAfterEither()
:
当你不关心哪个 CompletableFuture
先完成,也不关心它们的结果,只需要在任意一个完成时执行一个操作时使用。
在示例中,future1.runAfterEither(future2, () -> System.out.println("One of the futures completed"));
会在 future1
或 future2
中的任意一个完成后打印一条消息。
常用于执行一些清理工作或日志记录等副作用操作,不依赖于具体的结果。
acceptEither()
:
当你需要在两个 CompletableFuture
中的任意一个完成时,使用其结果进行一些操作,但不产生新的结果时使用。
在示例中,future1.acceptEither(future2, result -> System.out.println("The first completed result is: " + result));
会在 future1
或 future2
中的任意一个完成后打印出其结果。
适用于对先完成的结果进行一些副作用操作,如更新 UI 或存储结果。
applyToEither()方法 两个CompletionStage谁返回结果的速度快,applyToEither()方法就用这个最快的CompletionStage的结果进行下一步(第三步)的回调操作。
applyToEither()方法有三个重载版本,三个版本的声明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public <U> CompletableFuture<U> applyToEither ( CompletionStage<? extends T> other, Function<? super T, U> fn) { return orApplyStage(null , other, fn); } public <U> CompletableFuture<U> applyToEitherAsync ( CompletionStage<? extends T> other, Function<? super T, U> fn) { return orApplyStage(asyncPool, other, fn); } public <U> CompletableFuture<U> applyToEitherAsync ( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) { return orApplyStage(screenExecutor(executor), other, fn); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureEitherExample { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000 ); return 10 ; } catch (InterruptedException e) { return 0 ; } }); CompletableFuture<Integer> futureB = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000 ); return 20 ; } catch (InterruptedException e) { return 0 ; } }); CompletableFuture<String> applyEitherFuture = futureA.applyToEither(futureB, result -> "The first completed value is: " + result); System.out.println(applyEitherFuture.get()); } }
runAfterEither()方法 runAfterEither()方法的功能为:前面两个CompletionStage实例,任何一个完成了都会执行第三步回调操作。三个任务的回调函数都是Runnable类型的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package pers.fulsun._13;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureEitherExample { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000 ); return 10 ; } catch (InterruptedException e) { return 0 ; } }); CompletableFuture<Integer> futureB = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000 ); return 20 ; } catch (InterruptedException e) { return 0 ; } }); CompletableFuture<Void> runEitherFuture = futureA.runAfterEither(futureB, () -> System.out.println("One of the futures has completed" )); runEitherFuture.get(); } }
调用runAfterEither()方法,只要前面两个CompletionStage实例其中一个执行完成,就开始执行第三步的CompletionStage实例。
acceptEither()方法 acceptEither()
方法对applyToEither()
方法和runAfterEither()
方法的特点进行了折中,两个CompletionStage谁返回结果的速度快,acceptEither()
就用那个最快的CompletionStage的结果作为下一步(第三步)的输入,但是第三步没有输出 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public CompletableFuture<Void> acceptEither ( CompletionStage<? extends T> other, Consumer<? super T> action) { return orAcceptStage(null , other, action); } public CompletableFuture<Void> acceptEitherAsync ( CompletionStage<? extends T> other, Consumer<? super T> action) { return orAcceptStage(asyncPool, other, action); } public CompletableFuture<Void> acceptEitherAsync ( CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) { return orAcceptStage(screenExecutor(executor), other, action); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package pers.fulsun._13;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureEitherExample { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000 ); return 10 ; } catch (InterruptedException e) { return 0 ; } }); CompletableFuture<Integer> futureB = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000 ); return 20 ; } catch (InterruptedException e) { return 0 ; } }); CompletableFuture<Void> acceptEitherFuture = futureA.acceptEither(futureB, result -> System.out.println("The first completed value is: " + result)); acceptEitherFuture.get(); } }
CompletableFuture的综合案例 泡茶喝实例 使用CompletableFuture实现泡茶喝实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class DrinkTea { public static void main (String[] args) { CompletableFuture<Boolean> hotJob = CompletableFuture.supplyAsync(() -> { System.out.println("洗水壶..." ); System.out.println("烧开水..." ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("水烧开了..." ); return true ; }); CompletableFuture<Boolean> washJob = CompletableFuture.supplyAsync(() -> { System.out.println("洗茶壶..." ); System.out.println("洗茶杯..." ); System.out.println("拿茶叶..." ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("洗完了..." ); return true ; }); CompletableFuture<String> drinkJob = hotJob.thenCombine(washJob, (hotOk, washOK) -> { System.out.println("泡茶..." ); if (hotOk && washOK) { System.out.println("泡茶喝,茶喝完" ); return "茶喝完了" ; } return "没有喝到茶" ; }); drinkJob.join(); } }
多个RPC调用 使用CompletableFuture进行多个RPC调用,参考代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.List;import java.util.ArrayList;public class CompletableFutureRpcExample { public static CompletableFuture<String> rpcCall (String request) { return CompletableFuture.supplyAsync(() -> { try { Thread.sleep((long ) (Math.random() * 1000 )); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return "Response from " + request; }); } public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10 ); List<String> requests = new ArrayList <>(); requests.add("RPC1" ); requests.add("RPC2" ); requests.add("RPC3" ); requests.add("RPC4" ); requests.add("RPC5" ); List<CompletableFuture<String>> futures = new ArrayList <>(); for (String request : requests) { CompletableFuture<String> future = rpcCall(request); futures.add(future); } CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture [0 ])); allFutures.get(); List<String> results = new ArrayList <>(); for (CompletableFuture<String> future : futures) { results.add(future.join()); } for (String result : results) { System.out.println(result); } executorService.shutdown(); } }