netty之pipeline

Netty抽象出流水线(pipeline)这一层数据结构进行处理或拦截channel相关事件。

事件分为入站事件(inBound event)和出站事件(outBound event)的ChannelHandlers列表。ChannelPipeline使用先进的Intercepting Filter模式,使用户可以完全控制如何处理事件以及管道中的ChannelHandlers如何相互交互。类似与servlet的filter,亦或是Linux的管道命令,使用责任链模式。将不同的handlers组装成一个链表,进行依次调用。开发人员可以很方便的往链表添加或删除handler进行自己的业务逻辑操作。
在这里插入图片描述

channel事件分为入站和出栈事件两种类型,同样的handler分为ChannelInboundHandler和ChannelOutboundHandler两种类型处理对应事件。入站事件对应read事件,出栈事件对应write事件,读和写公用同一个pipeline,两个事件类型互不干涉。read事件从head开始往后找相关的inBound handlers依次进行处理,write事件从tail往前找相关的outBound handlers依次进行处理。

pipeline的创建

当channel创建时,就会创建一个pipeline与之进行绑定。在channel整个生命周期都会使用该pipeline进行事件处理,默认使用DefaultChannelPipeline类进行创建pipeline。

来看下DefaultChannelPipeline的构造方法:

protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise =  new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;
}

创建时候会初始化head和tail。我们在往pipeline添加的时候都是添加的handler,这里会包装成AbstractChannelHandlerContext类型添加到链表里。

添加handler

添加handler可以通过pipeline.addLast方法进行添加

 ChannelPipeline p = ...;p.addLast("1", new InboundHandlerA());p.addLast("2", new InboundHandlerB());p.addLast("3", new OutboundHandlerA());p.addLast("4", new OutboundHandlerB());p.addLast("5", new InboundOutboundHandlerX());

addLast的源码如下:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {//判断handler能否被添加多次checkMultiplicity(handler);//创建一个DefaultChannelHandlerContext实例,将handler包装起来newCtx = newContext(group, filterName(name, handler), handler);//添加到链表addLast0(newCtx);//判断channel是否绑定了eventLoop,没绑定调用callHandlerCallbackLater后面文章会讲到这里if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);return this;}}//回调当前handler的handlerAdded方法callHandlerAdded0(newCtx);return this;
}
//这里就是将handler添加到tail的前面private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = tail.prev;newCtx.prev = prev;newCtx.next = tail;prev.next = newCtx;tail.prev = newCtx;}

从上面源码可以看出,添加的handler首先会被包装成一个HandlerContext,然后在将其放到链表tail的前面。添加handler其它的方法还有addFirst,addBefore,addAfter原理类似。可以根据业务逻辑进行handler顺序编排。

常见事件

入站读事件:ChannelInboundHandler

回调方法事件说明
channelRegisteredchannel绑定EventLoop
channelUnregisteredchannel取消EventLoop绑定
channelActivechannel启动准备完成
channelInactivechannel处于非活动状态,准备关闭
channelRead从对端读取到数据
channelReadComplete处理完所有读取到的数据

出站写事件:ChannelOutboundHandler

回调方法事件说明
bindbind操作完成前回调,serverchannel事件
connectconnect操作完成前回调,clientchannel事件
disconnectdisconnect操作完成前回调,client端事件
closeclose操作前回调,server端事件
readread操作前回调
writewrite操作前回调
flushflush操作前回调

数据读写源码

io.netty.channel.Channel.write方法会调用pipeline.write,其源码如下

public final ChannelFuture write(Object msg) {return tail.write(msg);
}

这里看到写数据从tail开始调用。所以这里如果有多个handler的话,要注意我们加入handler的顺序。

手动添加的handler都被包装成DefaultChannelHandlerContext,该类只重写了handler()方法获取当前handler,其它方法实现都在其父类AbstractChannelHandlerContext中。方法间传递的ctx变量就是该类。递归调用handler也是主要在该类中方法实现:

ctx.write:

private void write(Object msg, boolean flush, ChannelPromise promise) {//找出下一个Outbound类型MASK_WRITE类型的handlerfinal AbstractChannelHandlerContext next = findContextOutbound(flush ?(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);final Object m = pipeline.touch(msg, next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {//是否flush,调用下一个handler的writeif (flush) {next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else {final WriteTask task = WriteTask.newInstance(next, m, promise, flush);if (!safeExecute(executor, task, promise, m, !flush)) {task.cancel();}}
}

这里以读方法为例看调用过程

读数据从pipeline.fireChannelRead方法开始

public final ChannelPipeline fireChannelRead(Object msg) {AbstractChannelHandlerContext.invokeChannelRead(head, msg);return this;
}

这里看到ctx从head开始往后找。然后调用ctx.invokeChannelRead

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}
}

invokeChannelRead方法其实是调起handler.channelRead方法。

private void invokeChannelRead(Object msg) {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {invokeExceptionCaught(t);}} else {fireChannelRead(msg);}
}

下一个handler一般在处理完后,会调用ctx.fireChannelRead完成下一个handler的调用。这样完成链式调用

public ChannelHandlerContext fireChannelRead(final Object msg) {invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);return this;
}

参考:
https://netty.io/4.0/api/io/netty/channel/ChannelPipeline.html
https://www.alibabacloud.com/blog/essential-technologies-for-java-developers-io-and-netty_597367

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/112551.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【AD操作】【原理图标注配置器】批量更新原理图的元件标签

原理图标注配置器 快捷键 T-A-A 1.调整编号顺序 2.设置起始编号 3.更新 和 执行变更 对 学习笔记,供自己复习参考。

物理内存分配

目录 内核物理内存分配接口 内存分配行为(物理上) 内存分配的行为操作 内存 三个水位线 水线计算 水位线影响内存分配行为 内存分配核心__alloc_pages 释放页 1、内核物理内存分配接口 struct page *alloc_pages(gfp_t gfp, unsigned int ord…

子网的划分

强化计算机网络发现王道没有这一块的内容,导致做题稀里糊涂。于是个人调研补充。 子网划分是将一个大型IP网络划分成更小的子网,以实现更有效的网络管理和资源分配。 原因: 提高网络性能:子网划分可以减少广播域的大小&#xff…

【Linux旅行记】探究操作系统是如何进行管理的!

文章目录 什么是操作系统?操作系统概念操作系统的目的底层硬件驱动程序操作系统理解系统调用接口 操作系统是如何进行管理的?什么是管理?操作系统是如何管理硬件信息呢? 🍀小结🍀 🎉博客主页&am…

x86平台运行arm64平台docker 镜像

本文介绍在x86服务器上安装qemu-aarch64-statick仿真器,以实现x86服务器可以运行docker或docker-compose镜像。 报错信息: x86服务器默认不能运行ARM平台镜像,会提示如下错误: WARNING: The requested images platform (linux/ar…

2023最新如何轻松升级、安装和试用Navicat Premium 16.2.10 教程详解

🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页——🐅🐾猫头虎的博客🎐 🐳 《面试题大全专栏》 🦕 文章图文…

github 网页显示不全?

问题 解决 1、检查网页,打开 network,重新刷新 github 网页 2、查看无法加载的资源(如 css 文件) 3、查看域名地址 https://tool.chinaz.com/dns/,github.githubassets.com(检查网页元素,点击无…

嵌入式学习攻略

嵌入式软件编程的基础 主要是学习编程语言、开发环境和形成自己的编程逻辑,为嵌入式软件开发打下良好的基础,编程语言建议为C和C语言。书籍中的例子都是比较经典的程序实例,尽量去搞懂,不要觉得太长或者太难了而放弃,…

【计算机基础】Git系列2:配置多个SSH

📢:如果你也对机器人、人工智能感兴趣,看来我们志同道合✨ 📢:不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 📢:文章若有幸对你有帮助,可点赞 👍…

eNSP网络学习

一、eNSP 1.什么是eNSP eNSP(Enterprise Network Simulation Platform)是一款由华为提供的免费的、可扩展的、图形化操作的网络仿真工具平台,主要对企业网络路由器、交换机进行软件仿真,完美呈现真实设备实景,支持大型网络模拟,让…

statistic learning outlook

supervised learning 贝叶斯估计 决策树与信息熵 信息熵 H ( D ) − ∑ i 1 n p ( X x i ) l o g ( P ( X x i ) ) − ∑ p i l o g ( p i ) H(D)-\sum_{i1}^n p(Xx_i)log(P(Xx_i))-\sum p_ilog(p_i) H(D)−∑i1n​p(Xxi​)log(P(Xxi​))−∑pi​log(pi​),信…

多线程中的Semaphore信号量

在Java多线程编程中,Semaphore是一种用于控制资源访问的机制。Semaphore允许您限制同时访问某个资源的线程数量。这在需要限制并发访问的情况下非常有用,例如数据库连接池或有限数量的线程池。 创建Semaphore 要使用Semaphore,首先需要创建…