ChannelPipeline 调度 handler 的源码剖析
- 源码剖析目的
- 当一个请求进来的时候,ChannelPipeline 是如何调用内部的这些 handler 的
- 首先,当一个请求进来的时候,会第一个调用 pipeline 的 相关方法,如果是入站事件,这些方法由 fire 开头表示开始管道的流动。让后面的 handler 继续处理
- 源码剖析
- 说明:当浏览器输入 http://localhost:8007。可以看到会执行 handler在 Debug 时,可以将断点下在 DefaultChannelPipeline 类的
@Override
public final ChannelPipeline fireChannelActive() {AbstractChannelHandlerContext.invokeChannelActive(head);return this;
}
源码分析
DefaultChannelPipeline 是如何实现这些 fire 方法的
DefaultChannelPipeline 源码
public class DefaultChannelPipeline implements ChannelPipeline {@Overridepublic final ChannelPipeline fireChannelActive() {AbstractChannelHandlerContext.invokeChannelActive(head);return this;}@Overridepublic final ChannelPipeline fireChannelInactive() {AbstractChannelHandlerContext.invokeChannelInactive(head);return this;}@Overridepublic final ChannelPipeline fireExceptionCaught(Throwable cause) {AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);return this;}@Overridepublic final ChannelPipeline fireUserEventTriggered(Object event) {AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);return this;}@Overridepublic final ChannelPipeline fireChannelRead(Object msg) {AbstractChannelHandlerContext.invokeChannelRead(head, msg);return this;}@Overridepublic final ChannelPipeline fireChannelReadComplete() {AbstractChannelHandlerContext.invokeChannelReadComplete(head);return this;}@Overridepublic final ChannelPipeline fireChannelWritabilityChanged() {AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);return this;}
}
说明:
可以看出来,这些方法都是 inbound 的方法,也就是入站事件,调用静态方法传入的也是 mbound 的类型 headhandler。这些静态方法则会调用 head 的 ChannelInboundImvoker 接口的方法,再然后调用 handler 的真正方法
public class DefaultChannelPipeline implements ChannelPipeline {@Overridepublic final ChannelFuture bind(SocketAddress localAddress) {return tail.bind(localAddress);}@Overridepublic final ChannelFuture connect(SocketAddress remoteAddress) {return tail.connect(remoteAddress);}@Overridepublic final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {return tail.connect(remoteAddress, localAddress);}@Overridepublic final ChannelFuture disconnect() {return tail.disconnect();}@Overridepublic final ChannelFuture close() {return tail.close();}@Overridepublic final ChannelFuture deregister() {return tail.deregister();}@Overridepublic final ChannelPipeline flush() {tail.flush();return this;}@Overridepublic final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);}@Overridepublic final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {return tail.connect(remoteAddress, promise);}@Overridepublic final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {return tail.connect(remoteAddress, localAddress, promise);}@Overridepublic final ChannelFuture disconnect(ChannelPromise promise) {return tail.disconnect(promise);}
}
说明:
1) 这些都是出站的实现,但是调用的是 outbound 类型的 tail handler 来进行处理,因为这些都是 outbound 事件
2)出站是 tail 开始,入站从 head 开始。因为出站是从内部向外面写,从tail 开始,能够让前面的 handler 进行处理,防止 handler 被遗漏,比如编码。反之,入站当然是从 head 往内部输入,让后面的 handler 能够处理这些输入的数据。比如解码。因此虽然 head 也实现了 outbound 接口,但不是从 head 开始执行出站任务
3)关于如何调度,用一张图来表示
说明:
1) pipeline 首先会调用 Context 的静态方法 fireXXX,并传入 Context
2)然后,静态方法调用 Context 的 invoker 方法,而 invoker 方法内部会调用该 Context 所包含的Handler 的真正的 XXX 方法,调用结束后,如果还需要继续向后传递,就调用 Context 的 fireXXX2 方法,循环往复。