Netty Review - 探索Pipeline的Inbound和Outbound

文章目录

  • 概念
  • Server Code
  • Client Code
  • InboundHandler和OutboundHandler的执行顺序
    • 在InboundHandler中不触发fire方法
    • InboundHandler和OutboundHandler的执行顺序
    • 如果把OutboundHandler放在InboundHandler的后面,OutboundHandler会执行吗

在这里插入图片描述


概念

在这里插入图片描述

我们知道boss线程监控到绑定端口上有accept事件,此时会为该socket连接实例化Pipeline,并将InboundHandlerOutboundHandler按序加载到Pipeline中,然后将该socket连接(也就是Channel对象)挂载到selector

一个selector对应一个线程,该线程会轮询所有挂载在他身上的socket连接有没有readwrite事件,然后通过线程池去执行Pipeline的业务流

selector如何查询哪些socket连接有readwrite事件,主要取决于调用操作系统的哪种IO多路复用内核

  • 如果是select注意,此处的select是指操作系统内核的select IO多路复用,不是nettyseletor对象),那么将会遍历所有socket连接,依次询问是否有readwrite事件,最终操作系统内核将所有IO事件的socket连接返回给netty进程,当有很多socket连接时,这种方式将会大大降低性能,因为存在大量socket连接的遍历和内核内存的拷贝
  • 如果是epoll,性能将会大幅提升,因为它基于完成端口事件,已经维护好有IO事件的socket连接列表,selector直接取走,无需遍历,也少掉内核内存拷贝带来的性能损耗

Netty中,InboundOutbound是两个重要的概念,用于描述数据在ChannelPipeline中的流动方向。

Inbound(入站)指的是数据从网络传输到应用程序,即数据从远程主机进入本地主机。在ChannelPipeline中,Inbound数据会依次经过Pipeline中的每个ChannelHandler进行处理,直到到达Pipeline的末尾。

Outbound(出站)指的是数据从应用程序传输到网络,即数据从本地主机发送到远程主机。在ChannelPipeline中,Outbound数据会从Pipeline的末尾开始,逆序经过Pipeline中的每个ChannelHandler进行处理,直到到达Pipeline的起始位置。

InboundOutbound的区别在于数据的流动方向。Inbound数据是从网络进入应用程序,而Outbound数据是从应用程序发送到网络。这意味着Inbound数据是应用程序接收和处理外部数据的入口,而Outbound数据是应用程序发送数据到外部的出口。

虽然InboundOutbound描述了数据的不同流动方向,但它们之间也存在联系。在ChannelPipeline中,InboundOutbound数据可以相互影响和交互。例如,一个ChannelHandler可以在处理Inbound数据时生成Outbound数据作为响应,或者在处理Outbound数据时修改Inbound数据的内容。

总结起来,InboundOutbound是描述数据在ChannelPipeline中流动方向的概念。Inbound数据是从网络进入应用程序,Outbound数据是从应用程序发送到网络。它们在ChannelPipeline中相互影响和交互,共同实现网络数据的处理和传输。


Pipeline的责任链是通过ChannelHandlerContext对象串联的,ChannelHandlerContext对象里封装了ChannelHandler对象,通过prev和next节点实现双向链表。Pipeline的首尾节点分别是headtail,当selector轮询到socketread事件时,将会触发Pipeline责任链,从head开始调起第一个InboundHandlerChannelRead事件,接着通过fire方法依次触发Pipeline上的下一个ChannelHandler .

在这里插入图片描述

ChannelHandler分为InbounHandlerOutboundHandler

  • InboundHandler用来处理接收消息
  • OutboundHandler用来处理发送消息。

headChannelHandler既是InboundHandler又是OutboundHandler,无论是read还是write都会经过head,所以head封装了unsafe方法,用来操作socketreadwritetailChannelHandler只是InboundHandlerreadPipleline处理将会最终到达tail


演示之前,我们先附一下代码

Server Code

package com.artisan.pipeline.inout;import com.artisan.pipeline.inout.handler.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class ArtisanEchoServer {private int port;public ArtisanEchoServer(int port) {this.port = port;}private void run() {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new EchoOutboundHandler3());ch.pipeline().addLast(new EchoOutboundHandler2());ch.pipeline().addLast(new EchoOutboundHandler1());ch.pipeline().addLast(new EchoInboundHandler1());ch.pipeline().addLast(new EchoInboundHandler2());ch.pipeline().addLast(new EchoInboundHandler3());}}).option(ChannelOption.SO_BACKLOG, 10000).childOption(ChannelOption.SO_KEEPALIVE, true);System.out.println("EchoServer正在启动...");ChannelFuture channelFuture = serverBootstrap.bind(port).sync();System.out.println("EchoServer绑定端口:" + port);channelFuture.channel().closeFuture().sync();System.out.println("EchoServer已关闭.");} catch (Exception e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) {int port = 1234;if (args != null && args.length > 0) {try {port = Integer.parseInt(args[0]);} catch (Exception e) {e.printStackTrace();}}ArtisanEchoServer server = new ArtisanEchoServer(port);server.run();}
}

6个handler演示如下

package com.artisan.pipeline.inout.handler;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoInboundHandler1 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println();System.out.println("进入 EchoInboundHandler1.channelRead");String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);System.out.println("EchoInboundHandler1.channelRead 收到数据:" + data);ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler1] " + data, CharsetUtil.UTF_8));System.out.println("退出 EchoInboundHandler1 channelRead");System.out.println();}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("[EchoInboundHandler1.channelReadComplete]");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("[EchoInboundHandler1.exceptionCaught]" + cause.toString());}
}
package com.artisan.pipeline.inout.handler;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println();System.out.println("进入 EchoInboundHandler2.channelRead");String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);System.out.println("EchoInboundHandler2.channelRead 接收到数据:" + data);//ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8));ctx.channel().writeAndFlush(Unpooled.copiedBuffer("测试一下channel().writeAndFlush", CharsetUtil.UTF_8));ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " + data, CharsetUtil.UTF_8));System.out.println("退出 EchoInboundHandler2 channelRead");System.out.println();}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("[EchoInboundHandler2.channelReadComplete]读取数据完成");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("[EchoInboundHandler2.exceptionCaught]");}
}
package com.artisan.pipeline.inout.handler;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoInboundHandler3 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println();System.out.println("进入 EchoInboundHandler3.channelRead");String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8);System.out.println("EchoInboundHandler3.channelRead 接收到数据:" + data);//ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write] [EchoInboundHandler3] " + data, CharsetUtil.UTF_8));ctx.fireChannelRead(msg);System.out.println("退出 EchoInboundHandler3 channelRead");System.out.println();}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("[EchoInboundHandler3.channelReadComplete]读取数据完成");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("[EchoInboundHandler3.exceptionCaught]");}}
package com.artisan.pipeline.inout.handler;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoOutboundHandler1 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("进入 EchoOutboundHandler1.write");//ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write中的write]", CharsetUtil.UTF_8));
//        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("在OutboundHandler里测试一下channel().writeAndFlush", CharsetUtil.UTF_8));ctx.write(msg);System.out.println("退出 EchoOutboundHandler1.write");}}
package com.artisan.pipeline.inout.handler;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoOutboundHandler2 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("进入 EchoOutboundHandler2.write");//ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write中的write]", CharsetUtil.UTF_8));ctx.write(msg);System.out.println("退出 EchoOutboundHandler2.write");}
}
package com.artisan.pipeline.inout.handler;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EchoOutboundHandler3 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("进入 EchoOutboundHandler3.write");ctx.write(msg);System.out.println("退出 EchoOutboundHandler3.write");}}

Client Code

package com.artisan.netty4.client;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;/*** @author 小工匠* @version 1.0* @description: 客户端启动程序* @mark: show me the code , change the world*/
public class ArtisanClient {public static void main(String[] args) throws Exception {NioEventLoopGroup eventExecutors = new NioEventLoopGroup();try {//创建bootstrap对象,配置参数Bootstrap bootstrap = new Bootstrap();//设置线程组bootstrap.group(eventExecutors)//设置客户端的通道实现类型.channel(NioSocketChannel.class)//使用匿名内部类初始化通道.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//添加客户端通道的处理器ch.pipeline().addLast(new ArtisanClientHandler());}});System.out.println("客户端准备就绪");//连接服务端ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 1234).sync();//对通道关闭进行监听channelFuture.channel().closeFuture().sync();} finally {//关闭线程组eventExecutors.shutdownGracefully();}}
}
package com.artisan.netty4.client;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** @author 小工匠* @version 1.0* @description: 通用handler,处理I/O事件* @mark: show me the code , change the world*/
@ChannelHandler.Sharable
public class ArtisanClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//发送消息到服务端ctx.writeAndFlush(Unpooled.copiedBuffer("msg send from client 2 server ", CharsetUtil.UTF_8));System.out.println("客户端发消息给服务端结束");System.out.println();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//接收服务端发送过来的消息ByteBuf byteBuf = (ByteBuf) msg;System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));}}

InboundHandler和OutboundHandler的执行顺序

在InboundHandler中不触发fire方法

ArtisanEchoServer#run 中我们先进存在InboundHandler

在这里插入图片描述

在这里插入图片描述

先启动server, 在启动Client,我们测试一下

在这里插入图片描述
在这里插入图片描述

我们可以看到: InboundHandler2没有调用fire事件,InboundHandler3没有被执行

InboundHandler是通过fire事件决定是否要执行下一个InboundHandler,如果InboundHandler没有调用fire事件,那么后续的Pipeline中的Handler将不会执行。

我们来看下源码

在这里插入图片描述


InboundHandler和OutboundHandler的执行顺序

在这里插入图片描述
加入Pipeline的ChannelHandler的顺序如上。

别忘了放开EchoInboundHandler2

 ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " + data, CharsetUtil.UTF_8));

我们来验证下

在这里插入图片描述

执行顺序如上。

InboundHandler1 => InboundHandler2 => OutboundHandler1 => OutboundHander2 => OutboundHandler3 => InboundHandler3

1、InboundHandler是按照Pipleline的加载顺序,顺序执行。

2、OutboundHandler是按照Pipeline的加载顺序,逆序执行。


如果把OutboundHandler放在InboundHandler的后面,OutboundHandler会执行吗

在这里插入图片描述

其中EchoInboundHandler2 先不要给客户端发送数据,先屏蔽掉。

public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("进入 EchoInboundHandler2.channelRead");String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);System.out.println("EchoInboundHandler2.channelRead 接收到数据:" + data);
//        ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8));
//        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("测试一下channel().writeAndFlush", CharsetUtil.UTF_8));ctx.fireChannelRead(Unpooled.copiedBuffer("[ArtisanEchoOutboundHandler2] " + data, CharsetUtil.UTF_8));System.out.println("退出 EchoInboundHandler2 channelRead");}
.......
.......
.......

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/235731.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

指针(2)

函数指针数组 函数指针数组是一个用来存放函数指针&#xff08;地址&#xff09;的数组。 如上图&#xff0c;是将两个函数指针存入数组中。如何写函数指针数组名呢&#xff1f;我们可以先写出函数指针类型int (*)(int,int)然后在(*)里面加上数组名[]即可。 指向函数指针数组…

【Qt绘图】之绘制坦克

使用绘图事件&#xff0c;绘制坦克。 效果 效果很逼真&#xff0c;想象力&#xff0c;有没有。 示例 代码像诗一样优雅&#xff0c;有没有。 包含头文件 #include <QApplication> #include <QWidget> #include <QPainter>绘制坦克类 class TankWidge…

ssm+java车辆售后维护系统 springboot汽车保养养护管理系统+jsp

以前汽车维修人员只是在汽车运输行业中从事后勤保障工作,随着我国经济的发展,汽车维修行业已经从原来的从属部门发展成了如今的功能齐备的独立企业。这种结构的转变,给私营汽修企业和个体汽修企业的发展带来了契机,私营企业和个体维修企业的加入也带动了整个汽修行业的整体水平…

线性回归 调试方法

调试方法 特征缩放 对于某些不具有比较性的样本特征 x i x_i xi​ &#xff08;比如对其他的x来说 x i x_i xi​ 相当大或者相当小&#xff09;&#xff0c;梯度下降的过程可能会非常漫长&#xff0c;并且可能来回波动才能最后收敛到全局的最小值。 在这样的情况下&#xff…

python中操作excel的常用库和方法

无论办公自动化或者数据分析中&#xff0c;我们常会用到excel表格。在python中都有哪些库处理数据表格&#xff1f;下面就说明一下在python中有哪些库能够处理数据表格。 xlwt库 pip install xlwtxlwt库仅仅能向excel中写入数据&#xff0c;流程如下&#xff1a; 创建一个wo…

嵌入式Linux:配置Ubuntu系统环境和安装开发工具

目录 1、配置Ubuntu系统环境 1.1、APT下载工具 1.2、更新语言 1.3、更新本地数据库 1.4、安装VIM编辑器 1.5、Ubuntu 和 Windows 文件互传 1.6、开启NFS服务 1.7、开启SSH服务 2、安装开发工具 2.1、Ubuntu安装VSCode 2.2、Windows安装MobaXterm 安装好Ubuntu系统环…

排序分析(Ordination analysis)及R实现

在生态学、统计学和生物学等领域&#xff0c;排序分析是一种用于探索和展示数据结构的多元统计技术。这种分析方法通过将多维数据集中的样本或变量映射到低维空间&#xff0c;以便更容易理解和可视化数据之间的关系。排序分析常用于研究物种组成、生态系统结构等生态学和生物学…

【Openstack Train安装】十二、Cinder安装

Cinder在块存储资源和计算服务&#xff08;Nova&#xff09;之间提供了一个抽象层。通过Cinder API&#xff0c;块存储可以被管理&#xff08;创建、销毁和分配等&#xff09;&#xff0c;而不需要知道提供存储的底层资源。 本文介绍Cinder安装步骤&#xff0c;Cinder需在控制节…

苹果TF签名全称TestFlight签名,需要怎么做才可以上架呢?

如果你正在开发一个iOS应用并准备进行内测&#xff0c;TestFlight是苹果提供的一个免费的解决方案&#xff0c;它使开发者可以邀请用户参加应用的测试。以下是一步步的指南&#xff0c;教你如何利用TestFlight进行内测以便于应用后续可以顺利上架App Store。 1: 准备工作 在测…

怎么一键批量转换PDF/图片为Excel、Word,从而提高工作效率?

在处理大量PDF、图片文件时&#xff0c;我们往往需要将这些文件转换成Word或Excel格式以方便编辑和统计分析。此时&#xff0c;金鸣表格文字识别大师这款工具可以发挥巨大作用。下面&#xff0c;我们就来探讨如何使用它进行批量转换&#xff0c;以实现高效处理。 一、准备工作…

初识elasticsearch

文章目录 一、前言二、了解ES2.1 elasticsearch的作用2.2 ELK技术栈2.3 elasticsearch和lucene2.4 为什么不是其他搜索技术2.5 总结 三、倒排索引3.1 正向索引3.2 倒排索引3.3 正向和倒排 四、es的一些概念4.1 文档和字段4.2 索引和映射4.3 mysql和elasticsearch 一、前言 前一…

StartRocks 连接 Paimon外部表

版本 StartRocksPaimon3.2.00.5 sr 环境准备 CREATE external CATALOG paimon_hdfs PROPERTIES ("type" "paimon",paimon.catalog.type filesystem,"paimon.catalog.warehouse" "hdfs://hadoop03:9000/paimon/test" );mysql> …