CompletableFuture详解

CompletableFuture是JDK 1.8引入的实现类,该类实现了FutureCompletionStage两个接口。该类的实例作为一个异步任务,可以在自己异步执行完成之后触发一些其他的异步任务,从而达到异步回调的效果。

CompletableFuture的UML类关系

CompletionStage接口

顾名思义,Stage是阶段的意思。CompletionStage代表某个同步或者异步计算的一个阶段,或者一系列异步任务中的一个子任务(或者阶段性任务)。

每个CompletionStage子任务所包装的可以是一个Function、Consumer或者Runnable函数式接口实例。这三个常用的函数式接口的特点如下:

  • Function
    Function接口的特点是:有输入、有输出。包装了Function实例的CompletionStage子任务需要一个输入参数,并会产生一个输出结果到下一步。
  • Runnable
    Runnable接口的特点是:无输入、无输出。包装了Runnable实例的CompletionStage子任务既不需要任何输入参数,又不会产生任何输出。
  • Consumer
    Consumer接口的特点是:有输入、无输出。包装了Consumer实例的CompletionStage子任务需要一个输入参数,但不会产生任何输出。

多个CompletionStage构成了一条任务流水线,一个环节执行完成了可以将结果移交给下一个环节(子任务)。多个CompletionStage子任务之间可以使用链式调用,下面是一个简单的例子:

1
2
3
oneStage.thenApply(x->square(x))
.thenAccept(y->System.out.println(y))
.thenRun(()->System.out.println())

对以上例子中的CompletionStage子任务说明如下:

  • oneStage是一个CompletionStage子任务,这是一个前提。
  • x -> square(x)是一个Function类型的Lambda表达式,被thenApply()方法包装成了一个CompletionStage子任务,该子任务需要接收一个参数x,然后输出一个结果——x的平方值。
  • “y -> System.out.println(y)是一个Consumer类型的Lambda表达式,被thenAccept()方法包装成了一个CompletionStage子任务,该子任务需要消耗上一个子任务的输出值,但是此子任务并没有输出。
  • () -> System.out.println()是一个Runnable类型的Lambda表达式,被thenRun()方法包装成了一个CompletionStage子任务,既不消耗上一个子任务的输出,又不产生结果。

CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另一个阶段。虽然一个子任务可以触发其他子任务,但是并不能保证后续子任务的执行顺序。

runAsync和supplyAsync

CompletionStage子任务的创建是通过CompletableFuture完成的。CompletableFuture类提供了非常强大的Future的扩展功能来帮助我们简化异步编程的复杂性,提供了函数式编程的能力来帮助我们通过回调的方式处理计算结果,也提供了转换和组合CompletionStage()的方法。

CompletableFuture定义了一组方法用于创建CompletionStage子任务(或者阶段性任务),基础的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//子任务包装一个Supplier实例,并调用ForkJoinPool.commonPool()线程来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
//子任务包装一个Supplier实例,并使用指定的executor线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
//子任务包装一个Runnable实例,并调用ForkJoinPool.commonPool()线程来执行
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
//子任务包装一个Runnable实例,并使用指定的executor线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}

在使用CompletableFuture创建CompletionStage子任务时,如果没有指定Executor线程池,在默认情况下CompletionStage会使用公共的ForkJoinPool线程池

下面是两个创建CompletionStage子任务简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void runAsyncDemo() throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
sleepSeconds(1);// 模拟执行1秒

System.out.println("run end ...");
});
// 等待异步任务执行完成,限时等待2秒
future.get(2, TimeUnit.SECONDS);
}

public static void supplyAsyncDemo() throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
long start = System.currentTimeMillis();
sleepSeconds(1);// 模拟执行1秒
System.out.println("supply end ...");
return System.currentTimeMillis() - start;
});
// 等待异步任务执行完成,限时等待2秒
long time = future.get(2, TimeUnit.SECONDS);
System.out.println("异步执行耗时(秒) = " + time/1000);
}

设置子任务回调钩子

可以为CompletionStage子任务设置特定的回调钩子,当计算结果完成或者抛出异常的时候,执行这些特定的回调钩子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//设置子任务完成时的回调钩子
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
//设置子任务完成时的回调钩子,可能不在同一线程执行
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}
//设置子任务完成时的回调钩子,提交给线程池executor执行
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
//设置异常处理的回调钩子
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}

下面是一个CompletionStage子任务设置完成钩子和异常钩子的简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class CompletableFutureDemo1 {
public static void whenCompleteDemo() throws Exception {
//创建异步任务
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
//模拟执行一秒
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName()+":抛出异常");
throw new RuntimeException(Thread.currentThread().getName()+":发生异常");
});
//设置异步任务执行完成后的回调钩子
future.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void unused, Throwable throwable) {
System.out.println(Thread.currentThread().getName()+":执行完成!");
}
});

//设置异步任务发生异常后的回调钩子
future.exceptionally(new Function<Throwable, Void>() {
@Override
public Void apply(Throwable throwable) {
System.out.println(Thread.currentThread().getName()+":执行失败!" + throwable.getMessage());
return null;
}
});
//获取异步任务的结果
future.get();
}

public static void main(String[] args) throws Exception {
whenCompleteDemo();
}
}

异常回调钩子

调用cancel()方法取消CompletableFuture时,任务被视为异常完成,completeExceptionally()方法所设置的异常回调钩子也会被执行。

如果没有设置异常回调钩子,发生内部异常时会有两种情况发生:

  • 在调用get()和get(long,TimeUnit)方法启动任务时,如果遇到内部异常,get()方法就会抛出ExecutionException(执行异常)。

  • 在调用join()和getNow(T)启动任务时(大多数情况下都是如此),如果遇到内部异常,join()和getNow(T)方法就会抛出CompletionException。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CompletableFutureCancelExample {
public static void main(String[] args) {
// 创建一个 CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// 模拟一个长时间运行的任务
TimeUnit.SECONDS.sleep(5);
return "Task completed";
} catch (InterruptedException e) {
System.out.println("Task was interrupted");
return "Task interrupted";
}
});

// 尝试在 2 秒内获取结果
try {
future.get(2, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
// 超时处理
System.out.println("Timeout occurred, cancelling the future");
// 取消 CompletableFuture
future.cancel(true);
}

// 检查 future 是否被取消
if (future.isCancelled()) {
System.out.println("The future has been cancelled.");
} else {
try {
// 如果未取消,尝试获取结果
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
}

调用handle()方法统一处理异常和结果

除了分别通过whenCompleteexceptionally设置完成钩子、异常钩子之外,还可以调用handle()方法统一处理结果和异常。

handle()方法有三个重载版本,声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 //在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
//可能不在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(asyncPool, fn);
}

//在指定线程池executor中处理异常和结果
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExceptionHandling {
public static void main(String[] args) {
// 创建一个 CompletableFuture
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 模拟一个可能抛出异常的操作
if (Math.random() < 0.5) {
throw new RuntimeException("Something went wrong");
}
return 42;
});

// 使用 whenComplete 处理结果或异常
future.whenComplete((result, exception) -> {
if (exception == null) {
System.out.println("Task completed successfully with result: " + result);
} else {
System.out.println("Task completed with exception: " + exception.getMessage());
}
});

// 使用 exceptionally 处理异常
CompletableFuture<Integer> futureWithExceptionHandler = future.exceptionally(ex -> {
System.out.println("Exception occurred: " + ex.getMessage());
// 返回一个默认值
return -1;
});

// 使用 handle 处理结果或异常
CompletableFuture<Integer> futureWithHandler = future.handle((result, exception) -> {
if (exception!= null) {
System.out.println("Handling exception: " + exception.getMessage());
// 返回一个默认值
return -1;
} else {
return result;
}
});

try {
// 获取结果,可能会阻塞等待
Integer result = futureWithHandler.get();
System.out.println("Final result: " + result);
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
}


Task completed with exception: java.lang.RuntimeException: Something went wrong
Exception occurred: java.lang.RuntimeException: Something went wrong
Handling exception: java.lang.RuntimeException: Something went wrong
Final result: -1

线程池的使用

默认情况下,通过静态方法runAsync()supplyAsync()创建的CompletableFuture任务会使用公共的ForkJoinPool线程池,默认的线
程数是CPU的核数。当然,它的线程数可以通过以下JVM参数设置:-Djava.util.concurrent.ForkJoinPool.common.parallelism

问题是,如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的IO操作,就会导致线程池中的所有线程都阻塞在IO操作上,造成线程饥饿,进而影响整个系统的性能。所以,强烈建议大家根据不同的业务类型创建不同的线程池,以避免互相干扰。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletableFutureWithThreadPool {
public static void main(String[] args) {
// 创建一个固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);

// 使用自定义线程池创建 CompletableFuture
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
return "Task 1 completed";
} catch (InterruptedException e) {
return "Task 1 interrupted";
}
}, executorService);

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
return "Task 2 completed";
} catch (InterruptedException e) {
return "Task 2 interrupted";
}
}, executorService);

// 当两个任务都完成时执行操作
CompletableFuture<Void> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
System.out.println(result1);
System.out.println(result2);
return null;
});

try {
// 等待组合任务完成
combinedFuture.get();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭线程池
executorService.shutdown();
}
}
}

异步任务的串行执行

如果两个异步任务需要串行(一个任务依赖另一个任务)执行,可以通过CompletionStage接口的thenApply()thenAccept()thenRun()thenCompose()四个方法来实现。

  • thenApply
    • 它允许你将一个同步操作应用到 CompletableFuture 的结果上,并将结果包装在一个新的 CompletableFuture 中。
    • 适用于对异步任务的结果进行简单的转换或计算,并且需要继续以 CompletableFuture 的形式传递结果。
  • thenAccept
    • 当你只关心上一个任务的结果而不需要产生新的结果时使用。
    • 通常用于执行与结果相关的副作用操作,如将结果存储到数据库、发送通知等。
  • thenRun
    • 当你不关心上一个任务的结果,只需要在任务完成后执行一些操作时使用。
    • 适用于执行一些独立的清理工作或日志记录。
  • thenCompose
    • 用于将多个 CompletableFuture 串联起来,避免嵌套的 CompletableFuture
    • 可以将前一个任务的结果作为输入来创建新的 CompletableFuture,实现链式调用。

thenApply()方法

thenApply接收上一个任务的结果,对结果进行处理并返回一个新结果,可用于对结果进行转换或进一步计算。一个 Function<T, U>,其中 T 是上一个 CompletableFuture 的结果类型,U 是要转换的新结果类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//后一个任务与前一个任务在同一个线程中执行    
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
//后一个任务与前一个任务不在同一个线程中执行
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
//后一个任务在指定的executor线程池中执行
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}

thenApply的三个重载版本有一个共同的参数fn,该参数表示要串行执行的第二个异步任务,它的类型为Function。fn的类型声明涉及两个泛型参数,具体如下:

  • 泛型参数 T:上一个任务所返回结果的类型。
  • 泛型参数 U:当前任务的返回值类型。

作为示例,调用thenApply分两步计算(10+10)*2,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenApplyExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个 CompletableFuture,初始值为 10
CompletableFuture<Integer> initialFuture = CompletableFuture.completedFuture(10);

// 第一步:将初始值 10 加 10
CompletableFuture<Integer> stepOneFuture = initialFuture.thenApply(result -> result + 10);

// 第二步:将第一步的结果乘以 2
CompletableFuture<Integer> finalFuture = stepOneFuture.thenApply(result -> result * 2);

// 获取最终结果
int finalResult = finalFuture.get();
System.out.println("The final result is: " + finalResult);
}
}

thenRun()方法

thenRun()方法与thenApply()方法不同的是,不关心任务的处理结果。只要前一个任务执行完成,就开始执行后一个串行任务。不接收上一个任务的结果,仅执行一个操作,常用于执行无参的副作用操作,例如记录日志或执行一些清理工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
//后一个任务与前一个任务在同一个线程中执行
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
//后一个任务与前一个任务不在同一个线程中执行
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
//后一个任务在executor线程池中执行
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}

从方法的声明可以看出,thenRun()方法同thenApply()方法类似,不同的是前一个任务处理完成后,thenRun()并不会把计算的结果传给后一个任务,而且后一个任务也没有结果输出。
thenRun系列方法中的action参数是Runnable类型的,所以thenRun()既不能接收参数又不支持返回值。

thenAccept()方法

thenAccept()方法对thenRun()thenApply()的特点进行了折中,调用此方法时后一个任务可以接收(或消费)前一个任务的处理结果,但是后一个任务没有结果输出

1
2
3
4
5
6
7
8
9
10
11
12
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}

thenAccept系列方法的回调参数为action,它的类型为Consumer<? super T>接口,其中 T 是上一个 CompletableFuture 的结果类型。

thenCompose()方法

thenCompose()方法在功能上与thenApply()thenAccept()thenRun()一样,可以对两个任务进行串行的调度操作,第一个任务操作完成时,将它的结果作为参数传递给第二个任务。

接收上一个任务的结果,将结果作为输入创建一个新的 CompletableFuture,用于将多个 CompletableFuture 链式组合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(null, fn);
}

public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(asyncPool, fn);
}

public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
return uniComposeStage(screenExecutor(executor), fn);
}

thenCompose()方法要求第二个任务的返回值是一个CompletionStage异步实例。因此,可以调用CompletableFuture.supplyAsync()方法将第二个任务所要调用的普通异步方法包装成一个CompletionStage异步实例。

作为演示,调用thenCompose分两步计算(10+10)*2,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenComposeExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个初始的 CompletableFuture,其结果为 10
CompletableFuture<Integer> initialFuture = CompletableFuture.completedFuture(10);

// 使用 thenCompose 进行链式操作
CompletableFuture<String> finalFuture = initialFuture.thenCompose(result -> {
// 将初始结果加 10
int intermediateResult = result + 10;
// 创建一个新的 CompletableFuture,其结果是将中间结果乘以 2 并转换为字符串
return CompletableFuture.supplyAsync(() -> {
int finalResult = intermediateResult * 2;
return "The final result is: " + finalResult;
});
});

// 获取最终结果
String result = finalFuture.get();
System.out.println(result);
}
}

异步任务的合并执行

如果某个任务同时依赖另外两个异步任务的执行结果,就需要对另外两个异步任务进行合并。以泡茶喝为例,“泡茶喝”任务需要对“烧水”任务与“清洗”任务进行合并。对两个异步任务的合并可以通过CompletionStage接口的thenCombine()runAfterBoth()thenAcceptBoth()三个方法来实现。这三个方法的不同之处主要在于其核心参数fn、action、consumer的类型不同,分别为Function<T,R>RunnableConsumer<? super T>类型。

  • thenCombine()
    • 可以将两个独立的 CompletableFuture 的结果进行合并,产生一个新的结果,该结果存储在新的 CompletableFuture 中。
    • 适用于需要对两个结果进行计算或组合的场景。
  • runAfterBoth()
    • 主要用于在两个任务完成后执行一个操作,不关心任务的结果,仅关心任务是否完成。
    • 适合于执行一些与结果无关的任务,如发送通知或更新状态。
  • thenAcceptBoth()
    • 用于对两个 CompletableFuture 的结果进行处理,不产生新的结果,只执行副作用操作。
    • 可以用于执行一些依赖于两个任务结果的操作,如更新 UI 或存储结果。

thenCombine()方法

thenCombine()会在两个CompletionStage任务都执行完成后,把两个任务的结果一起交给thenCombine()来处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//合并代表第二步任务(参数other)的CompletionStage实例,返回第三步任务的CompletionStage
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}

//不一定在同一个线程中执行第三步任务的CompletionStage实例
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
//第三步任务的CompletionStage实例在指定的executor线程池中执行
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}

thenCombine()方法的调用者为第一步的CompletionStage实例,该方法的第一个参数为第二步的CompletionStage实例,该方法的返回值为第三步的CompletionStage实例。在逻辑上,thenCombine()方法的功能是将第一步、第二步的结果合并到第三步上。

thenCombine系列方法有两个核心参数:

  • other参数:表示待合并的第二步任务的CompletionStage实例。
  • fn参数:表示第一个任务和第二个任务执行完成后,第三步需要执行的逻辑。fn参数的类型为BiFunction<? super T,? super U,? extends V>,该类型的声明涉及三个泛型参数,具体如下:
    • 泛型参数 T:表示第一个任务所返回结果的类型。
    • 泛型参数 U:表示第二个任务所返回结果的类型。
    • 泛型参数 V:表示第三个任务所返回结果的类型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureCombinations {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建两个 CompletableFuture
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "World");

// thenCombine 示例
CompletableFuture<String> combinedFuture = futureA.thenCombine(futureB, (a, b) -> a + " " + b);
System.out.println(combinedFuture.get());
}
}

runAfterBoth()方法

runAfterBoth()方法跟thenCombine()方法不一样的是,runAfterBoth()方法不关心每一步任务的输入参数和处理结果runAfterBoth()方法有三个重载版本,声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//合并第二步任务的CompletionStage实例,返回第三步任务的CompletionStage
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action) {
return biRunStage(null, other, action);
}

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action) {
return biRunStage(asyncPool, other, action);
}

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}

runAfterBoth()方法的调用者为第一步任务的CompletionStage实例,runAfterBoth()方法的第一个参数为第二步任务的CompletionStage实例,runAfterBoth()方法的返回值为第三步的
CompletionStage实例。
在逻辑上,第一步任务和第二步任务是并行执行的,thenCombine()方法的功能是将第一步、第二步的结果合并到第三步任务上。与thenCombine系列方法不同,runAfterBoth系列方法的第二个参
数action为Runnable类型,表示它的第一步任务、第二步任务、第三步任务既没有输入值,又没有输出值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureCombinations {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建两个 CompletableFuture
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "World");

// runAfterBoth 示例
CompletableFuture<Void> runAfterFuture = futureA.runAfterBoth(futureB, () -> System.out.println("Both futures completed running"));
runAfterFuture.get();
}
}

thenAcceptBoth()方法

thenAcceptBoth()方法对runAfterBoth()方法和thenCombine()方法的特点进行了折中,调用该方法,第三个任务可以接收其合并过来的第一个任务、第二个任务的处理结果,但是第三个任务(合并任务)却不能返回结果

thenAcceptBoth()方法有三个重载版本,三个版本的声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}

public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(asyncPool, other, action);
}

public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}

thenAcceptBoth系列方法的第二个参数为需要合并的第二步任务的CompletionStage实例。第三个参数为第三个任务的回调函数,该参数名称为action,它的类型为BiConsumer<? super T,? super U>接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureCombinations {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建两个 CompletableFuture
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "World");

// thenAcceptBoth 示例
CompletableFuture<Void> acceptBothFuture = futureA.thenAcceptBoth(futureB, (a, b) -> System.out.println("Accepted both: " + a + " " + b));
acceptBothFuture.get();
}
}

allOf()等待所有的任务结束

CompletionStage接口的allOf()会等待所有的任务结束,以合并所有的任务。thenCombine()只能合并两个任务,**如果需要合并多个异步任务,那么可以调用allOf()**。

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("模拟异步任务1"));
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> System.out.println("模拟异步任务2"));
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> System.out.println("模拟异步任务3"));
CompletableFuture<Void> future4 = CompletableFuture.runAsync(() -> System.out.println("模拟异步任务4"));

CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3, future4);
all.join();
}

异步任务的选择执行

CompletableFuture对异步任务的选择执行不是按照某种条件进行选择的,而是按照执行速度进行选择的:前面两个并行任务,谁的结果返回速度快,谁的结果将作为第三步任务的输入。

对两个异步任务的选择可以通过CompletionStage接口的applyToEither()runAfterEither()acceptEither()三个方法来实现。

  1. applyToEither()
    • 该方法用于处理两个 CompletableFuture 中先完成的那个结果。
    • 在示例中,future1.applyToEither(future2, result -> "The first completed result is: " + result); 表示当 future1future2 中的任意一个完成时,将其结果作为输入,在前面添加 "The first completed result is: " 并存储在 resultFuture 中。
    • 可以根据具体需求修改 Function 来对先完成的结果进行不同的处理或转换。
  2. runAfterEither()
    • 当你不关心哪个 CompletableFuture 先完成,也不关心它们的结果,只需要在任意一个完成时执行一个操作时使用。
    • 在示例中,future1.runAfterEither(future2, () -> System.out.println("One of the futures completed")); 会在 future1future2 中的任意一个完成后打印一条消息。
    • 常用于执行一些清理工作或日志记录等副作用操作,不依赖于具体的结果。
  3. acceptEither()
    • 当你需要在两个 CompletableFuture 中的任意一个完成时,使用其结果进行一些操作,但不产生新的结果时使用。
    • 在示例中,future1.acceptEither(future2, result -> System.out.println("The first completed result is: " + result)); 会在 future1future2 中的任意一个完成后打印出其结果。
    • 适用于对先完成的结果进行一些副作用操作,如更新 UI 或存储结果。

applyToEither()方法

两个CompletionStage谁返回结果的速度快,applyToEither()方法就用这个最快的CompletionStage的结果进行下一步(第三步)的回调操作。

applyToEither()方法有三个重载版本,三个版本的声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(null, other, fn);
}

public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(asyncPool, other, fn);
}

public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureEitherExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建两个 CompletableFuture
CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> {
try {
// 模拟不同的完成时间
Thread.sleep(2000);
return 10;
} catch (InterruptedException e) {
return 0;
}
});
CompletableFuture<Integer> futureB = CompletableFuture.supplyAsync(() -> {
try {
// 模拟不同的完成时间
Thread.sleep(3000);
return 20;
} catch (InterruptedException e) {
return 0;
}
});

// applyToEither 示例
CompletableFuture<String> applyEitherFuture = futureA.applyToEither(futureB, result -> "The first completed value is: " + result);
System.out.println(applyEitherFuture.get());
}
}

// The first completed value is: 10

runAfterEither()方法

runAfterEither()方法的功能为:前面两个CompletionStage实例,任何一个完成了都会执行第三步回调操作。三个任务的回调函数都是Runnable类型的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package pers.fulsun._13;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureEitherExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建两个 CompletableFuture
CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> {
try {
// 模拟不同的完成时间
Thread.sleep(2000);
return 10;
} catch (InterruptedException e) {
return 0;
}
});
CompletableFuture<Integer> futureB = CompletableFuture.supplyAsync(() -> {
try {
// 模拟不同的完成时间
Thread.sleep(3000);
return 20;
} catch (InterruptedException e) {
return 0;
}
});

// runAfterEither 示例 只会输出一次
CompletableFuture<Void> runEitherFuture = futureA.runAfterEither(futureB, () -> System.out.println("One of the futures has completed"));
runEitherFuture.get();
}
}

调用runAfterEither()方法,只要前面两个CompletionStage实例其中一个执行完成,就开始执行第三步的CompletionStage实例。

acceptEither()方法

acceptEither()方法对applyToEither()方法和runAfterEither()方法的特点进行了折中,两个CompletionStage谁返回结果的速度快,acceptEither()就用那个最快的CompletionStage的结果作为下一步(第三步)的输入,但是第三步没有输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(null, other, action);
}

public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(asyncPool, other, action);
}

public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor) {
return orAcceptStage(screenExecutor(executor), other, action);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package pers.fulsun._13;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureEitherExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建两个 CompletableFuture
CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> {
try {
// 模拟不同的完成时间
Thread.sleep(2000);
return 10;
} catch (InterruptedException e) {
return 0;
}
});
CompletableFuture<Integer> futureB = CompletableFuture.supplyAsync(() -> {
try {
// 模拟不同的完成时间
Thread.sleep(3000);
return 20;
} catch (InterruptedException e) {
return 0;
}
});

// acceptEither 示例
CompletableFuture<Void> acceptEitherFuture = futureA.acceptEither(futureB, result -> System.out.println("The first completed value is: " + result));
acceptEitherFuture.get();
}
}
// The first completed value is: 10

CompletableFuture的综合案例

泡茶喝实例

使用CompletableFuture实现泡茶喝实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class DrinkTea {
public static void main(String[] args) {
// 任务 1:洗水壶 -> 烧开水
CompletableFuture<Boolean> hotJob = CompletableFuture.supplyAsync(() -> {
System.out.println("洗水壶...");
System.out.println("烧开水...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("水烧开了...");
return true;
});

// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<Boolean> washJob = CompletableFuture.supplyAsync(() -> {
System.out.println("洗茶壶...");
System.out.println("洗茶杯...");
System.out.println("拿茶叶...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("洗完了...");
return true;
});

// 任务 3:任务 1 和任务 2 完成后执行泡茶
CompletableFuture<String> drinkJob = hotJob.thenCombine(washJob, (hotOk, washOK) -> {
System.out.println("泡茶...");
if (hotOk && washOK) {
System.out.println("泡茶喝,茶喝完");
return "茶喝完了";
}
return "没有喝到茶";
});

// 等待任务 3 执行结果
drinkJob.join();
}
}

多个RPC调用

使用CompletableFuture进行多个RPC调用,参考代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.List;
import java.util.ArrayList;

public class CompletableFutureRpcExample {

// 模拟的 RPC 调用方法,返回一个 CompletableFuture
public static CompletableFuture<String> rpcCall(String request) {
return CompletableFuture.supplyAsync(() -> {
// 模拟 RPC 调用的延迟
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Response from " + request;
});
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个线程池,可根据实际情况调整大小
ExecutorService executorService = Executors.newFixedThreadPool(10);

// 存储多个 RPC 请求
List<String> requests = new ArrayList<>();
requests.add("RPC1");
requests.add("RPC2");
requests.add("RPC3");
requests.add("RPC4");
requests.add("RPC5");

// 存储多个 CompletableFuture
List<CompletableFuture<String>> futures = new ArrayList<>();

// 发起多个 RPC 调用
for (String request : requests) {
CompletableFuture<String> future = rpcCall(request);
futures.add(future);
}

// 使用 allOf 组合多个 CompletableFuture
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

// 等待所有 RPC 调用完成
allFutures.get();

// 收集所有结果
List<String> results = new ArrayList<>();
for (CompletableFuture<String> future : futures) {
// 使用 join 方法获取结果,避免异常处理
results.add(future.join());
}

// 打印结果
for (String result : results) {
System.out.println(result);
}

// 关闭线程池
executorService.shutdown();
}
}