异步回调

这里使用阻塞模式和异步回调模式分别实现其中的异步泡茶流程。强调一下,这里直接略过顺序执行的冒泡工序,那个效率太低了。

为了异步执行整个泡茶流程,分别设计三个线程:

  • 泡茶线程(MainThread,主线程)、烧水线程(HotWaterThread)和清洗线程(WashThread)。
  • 泡茶线程的工作是:启动清洗线程、启动烧水线程,等清洗、烧水的工作完成后,泡茶喝;
  • 清洗线程的工作是:洗茶壶、洗茶杯;
  • 烧水线程的工作是:洗好水壶,灌上凉水,放在火上,一直等水烧开。

下面分别使用阻塞模式、回调模式实现泡茶的案例。

join:异步阻塞之闷葫芦

阻塞模式实现泡茶实例首先从基础的多线程join合并实验入手。join操作的原理是阻塞当前的线程,直到待合并的目标线程执行完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class JoinDemo {
public static String getCurrentThradName() {
return Thread.currentThread().getName();
}

static class HotWarterThread extends Thread {
public HotWarterThread() {
super("** 烧水-Thread");
}

@Override
public void run() {
try {
System.out.println(getCurrentThradName() + "洗好水壶...");
System.out.println(getCurrentThradName() + "灌上泉水...");
System.out.println(getCurrentThradName() + "放在火上...");
System.out.println(getCurrentThradName() + "等待烧开...");
Thread.sleep(5000);
System.out.println(getCurrentThradName() + "水烧开了...");
} catch (InterruptedException e) {
System.out.println(getCurrentThradName() + "烧水线程被中断了...");
}
System.out.println(getCurrentThradName() + "烧水线程结束了...");
}
}

static class WashThread extends Thread {
public WashThread() {
super("** 清洗-Thread");
}

@Override
public void run() {
try {
System.out.println(getCurrentThradName() + "洗茶壶...");
System.out.println(getCurrentThradName() + "洗茶杯...");
System.out.println(getCurrentThradName() + "拿茶叶...");
Thread.sleep(3000);
System.out.println(getCurrentThradName() + "清洗完了...");
} catch (InterruptedException e) {
System.out.println(getCurrentThradName() + "清洗线程被中断了...");
}
System.out.println("清洗线程结束了...");
}
}

public static void main(String[] args) {
HotWarterThread hThread = new HotWarterThread();
WashThread wThread = new WashThread();
wThread.start();
hThread.start();
// ... 在等待烧水和清洗时,可以干点其他事情
try {
// 合并烧水和清洗线程
hThread.join();
wThread.join();
Thread.currentThread().setName("主线程");
System.out.println("=======================");
System.out.println(getCurrentThradName() + "烧水和清洗都完成了...");
System.out.println(getCurrentThradName() + "泡茶喝...");
} catch (InterruptedException e) {
System.out.println("主线程被中断了...");
}
}
}

join()方法详解

join()方法的应用场景如下:

  • A线程调用B线程的join()方法,等待B线程执行完成,在B线程没有完成前,A线程阻塞。
  • Join()方法有三个重载版本:
    • void join():A线程等待B线程执行结束后,A线程重启执行。
    • void join(long millis):A线程等待B线程执行一段时间,最长等待时间为millis毫秒。超过millis毫秒后,不论B线程是否结束,A线程重启执行。
    • void join(long millis,int nanos):等待乙方线程执行一段时间,最长等待时间为millis毫秒加nanos纳秒。超过时间后,不论乙方是否结束,甲方线程都重启执行。
  • 强调一下容易混淆的几点:
    • join()是实例方法不是静态方法,需要使用线程对象去调用,如thread.join()。
    • 调用join()时,不是thread所指向的目标线程阻塞,而是当前线程阻塞。
    • 只有等到thread所指向的线程执行完成或者超时,当前线程才能启动执行。
  • join()有一个问题:
    • 被合并线程没有返回值。比如,在烧水的实例中,如果烧水线程执行结束,main线程是没有办法知道结果的。同样,清洗线程的执行结果,main线程(泡茶线程)也是没有办法知道的。形象地说,join线程合并就像一个闷葫芦,只能发起合并线程,不能取到执行结果。

join()的实现源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
while (isAlive()) {
wait(0); //阻塞当前线程
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay); //限时阻塞当前线程
now = System.currentTimeMillis() - base;
}
}
}

实现原理是不停地检查join线程是否存活,如果join线程存活,wait(0)就永远等下去,直至join线程终止后,线程的this.notifyAll()方法会被调用(该方法是在JVM中实现的,JDK中并不会看到源码),join()方法将退出循环,恢复业务逻辑执行。很显然这种循环检查的方式比较低效。

除此之外,调用join()缺少很多灵活性,比如实际项目中很少自己单独创建线程,而是使用Executor,这进一步减少了join()的使用
场景,所以join()的使用多数停留在Demo演示上。

FutureTask

为了获取异步线程的返回结果,Java在1.5版本之后提供了一种新的多线程创建方式——FutureTask方式。FutureTask方式包含一系列Java相关的类,处于java.util.concurrent包中。使用FutureTask方式进行异步调用时,所涉及的重要组件为FutureTask类和Callable接口。

由于Runnable有一个重要的问题,它的run()方法是没有返回值的,因此Runnable不能用在需要有返回值的场景。为了解决Runnable接口的问题,Java定义了一个新的和Runnable类似的接口——Callable接口,并且将其中被异步执行的业务处理抽象方法——run()方法改名为call()方法,但是**call()方法有返回值**。

FutureTask获取异步结果

通过FutureTask类和Callable接口的联合使用可以创建能获取异步执行结果的线程。具体的步骤重复介绍如下:

  1. 创建一个Callable接口的实现类,并实现它的call()方法,编写好异步执行的具体逻辑,并且可以有返回值。
  2. 使用Callable实现类的实例构造一个FutureTask实例。
  3. 使用FutureTask实例作为Thread构造器的target入参,构造新的Thread线程实例。
  4. 调用Thread实例的start()方法启动新线程,启动新线程的run()方法并发执行。其内部的执行过程为:启动Thread实例的run()方法并发执行后,会执行FutureTask实例的run()方法,最终会并发执行Callable实现类的call()方法。
  5. 调用FutureTask对象的get()方法阻塞性地获得并发线程的执行结果。

FutureTask实现异步泡茶喝

join版本泡茶示例中有一个很大的问题,就是主线程获取不到异步线程的返回值。打个比方,如果烧水线程出了问题,或者清
洗线程出了问题,main线程(泡茶线程)没有办法知道。哪怕不具备泡茶条件,main线程(泡茶线程)也只能继续泡茶喝。

使用FutureTask实现异步泡茶喝,main线程可以获取烧水线程、清洗线程的执行结果,然后根据结果判断是否具备泡茶条件,如果具备泡茶条件再泡茶。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package pers.fulsun._11;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class JavaFutureDemo {
public static String getCurrentThradName() {
return Thread.currentThread().getName();
}

static class HotWarterJob implements Callable<Boolean> {

@Override
public Boolean call() {
try {
System.out.println(getCurrentThradName() + "洗好水壶...");
System.out.println(getCurrentThradName() + "灌上泉水...");
System.out.println(getCurrentThradName() + "放在火上...");
System.out.println(getCurrentThradName() + "等待烧开...");
Thread.sleep(5000);
System.out.println(getCurrentThradName() + "水烧开了...");
} catch (InterruptedException e) {
System.out.println(getCurrentThradName() + "烧水线程被中断了...");
return false;

}
System.out.println(getCurrentThradName() + "烧水线程结束了...");
return true;
}
}

static class WashJob implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
try {
System.out.println(getCurrentThradName() + "洗茶壶...");
System.out.println(getCurrentThradName() + "洗茶杯...");
System.out.println(getCurrentThradName() + "拿茶叶...");
Thread.sleep(3000);
int i = 1 / 0;
System.out.println(getCurrentThradName() + "清洗完了...");
} catch (Exception e) {
System.out.println(getCurrentThradName() + "清洗线程被中断了...");
return false;
}
System.out.println("清洗线程结束了...");
return true;
}
}

public static void drinkTea(boolean warterOk, boolean cupOk) {
if (warterOk && cupOk) {
System.out.println("烧水和清洗都完成了...泡茶喝...");
} else if (!warterOk) {
System.out.println("烧水失败,没有茶喝了...");
} else if (!cupOk) {
System.out.println("清洗失败,没有茶喝了...");
}
}

public static void main(String[] args) {
Thread.currentThread().setName("主线程");
Callable<Boolean> hJob = new HotWarterJob();//③
FutureTask<Boolean> hTask = new FutureTask<>(hJob);//④
Thread hThread = new Thread(hTask, "** 烧水Thread");//⑤

Callable<Boolean> wJob = new WashJob();//③
FutureTask<Boolean> wTask = new FutureTask<>(wJob);//④
Thread wThread = new Thread(wTask, "$$ 清洗Thread");//⑤
hThread.start();
wThread.start();

// ... 在等待烧水和清洗时,可以干点其他事情
try {
boolean warterOk = hTask.get();
boolean cupOk = wTask.get();
drinkTea(warterOk, cupOk);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}

在上面的泡茶喝实例代码中使用了Callable接口来替代Runnable接口,并且在call方法中返回了异步线程的执行结果

1
2
3
4
5
6
static class WashJob implements Callable<Boolean>{
@Override
public Boolean call() throws Exception {
//业务代码,并且有执行结果返回
}
}

从Callable异步逻辑到异步线程需要创建一个FutureTask实例,并通过FutureTask实例创建新的线程

1
2
3
4
5
Callable<Boolean> hJob = new HotWarterJob();//异步逻辑
//包装异步逻辑的异步任务实例
FutureTask<Boolean> hTask = new FutureTask<Boolean>(hJob);
//异步线程
Thread hThread = new Thread(hTask, "** 烧水-Thread");

FutureTask和Callable都是泛型类,泛型参数表示返回结果的类型。所以,在使用时它们两个实例的泛型参数需要保持一致。最后,通过FutureTask实例取得异步线程的执行结果。一般来说,通过FutureTask实例的get方法可以获取线程的执行结果。

因为通过FutureTask的get()方法获取异步结果时,主线程也会被阻塞。这一点FutureTask和join是一致的,它们都是异步阻塞模式

异步阻塞的效率往往比较低,被阻塞的主线程不能干任何事情,唯一能干的就是傻傻等待。原生Java API除了阻塞模式的获取结果
外,并没有实现非阻塞的异步结果获取方法。如果需要用到获取的异步结果,得引入一些额外的框架,接下来将会介绍谷歌的Guava框架。

异步回调与主动调用

在前面的泡茶喝实例中,不论主线程调用join()进行闷葫芦式线程同步,还是使用Future.get()获取异步线程的执行结果,都属于主动模式的调用

在泡茶喝的例子中,泡茶线程是调用线程,烧水(或者清洗)线程是被调用线程,调用线程和被调用线程之间是一种主动关系,而不
是被动关系。泡茶线程需要主动获取烧水(或者清洗)线程的执行结果。

  • 调用join()Future.get()进行同步时,泡茶线程和烧水(或者清洗)线程之间的主动关系如图所示。

  • 主动调用是一种阻塞式调用,它是一种单向调用,“调用方”要等待“被调用方”执行完毕才返回。如果“被调用方”的执行时间很长,那么“调用方”线程需要阻塞很长一段时间。

  • 如何将主动调用的方向进行反转呢?这就是异步回调。回调是一种反向的调用模式,也就是说,被调用方在执行完成后,会反向执行“调用方”所设置的钩子方法。使用回调模式将泡茶线程和烧水(或者清洗)线程之间的“主动”关系进行反转:

    • 实质上,在回调模式中负责执行回调方法的具体线程已经不再是调用方的线程(如示例中的泡茶线程),而是变成了异步的被调用方的线程(如烧水线程)。
    • Java中回调模式的标准实现类为CompletableFuture,由于该类出现的时间比较晚,因此很多著名的中间件如Guava、Netty等都提供了自己的异步回调模式API供开发者使用。开发者还可以使用RxJava响应式编程组件进行异步回调的开发。

Guava的异步回调模式

Guava是Google提供的Java扩展包,它提供了一种异步回调的解决方案。Guava中与异步回调相关的源码处于com.google.common.util.concurrent包中。包中的很多类都用于对java.util.concurrent的能力扩展和能力增强。比如,Guava的异步任务接口ListenableFuture扩展了Java的Future接口,实现了异步回调的能力。

1
2
3
4
5
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>

详解FutureCallback

总体来说,Guava主要增强了Java而不是另起炉灶。为了实现异步回调方式获取异步线程的结果,Guava做了以下增强:

  • 引入了一个新的接口ListenableFuture,继承了Java的Future接口,使得Java的Future异步任务在Guava中能被监控和非阻塞获取异步结果。
  • 引入了一个新的接口FutureCallback,这是一个独立的新接口。该接口的目的是在异步任务执行完成后,根据异步结果完成不同的回调处理,并且可以处理异步结果。
  • FutureCallback是一个新增的接口,用来填写异步任务执行完后的监听逻辑。FutureCallback拥有两个回调方法:
    • onSuccess()方法,在异步任务执行成功后被回调。调用时,异步任务的执行结果作为onSuccess方法的参数被传入。
  • onFailure()方法,在异步任务执行过程中抛出异常时被回调。调用时,异步任务所抛出的异常作为onFailure方法的参数被传入。

FutureCallback的源码如下:

1
2
3
4
5
6
7
public interface FutureCallback<V> {

void onSuccess(@Nullable V result);

void onFailure(Throwable t);
}

Guava的FutureCallback与Java的Callable名字相近,实质不同,存在本质的区别:

  1. Java的Callable接口代表的是异步执行的逻辑
  2. Guava的FutureCallback接口代表的是Callable异步逻辑执行完成之后,根据成功或者异常两种情形所需要才可能执行Guava中的FutureCallback结果回调

详解ListenableFuture

Guava引入了一个新接口ListenableFuture,它继承了Java的Future接口,增强了被监控的能力。

1
2
3
4
5
public interface ListenableFuture<V> extends Future<V> {
//此方法由Guava内部调用
void addListener(Runnable listener, Executor executor);
}

ListenableFuture仅仅增加了一个addListener()方法。它的作用就是FutureCallback善后回调逻辑封装成一个内部的Runnable异步回调任务,在Callable异步任务完成后回调FutureCallback善后逻辑。在实际编程中,addListener()不会使用到。

在实际编程中,如何将FutureCallback回调逻辑绑定到异步的ListenableFuture任务呢?

可以使用Guava的Futures工具类,它有一个addCallback()静态方法,可以将FutureCallback的回调实例绑定到ListenableFuture异步任务。下面是一个简单的绑定实例:

1
2
3
4
5
6
7
8
9
10
11
12
// 添加一个回调,当异步任务完成时会执行这个回调
Futures.addCallback(future, new com.google.common.util.concurrent.FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("异步操作成功,结果是: " + result);
}

@Override
public void onFailure(Throwable t) {
System.out.println("异步操作失败,原因是: " + t.getMessage());
}
}, MoreExecutors.directExecutor());

ListenableFuture异步任务

如果要获取Guava的ListenableFuture异步任务实例,主要通过向线程池(ThreadPool)提交Callable任务的方式获取。不过,这里所说的线程池不是Java的线程池,而是经过Guava自己定制过的Guava线程池。

Guava线程池是对Java线程池的一种装饰。创建Guava线程池的方法如下:

  • 首先创建Java线程池,然后以其作为Guava线程池的参数再构造一个Guava线程池。有了Guava的线程池之后,就可以通过submit()方法
    来提交任务了,任务提交之后的返回结果就是我们所要的ListenableFuture异步任务实例。

    1
    2
    3
    4
    5
    //Java线程池
    ExecutorService jPool = Executors.newFixedThreadPool(10);
    // Guava线程池
    ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);

  • 取到了ListenableFuture实例后,通过Futures.addCallback()方法将FutureCallback回调逻辑的实例绑定到ListenableFuture异步任务实例,实现异步执行完成后的回调。

    1
    2
    3
    4
    5
    6
    //submit()方法用来提交任务,返回异步任务实例
    ListenableFuture<Boolean> hFuture = gPool.submit(hJob);
    //绑定回调实例
    Futures.addCallback(listenableFuture, new FutureCallback<Boolean>(){
    //有两种实现回调的方法 onSuccess()/onFailure()
    });

Guava实现泡茶喝的实例

基于Guava异步回调模式的泡茶喝程序的执行流程如图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
public class GuavaFutureDemo {
public static String getCurrentThradName() {
return Thread.currentThread().getName();
}

static class HotWarterJob implements Callable<Boolean> {

@Override
public Boolean call() {
try {
System.out.println(getCurrentThradName() + "洗好水壶...");
System.out.println(getCurrentThradName() + "灌上泉水...");
System.out.println(getCurrentThradName() + "放在火上...");
System.out.println(getCurrentThradName() + "等待烧开...");
Thread.sleep(5000);
System.out.println(getCurrentThradName() + "水烧开了...");
} catch (InterruptedException e) {
System.out.println(getCurrentThradName() + "烧水线程被中断了...");
return false;

}
System.out.println(getCurrentThradName() + "烧水线程结束了...");
return true;
}
}

static class WashJob implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
try {
System.out.println(getCurrentThradName() + "洗茶壶...");
System.out.println(getCurrentThradName() + "洗茶杯...");
System.out.println(getCurrentThradName() + "拿茶叶...");
Thread.sleep(3000);
System.out.println(getCurrentThradName() + "清洗完了...");
} catch (Exception e) {
System.out.println(getCurrentThradName() + "清洗线程被中断了...");
return false;
}
System.out.println(getCurrentThradName() +"清洗线程结束了...");
return true;
}
}

public static void drinkTea(boolean warterOk, boolean cupOk) {
if (warterOk && cupOk) {
System.out.println("烧水和清洗都完成了...泡茶喝...");
} else if (!warterOk) {
System.out.println("烧水失败,没有茶喝了...");
} else if (!cupOk) {
System.out.println("清洗失败,没有茶喝了...");
}
}

// 泡茶喝的工作
static class DrinkJob {

boolean waterOk = false;
boolean cupOk = false;

// 泡茶喝,回调方法
public void drinkTea() {
if (waterOk && cupOk) {
System.out.println("烧水和清洗都完成了...泡茶喝...");
this.waterOk = false;
}

}
}


public static void main(String[] args) throws InterruptedException {
Thread.currentThread().setName("泡茶喝线程");
DrinkJob drinkJob = new DrinkJob();
// 烧水的业务逻辑
Callable<Boolean> hotJob = new HotWarterJob();
// 清洗的业务逻辑
Callable<Boolean> washJob = new WashJob();

// 创建Java 线程池
ExecutorService jPool = Executors.newFixedThreadPool(10);
// 包装Java线程池,构造guava 线程池
ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
// 烧水的回调钩子
FutureCallback<Boolean> hotWaterHook = new FutureCallback<Boolean>() {

@Override
public void onSuccess(@Nullable Boolean result) {
if (result) {
drinkJob.waterOk = true;
// 执行回调方法
drinkJob.drinkTea();
}

}

@Override
public void onFailure(Throwable t) {
System.out.println(Thread.currentThread().getName() + "烧水失败,没有茶喝了");
}
};
// 启动烧水线程
ListenableFuture<Boolean> hotFuture = gPool.submit(hotJob);
// 设置烧水任务的回调钩子
Futures.addCallback(hotFuture, hotWaterHook, gPool);

// 清洗的回调钩子
FutureCallback<Boolean> washHook = new FutureCallback<Boolean>() {

@Override
public void onSuccess(@Nullable Boolean result) {
if (result) {
drinkJob.cupOk = true;
// 执行回调方法
drinkJob.drinkTea();
}
}

@Override
public void onFailure(Throwable t) {
System.out.println(Thread.currentThread().getName() + "清洗失败,没有茶喝了");
}
};
// 启动清洗线程
ListenableFuture<Boolean> washFuture = gPool.submit(washJob);
// 设置清洗任务的回调钩子
Futures.addCallback(washFuture, washHook, gPool);

System.out.println(Thread.currentThread().getName() + "干点其他事情......");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "执行完成");

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
pool-1-thread-1洗好水壶...
pool-1-thread-1灌上泉水...
pool-1-thread-1放在火上...
pool-1-thread-1等待烧开...
泡茶喝线程干点其他事情......
pool-1-thread-2洗茶壶...
pool-1-thread-2洗茶杯...
pool-1-thread-2拿茶叶...
泡茶喝线程执行完成
pool-1-thread-2清洗完了...
pool-1-thread-2清洗线程结束了...
pool-1-thread-1水烧开了...
pool-1-thread-1烧水线程结束了...
烧水和清洗都完成了...泡茶喝...

以上结果,烧水线程为pool-1-thread-1,清洗线程为pool-1-thread-2,在二者完成之前,泡茶喝线程已经执行完了。泡茶喝的工
作在异步回调方法drinkTea()中执行,执行的线程并不是“泡茶喝”线程,而是烧水线程和清洗线程.

Guava异步回调和Java异步调用的区别

总结一下Guava异步回调和Java的FutureTask异步调用的区别,具体如下:

  1. FutureTask是主动调用的模式,“调用线程”主动获得异步结果,在获取异步结果时处于阻塞状态,并且会一直阻塞,直到拿到
    异步线程的结果。
  2. Guava是异步回调模式,“调用线程”不会主动获得异步结果,而是准备好回调函数,并设置好回调钩子,执行回调函数的并不
    是“调用线程”自身,回调函数的执行者是“被调用线程”,“调用线程”在执行完自己的业务逻辑后就已经结束了,当回调函数被执行
    时,“调用线程”可能已经结束很久了。
比较项 Guava 异步回调 Java FutureTask 异步调用
回调处理 onSuccessonFailure 回调,任务完成自动触发,无需 get() 阻塞 无直接回调,依赖 isDone() 检查,get() 阻塞,手动处理异常
灵活性 可方便组合多个任务,支持结果转换 较基础,多任务组合和结果转换需更多手动编码
使用场景 复杂异步任务,需任务组合与结果转换 简单异步任务,仅执行并获取结果

Netty的异步回调模式

Netty和Guava一样,实现了自己的异步回调体系:Netty继承和扩展了JDK Future系列异步回调的API,定义了自身的Future系列接口和
类,实现了异步任务的监控、异步执行结果的获取。总体来说,Netty对Java Future异步任务的扩展如下:

  • 继承Java的Future接口得到了一个新的属于Netty自己的Future异步任务接口,该接口对原有的接口进行了增强,使得Netty异步任务能够非阻塞地处理回调结果。注意,Netty没有修改Future的名称,只是调整了所在的包名,Netty的Future类的包名和Java的Future接口的包不同。
  • 引入了一个新接口——GenericFutureListener,用于表示异步执行完成的监听器。这个接口和Guava的FutureCallback回调接口不同。Netty使用了监听器的模式,异步任务执行完成后的回调逻辑抽象成了Listener监听器接口。可以将Netty的GenericFutureListener监听器接口加入Netty异步任务Future中,实现对异步任务执行状态的事件监听。

总体来说,在异步非阻塞回调的设计思路上,Netty和Guava是一致的。对应关系为:

  • Netty的Future接口可以对应到Guava的ListenableFuture接口。
  • Netty的GenericFutureListener接口可以对应到Guava的FutureCallback接口。
1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.85.Final</version> <!-- 可以根据需要使用最新的稳定版本 -->
</dependency>

GenericFutureListener接口详解

前面提到,和Guava的FutureCallback一样,Netty新增了一个接口,用来封装异步非阻塞回调的逻辑,那就是GenericFutureListener 接口。GenericFutureListener位于io.netty.util.concurrent包中,源码如下:

1
2
3
4
5
6
7
8
9
package io.netty.util.concurrent;

import java.util.EventListener;

public interface GenericFutureListener<F extends Future<?>> extends EventListener {
//监听器的回调方法
void operationComplete(F var1) throws Exception;
}

GenericFutureListener拥有一个回调方法operationComplete(),表示异步任务操作完成。在Future异步任务执行完成后将回调此方法。大多数情况下,Netty的异步回调代码编写在GenericFutureListener接口的实现类的operationComplete方法中

说明一下,GenericFutureListener的父接口EventListener是一个空接口,没有任何抽象方法,是一个仅仅具有标识作用的接口。

Netty的Future接口详解

Netty也对Java的Future接口进行了扩展,并且名称没有变,还是叫作Future接口,实现在io.netty.util.concurrent包中。和Guava的ListenableFuture`一样,Netty的Future接口扩展了一系列方法,对执行的过程进行监控,对异步回调完成事件进行Listen监听并且回调。

Netty的Future源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package io.netty.util.concurrent;

import java.util.concurrent.TimeUnit;

public interface Future<V> extends java.util.concurrent.Future<V> {
boolean isSuccess(); // 判断异步执行是否成功

boolean isCancellable(); // 判断异步执行是否取消

Throwable cause(); // 获取异步任务异常的原因

// 增加异步任务执行完成Listener监听器
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);

Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);

// 移除异步任务执行完成Listener监听器
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);

// ...
}

Netty的Future接口一般不会直接使用,使用过程中会使用它的子接口。Netty有一系列子接口,代表不同类型的异步任务,

  • **ChannelFuture**:
    • Channel 操作相关联,当执行 Channel 的异步操作(如 bindconnectwriteclose 等)时会返回该类型的 Future 对象。
    • 重要方法:
      • addListener(GenericFutureListener<? extends Future<? super V>> listener):添加监听器,当操作完成时会调用监听器的 operationComplete 方法,以便处理操作成功或失败的情况。
      • sync():同步等待操作完成,操作失败会抛出异常。
      • await():等待操作完成,不抛出异常,返回操作是否成功。
  • **Promise**:
    • 不仅可以表示异步操作的结果,还可以设置操作的结果,是一个可写的 Future
    • 重要方法:
      • setSuccess(V result):设置操作成功的结果。
      • setFailure(Throwable cause):设置操作失败的原因。
      • trySuccess(V result):尝试设置操作成功结果,若已设置过结果或操作已取消,返回 false
      • tryFailure(Throwable cause):尝试设置操作失败原因,若已设置过结果或操作已取消,返回 false

这些子接口扩展了 Future 接口的基本功能,使其更适合 Netty 的异步编程场景,为网络操作提供了方便的异步结果处理和操作控制机制。使用它们可以更灵活地管理网络操作的结果,避免阻塞线程,提高程序性能和可扩展性。在实际应用中,根据具体的网络操作需求,选择合适的子接口可以更好地实现异步编程和错误处理等功能。例如,在服务器启动和关闭操作中使用 ChannelFuture 处理 bindclose 的结果,在需要手动控制操作结果时使用 Promise 接口。

ChannelFuture的使用

在Netty网络编程中,网络连接通道的输入、输出处理都是异步进行的,都会返回一个ChannelFuture接口的实例。通过返回的异步任务实例可以为其增加异步回调的监听器。在异步任务真正完成后,回调执行。

Netty的网络连接的异步回调实例代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// connect是异步的,仅仅是提交异步任务
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.baidu.com", 80));

// connect的异步任务真正执行完成后,future回调监听器会执行
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Connection established");
} else {
System.err.println("Connection attempt failed");
channelFuture.cause().printStackTrace();
}
}
});

GenericFutureListener接口在Netty中是一个基础类型接口。在网络编程的异步回调中,一般使用Netty中提供的某个子接口,如ChannelFutureListener接口。在上面的代码中,使用到的是这个子接口。

Netty的出站和入站异步回调

Netty的出站和入站操作都是异步的。异步回调的方法和前面Netty建立的异步回调是一样的。下面以经典的NIO出站操作write为例说明ChannelFuture的使用。在write操作调用后,Netty并没有立即完成对Java NIO底层连接的写入操作,底层的写入操作是异步执行的,代码如下:

1
2
3
4
5
6
7
8
9
// write()输出方法,返回的是一个异步任务
ChannelFuture future = ctx.channel().write(msg);
// 为异步任务加上监听器
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
// write操作完成后的回调代码
}
});

在write操作完成后立即返回,返回的是一个ChannelFuture接口的实例。通过这个实例可以绑定异步回调监听器,编写异步回调的逻辑。

EchoServer 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package pers.fulsun._12;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class EchoServer {
private final int port;

public EchoServer(int port) {
this.port = port;
}

public void run() throws Exception {
// 接收连接的 EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 处理连接的 EventLoopGroup
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 添加 String 编解码器
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
// 添加 EchoServerHandler 处理客户端发送的数据
ch.pipeline().addLast(new EchoServerHandler());
}
});

// 绑定端口,启动服务器
ChannelFuture f = b.bind(port).sync();

// 等待服务器关闭
f.channel().closeFuture().sync();
} finally {
// 关闭 EventLoopGroup
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new EchoServer(port).run();
}
}

class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 接收到客户端发送的数据,直接回写回去
ctx.writeAndFlush(msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 异常处理
cause.printStackTrace();
ctx.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package pers.fulsun._12;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;


public class EchoClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});

// 发起连接
ChannelFuture future = bootstrap.connect("127.0.0.1", 8080).sync();

// 发送数据
future.channel().writeAndFlush("Hello, Echo Server");

// 等待连接关闭
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}

class EchoClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 接收到服务器返回的数据
System.out.println("Received from server: " + msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}