Netty ChannelFuture 异步监听
1. 前言
本节主要讲解 ChannelFuture ,它的作用是用来保存 Channel 异步操作的结果,可以看作是一个异步操作结果的占位符。
2. 概念
在 Netty 中所有的 IO 操作都是异步的,不能立刻得到 IO 操作的执行结果,但是可以通过注册一个监听器来监听其执行结果。在 Java 的并发编程当中可以通过 Future 来进行异步结果的监听,但是在 Netty 当中是通过 ChannelFuture 来实现异步结果的监听。通过注册一个监听的方式进行监听,当操作执行成功或者失败时监听会自动触发注册的监听事件。
3. 应用场景
ChannelFture 在开发当中经常需要用到,可以用来监听客户端连接服务端的结果反馈,Netty 是异步操作,无法知道什么时候执行完成,因此可以通过 ChannelFuture 来进行执行结果的监听。在 Netty 当中 Bind 、Write 、Connect 等操作会简单的返回一个 ChannelFuture。
4. 核心方法
| 序号 | 方法 | 描述 | 
|---|---|---|
| 1 | addListener | 注册监听器,当操作已完成 (isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则通知指定的监听器 | 
| 2 | removeListener | 移除监听器 | 
| 3 | sync | 等待异步操作执行完毕 | 
| 4 | await | 等待异步操作执行完毕 | 
| 5 | isDone | 判断当前操作是否完成 | 
| 6 | isSuccess | 判断已完成的当前操作是否成功 | 
| 7 | isCancellable | 判断已完成的当前操作是否被取消 | 
| 8 | cause | 获取已完成的当前操作失败的原因 | 
sync () 和 await () 都是等待异步操作执行完成,那么它们有什么区别呢?
- sync () 会抛出异常,建议使用 sync ();
 - await () 不会抛出异常,主线程无法捕捉子线程执行抛出的异常。
 
5. 深入了解 ChannelFuture
5.1 生命周期说明
Future 可以通过四个核心方法来判断任务的执行情况。
| 状态 | 说明 | 
|---|---|
| isDone() | 任务是否执行完成,无论成功还是失败 | 
| isSuccess() | 任务是否执行采购 | 
| isCancelled() | 任务是否被取消 | 
| cause() | 获取执行异常信息 | 
执行过程状态的改变说明
当一个异步任务操作开始的时候,一个新的 future 对象就会被创建。在开始的时候该 future 是处于未完成的状态,也就是说,isDone ()=false、isSuccess ()=false、isCancelled ()=false;只要该任务中任何一种状态结束了,无论是说成功、失败、或者被取消,那么整个 Future 就会被标记为 已完成 。注意的是,如果执行失败那么 cause () 方法会返回异常信息的内容。

实例:
ChannelFuture channelFuture=bootstrap.connect("127.0.0.1",80);
channelFuture.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) throws Exception {
        if(future.isDone()){
            if(future.isSuccess()){
                System.out.println("执行成功...");
            }else if(future.isCancelled()){
                System.out.println("任务被取消...");
            }else if(future.cause()!=null){
                System.out.println("执行出错:"+future.cause().getMessage());
            }
        }
    }
});
5.2 ChannelFuture 父接口说明
ChannelFuture 的类继承结构,具体如下所示:
public interface ChannelFuture extends Future<Void> {
}
public interface Future<V> extends java.util.concurrent.Future<V> {
}
通过上面的继承关系,我们可以清晰的知道 ChannelFuture 其实最顶层的接口是来自 java 并发包的 Future,java 并发包下的 Future 需要手工检查执行结果是否已经完成,非常的繁琐,因此 Netty 把它进行了封装和完善,变成了自动的监听,用起来变的非常的简单。
java 并发包下的 Future 主要存在以下几个缺陷:
- 只允许手动通过 get () 来检查对应的操作是否已经完成,它是堵塞直到子线程完成执行并且返回结果;
 - 只有 isDone () 方法判断一个异步操作是否完成,但是对于完成的定义过于模糊,JDK 文档指出正常终止、抛出异常、用户取消都会使 isDone () 方法返回真。并不能很好的区分到底是哪种状态。
 
get () 方法是堵塞的,必须等待 子线程 执行完成才能往下执行。
实例:
//1.定义一个子线程,实现 Callable 接口
public class ThreadTest implements Callable<Integer>{
    @Override
    public Integer call(){
        //打印
        System.out.println(">>>>>>>>子线程休眠之前");
        //休眠5秒
        Thread.sleep(5000);
        //打印
        System.out.println(">>>>>>>>子线程休眠之后");
        return 1;
    }
}
//2.调用子线程处理
public static void main(String[] args){
    ThreadTest t=new ThreadTest();
    FutureTask<Integer> future=new FutureTask<Integer>(t);
    //2.1.开始执行子线程
    new Thread(future).start();
    //2.2.手工返回结果
    int result=future.get();
    System.out.println(">>>>>>>>执行结果:"+result);
    //2.3.操作数据库
    userDao.updateStatus("1");
}
执行结果:
>>>>>>>>子线程休眠之前
>>>>>>>>子线程休眠之后
>>>>>>>>执行结果:1
结论总结:
- 说明了 Java 并发包的 Future 要想获取异步执行结果,必须手工调用 get () 方法,此时虽然能获取执行结果,但是无法知道执行结果是成功还是失败;
 - 使用 get () 获取执行结果,但是 get () 后面的业务则被堵塞,直到后面执行完毕才会往下执行,失去了异步操作提高执行效率的意义了。
 
6. ChannelFuture 原理
6.1 线程堵塞
思考:sync () 和 await () 方法如何同步等待执行完成并获取执行结果的呢?
源码分析如下所示:
private short waiters;//计数器
@Override
public Promise<V> await() throws InterruptedException {
    //1.判断是否执行完成,如果执行完成则返回
    if (isDone()) {
        return this;
    }
    //2.线程是否已经中断,如果中断则抛异常
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
    //3.检查死锁
    checkDeadLock();
    //4.同步代码块->while循环不断的监听执行结果
    synchronized (this) {
        while (!isDone()) {
            incWaiters();//waiters递增
            try {
                wait();//JDK 的 Object 方法,线程等待【核心】
            } finally {
                decWaiters();//waiters 递减
            }
        }
    }
    return this;
}
//递增函数
private void incWaiters() {
    if (waiters == Short.MAX_VALUE) {
        throw new IllegalStateException("too many waiters: " + this);
    }
    ++waiters;
}
//递减函数
private void decWaiters() {
    --waiters;
}
通过以上代码,我们发现 await () 的核心其实就是调用 Object 的 wait () 方法进行线程休眠,普通的 Java 多线程知识点。
6.2 线程唤醒
思考:当前线程休眠了,那么什么时候进行唤醒呢?
源码分析如下所示:
@Override
public Promise<V> setSuccess(V result) {
    //1.setSuccess0 赋值操作
    if (setSuccess0(result)) {
        //2.通知执行监听器
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
    //继续进入方法
    return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        //继续进入方法
        checkNotifyWaiters();
        return true;
    }
    return false;
}
private synchronized void checkNotifyWaiters() {
    if (waiters > 0) {
        //核心:唤醒之前休眠的线程
        notifyAll();
    }
}
源码分析总结:
- 堵塞的核心是通过 Object.wait () 方法进行休眠当前线程,普通的 Java 多线程知识;
 - 执行完成之后给不同状态(setSuccess、setFailure)赋值的时候唤醒休眠的线程;
 - 唤醒线程之后调用监听器的方法 
l.operationComplete(future); 
7. 小结
通过本节的学习,我们需要掌握以下几个核心知识点:
- 掌握异步的概念,传统 I/O 是同步堵塞的,执行 I/O 操作后线程会被阻塞住,直到操作完成;异步处理的好处是不会造成线程阻塞,可以通过 Future 来监听异步执行的结果;
 - ChannelFuture 的几种状态,以及它的值变化时机;
 - ChannelFuture 的堵塞和唤醒源码分析。
 
访问者可将本网站提供的内容或服务用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯本网站及相关权利人的合法权利。
本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站,邮箱:80764001@qq.com,予以删除。
