Netty Review - 客户端流程源码解析

文章目录

  • Pre
  • Netty Client Code
  • Netty 客户端创建流程
  • 源码分析
    • 入口
    • 客户端建立连接
      • NioMessageUnsafe#read 处理 OP_ACCEPT
    • 客户端发送数据
      • NioByteUnsafe#read 处理 OP_READ
  • 源码图

在这里插入图片描述

在这里插入图片描述


Pre

Netty Review - ServerBootstrap源码解析

Netty Review - NioServerSocketChannel源码分析

Netty Review - 服务端channel注册流程源码解析


Netty Client Code

Netty客户端的创建流程通常涉及以下步骤:

  1. 创建Bootstrap实例:使用Bootstrap类创建一个Netty客户端引导程序实例。Bootstrap负责配置和启动Netty客户端。

  2. 设置EventLoopGroup:为客户端引导程序指定一个EventLoopGroup。EventLoopGroup是一组处理I/O操作的线程池,通常包含一个用于处理连接的boss线程池和一个用于处理I/O事件的worker线程池。

  3. 指定Channel类型:通过指定Channel的实现类或提供一个Channel工厂来指定客户端将要使用的Channel类型。不同的Channel类型对应着不同的传输协议,如NIO、Epoll、KQueue等。

  4. 配置Channel选项:通过调用Bootstrap的option()方法来配置客户端Channel的选项,如TCP连接的参数、Socket参数等。

  5. 设置Channel处理器:调用Bootstrap的handler()方法设置ChannelPipeline中的ChannelHandler。ChannelHandler用于处理入站和出站事件,比如编解码、数据处理、日志等。

  6. 连接到服务器:调用Bootstrap的connect()方法连接到服务器。此时,客户端会尝试连接到指定的远程服务器,并返回一个ChannelFuture对象,用于异步等待连接的建立。

  7. 处理连接结果:通过ChannelFuture对象的addListener()方法添加一个监听器,监听连接操作的结果。一旦连接建立成功或失败,将执行相应的操作。


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class MyClient {public static void main(String[] args)  throws  Exception{EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new MyMessageEncoder());pipeline.addLast(new MyClientHandler());}});System.out.println("netty client start。。");ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();channelFuture.channel().closeFuture().sync();}finally {group.shutdownGracefully();}}
}

在这个示例中,我们创建了一个NIO的EventLoopGroup,使用NioSocketChannel作为客户端的Channel类型,设置了TCP连接的保持活动选项,并初始化ChannelPipeline。最后,通过connect()方法连接到远程服务器,并启动客户端。


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;public class MyClientHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {for(int i = 0; i< 2; i++) {String msg = "你好,我是artisan";//创建协议包对象MyMessageProtocol messageProtocol = new MyMessageProtocol();messageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length);messageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8));ctx.writeAndFlush(messageProtocol);}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}}

Netty 客户端创建流程

在这里插入图片描述


源码分析

入口

在这里插入图片描述


客户端建立连接

当客户端连接时,服务器端会监听到一个 OP_ACCEPT 事件。这是由于服务器端的 NIO 通道(通常是 ServerSocketChannel)在接受客户端连接时,会触发 OP_ACCEPT 事件。这个事件通知服务器端,有一个新的连接已经准备好接受。

在 Netty 中,当服务器端监听到 OP_ACCEPT 事件时,会执行相应的处理逻辑。通常情况下,服务器端会执行以下步骤:

  1. 获取到服务器端的 Selector 对象。
  2. 通过 Selector 获取到发生事件的 SelectionKey
  3. SelectionKey 中获取到对应的通道(ServerSocketChannel)。
  4. 调用 ServerSocketChannelaccept() 方法,接受客户端的连接,返回一个新的 SocketChannel 对象,表示与客户端建立的连接。
  5. 将新建立的 SocketChannel 注册到 Selector 上,并注册 OP_READ 事件,以便读取客户端发送的数据。
  6. 处理客户端连接成功的逻辑,如记录日志、发送欢迎消息等。

这样,服务器端就能够接受客户端的连接,并与之建立通信。


NioMessageUnsafe#read 处理 OP_ACCEPT

这段代码是 Netty 中用于处理读取数据的方法。以下是对代码的中文注释:

@Override
public void read() {assert eventLoop().inEventLoop(); // 断言当前线程处于事件循环中// 获取通道配置和管道final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();// 获取用于分配接收字节缓冲区的句柄final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config); // 重置句柄状态boolean closed = false; // 标志是否已关闭连接Throwable exception = null; // 异常信息try {try {do {int localRead = doReadMessages(readBuf); // 读取消息到缓冲区if (localRead == 0) { // 如果未读取到任何数据break;}if (localRead < 0) { // 如果读取到的数据小于0,表示连接已关闭closed = true;break;}allocHandle.incMessagesRead(localRead); // 增加读取的消息数量} while (allocHandle.continueReading()); // 循环读取数据,直到不再需要读取或者没有更多的数据} catch (Throwable t) {exception = t; // 记录异常信息}int size = readBuf.size(); // 获取读取到的数据数量for (int i = 0; i < size; i++) { // 遍历读取到的数据readPending = false; // 标记为未挂起状态pipeline.fireChannelRead(readBuf.get(i)); // 将读取到的数据传递给管道的下一个处理器}readBuf.clear(); // 清空读取缓冲区allocHandle.readComplete(); // 表示读取操作完成pipeline.fireChannelReadComplete(); // 通知管道读取操作已完成if (exception != null) { // 如果有异常发生closed = closeOnReadError(exception); // 根据异常情况判断是否需要关闭连接pipeline.fireExceptionCaught(exception); // 将异常信息传递给管道的异常处理器}if (closed) { // 如果连接已关闭inputShutdown = true; // 标记为输入流已关闭if (isOpen()) { // 如果通道仍然打开close(voidPromise()); // 关闭通道}}} finally {// 检查是否有未处理的读取挂起,可能的原因是:// * 用户在 channelRead(...) 方法中调用了 Channel.read() 或 ChannelHandlerContext.read()// * 用户在 channelReadComplete(...) 方法中调用了 Channel.read() 或 ChannelHandlerContext.read()// 详情参考:https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) { // 如果没有挂起的读取,并且未开启自动读取removeReadOp(); // 移除读取操作}}
}

这段代码负责从通道中读取数据,并将读取到的数据传递给管道中的下一个处理器。在读取数据的过程中,会处理可能发生的异常,并根据需要关闭连接。同时,还会处理是否需要继续读取数据,以及是否需要移除读取操作。


doReadMessages(readBuf)

io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages

@Override
protected int doReadMessages(List<Object> buf) throws Exception {// 从底层 SocketChannel 接受新连接SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) { // 如果成功接受到新连接// 创建一个新的 NioSocketChannel 实例,并添加到List中buf.add(new NioSocketChannel(this, ch));return 1; // 返回成功接受到一个连接}} catch (Throwable t) {// 处理异常logger.warn("Failed to create a new channel from an accepted socket.", t);try {// 尝试关闭 SocketChannelch.close();} catch (Throwable t2) {// 处理关闭异常logger.warn("Failed to close a socket.", t2);}}return 0; // 返回没有接受到新连接
}

new NioSocketChannel(this, ch)

 public NioSocketChannel(Channel parent, SocketChannel socket) {super(parent, socket);config = new NioSocketChannelConfig(this, socket.socket());}

接下来这段代码是 AbstractNioByteChannel 的构造函数,它调用了父类 AbstractNioChannel 的构造函数,并指定了感兴趣的事件为 SelectionKey.OP_READ,表示该通道对读取事件感兴趣。让我们逐行解释:

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {// 调用父类 AbstractNioChannel 的构造函数super(parent, ch, SelectionKey.OP_READ);
}

这个构造函数的作用是初始化 AbstractNioByteChannel,它接受一个 Channel 父类和一个 java.nio.channels.SelectableChannel 对象作为参数。在构造函数中,通过调用父类的构造函数,将 SelectableChannel 注册到了父类的 selector 中,并指定了对读取事件感兴趣。

剩下的逻辑如下

在这里插入图片描述

pipeline.fireChannelRead(readBuf.get(i))
在这里插入图片描述


客户端发送数据

当客户端向服务器端发送数据时,会触发 OP_READ 事件。这是由于服务器端的 NIO 通道在接收到客户端发送的数据时,会触发 OP_READ 事件。这个事件通知服务器端,有数据可读取。

在 Netty 中,当服务器端监听到 OP_READ 事件时,会执行相应的处理逻辑。通常情况下,服务器端会执行以下步骤:

  1. 获取到服务器端的 Selector 对象。
  2. 通过 Selector 获取到发生事件的 SelectionKey
  3. SelectionKey 中获取到对应的通道(SocketChannel)。
  4. SocketChannel 中读取数据,并将数据存储到缓冲区中。
  5. 处理从客户端接收到的数据,执行相应的业务逻辑,如解析请求、处理消息等。
  6. 如有必要,向客户端发送响应消息。

这样,服务器端就能够接收客户端发送的数据,并根据业务逻辑进行处理。

NioByteUnsafe#read 处理 OP_READ

这段代码是 Netty 中用于读取数据的方法。让我们逐行解释:

@Override
public final void read() {final ChannelConfig config = config(); // 获取通道配置if (shouldBreakReadReady(config)) { // 检查是否应该中断读取就绪状态clearReadPending(); // 清除读取挂起标志return;}final ChannelPipeline pipeline = pipeline(); // 获取通道管道final ByteBufAllocator allocator = config.getAllocator(); // 获取字节缓冲区分配器final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); // 获取接收缓冲区分配器的句柄allocHandle.reset(config); // 重置分配器ByteBuf byteBuf = null; // 字节缓冲区对象boolean close = false; // 是否关闭通道try {do {byteBuf = allocHandle.allocate(allocator); // 分配字节缓冲区allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 读取字节到缓冲区if (allocHandle.lastBytesRead() <= 0) { // 检查是否有字节被读取// 如果没有读取到任何字节,释放缓冲区byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0; // 检查是否需要关闭通道if (close) { // 如果需要关闭通道// 因为收到了 EOF,所以没有剩余可读取的数据。readPending = false; // 标记读取完成}break; // 跳出循环}allocHandle.incMessagesRead(1); // 增加已读消息数readPending = false; // 标记读取完成pipeline.fireChannelRead(byteBuf); // 将读取到的字节缓冲区传递给通道的处理器链byteBuf = null;} while (allocHandle.continueReading()); // 继续读取直到满足条件allocHandle.readComplete(); // 读取完成pipeline.fireChannelReadComplete(); // 通知通道读取完成if (close) { // 如果需要关闭通道closeOnRead(pipeline); // 关闭通道}} catch (Throwable t) {// 处理读取过程中的异常handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {// 检查是否有未处理的读取挂起if (!readPending && !config.isAutoRead()) {removeReadOp(); // 移除读取操作}}
}

从通道中读取数据,并将读取到的数据传递给通道的处理器链进行处理。在读取过程中可能会出现异常,需要进行相应的处理。最后,根据读取的结果来判断是否需要关闭通道。

里面的主要逻辑如下

在这里插入图片描述


源码图

在这里插入图片描述
图都给你画好了,戳这里

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/474422.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

由于找不到MSVCP140.dll无法运行软件游戏,多种解决方法分享

电脑系统在运行过程中&#xff0c;当出现“由于找不到MSVCP140.dll”这一提示时&#xff0c;可能会引发一系列潜在的问题与影响。当电脑无法找到这个特定的dll文件时&#xff0c;意味着相关应用可能无法顺利加载并执行必要的组件&#xff0c;进而导致程序无法启动或运行过程中频…

Python中超超超高颜值的库,我刚发现的...

在Python中&#xff0c;有一个名为rich的宝藏包&#xff0c;它能够将你的终端输出变成一场视觉盛宴。rich是一个用于在终端中呈现富文本&#xff08;包括颜色、样式、表格、进度条等&#xff09;的Python库&#xff0c;它可以使你的命令行界面变得生动而富有表现力。 如何安装 …

上位机图像处理和嵌入式模块部署(Halcon借鉴与客户学习)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 对于很多学院派的同学来说&#xff0c;他们对市场的感觉一般是比较弱的。如果写一个软件的话&#xff0c;或者说开发一个项目的话&#xff0c;他们…

基于SFLA算法的神经网络优化matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 4.1 SFLA的基本原理 4.2 神经网络优化 5.完整程序 1.程序功能描述 基于SFLA算法的神经网络优化。通过混合蛙跳算法&#xff0c;对神经网络的训练进行优化&#xff0c;优化目标位神经网络的…

Maven配置

目录 非Maven项目的缺点MavenMaven的仓库Maven的资源坐标Maven的下载安装Maven常用配置本地仓库镜像仓库配置JDK 非Maven项目的缺点 问题一&#xff1a; 项目中的jar包资源需要自己在网上下载&#xff0c;手动导入&#xff0c;不好管理。问题二&#xff1a; jar包版本控制麻烦…

代码随想录刷题笔记 DAY 28 | 复原 IP 地址 No.93 | 子集 No.78 | 子集 II No.90

文章目录 Day 2801. 复原 IP 地址&#xff08;No. 93&#xff09;1.1 题目1.2 笔记1.3 代码 02. 子集&#xff08;No. 78&#xff09;2.1 题目2.2 笔记2.3 代码 03. 子集 II&#xff08;No. 90&#xff09;3.1 题目3.2 笔记3.3 代码 Day 28 01. 复原 IP 地址&#xff08;No. 9…

RK3399平台开发系列讲解(USB篇)USB 主设备和从设备

&#x1f680;返回专栏总目录 文章目录 一、主设备二、集线器三、功能设备 沉淀、分享、成长&#xff0c;让自己和他人都能有所收获&#xff01;&#x1f604; &#x1f4e2;介绍 USB 主设备和从设备。 一、主设备 检测 USB 设备的插拔动作管理主从通讯之间的控制流管理主从通…

防火墙 iptables(二)--------------SNAT与DNAT

一、SNAT ①SNAT 应用环境: 局域网主机共享单个公网IP地址接入Internet (私有IP不能在Internet中正常路由) ②SNAT原理: 源地址转换&#xff0c;根据指定条件修改数据包的源IP地址&#xff0c;通常被叫做源映射 数据包从内网发送到公网时&#xff0c;SNAT会把数据包的源IP由…

用Python和OpenCV搭建自己的一维码和QRCode扫描仪(步骤 + 源码)

导 读 本文主要介绍使用Python和OpenCV搭建自己的一维码和QRCode扫描仪&#xff08;步骤 源码&#xff09;。 项目简介 本文我们将创建一个程序来扫描图像中的二维码和条形码。对于这个程序&#xff0c;我们需要三个包&#xff0c;分别是OpenCV、NumPy和pyzbar。大多数 Pyth…

Eclipse - Format Comment

Eclipse - Format & Comment 1. Correct Indentation2. Format3. Toggle Comment4. Add Block Comment5. Remove Block CommentReferences 1. Correct Indentation Ctrl A: 选择全部代码 Ctrl I: 校正缩进 or right-click -> Source -> Correct Indentation 2. F…

2024年!PyCharm快捷键大全

收藏&#xff01;PyCharm快捷键大全 工欲善其事必先利其器&#xff0c;PyCharm 是最popular的Python开发工具&#xff0c;它提供的功能非常强大&#xff0c;是构建大型项目的理想工具之一&#xff0c;如果能挖掘出里面实用技巧&#xff0c;能带来事半功倍的效果。 本文主要向大…

实例讲解join方法的使用

Python的join()方法用于将序列中的元素以指定的字符连接生成一个新的字符串 语法 str.join(sequence) 参数 sequence 要连接的元素序列、字符串、元组、字典 返回值 返回通过指定字符连接序列中的元素后生成的新的字符串 实例 str "-"; seq ("a"…