文章目录
- Pre
- Netty Client Code
- Netty 客户端创建流程
- 源码分析
- 入口
- 客户端建立连接
- NioMessageUnsafe#read 处理 OP_ACCEPT
- 客户端发送数据
- NioByteUnsafe#read 处理 OP_READ
- 源码图
Pre
Netty Review - ServerBootstrap源码解析
Netty Review - NioServerSocketChannel源码分析
Netty Review - 服务端channel注册流程源码解析
Netty Client Code
Netty客户端的创建流程通常涉及以下步骤:
-
创建Bootstrap实例:使用Bootstrap类创建一个Netty客户端引导程序实例。Bootstrap负责配置和启动Netty客户端。
-
设置EventLoopGroup:为客户端引导程序指定一个EventLoopGroup。EventLoopGroup是一组处理I/O操作的线程池,通常包含一个用于处理连接的boss线程池和一个用于处理I/O事件的worker线程池。
-
指定Channel类型:通过指定Channel的实现类或提供一个Channel工厂来指定客户端将要使用的Channel类型。不同的Channel类型对应着不同的传输协议,如NIO、Epoll、KQueue等。
-
配置Channel选项:通过调用Bootstrap的option()方法来配置客户端Channel的选项,如TCP连接的参数、Socket参数等。
-
设置Channel处理器:调用Bootstrap的handler()方法设置ChannelPipeline中的ChannelHandler。ChannelHandler用于处理入站和出站事件,比如编解码、数据处理、日志等。
-
连接到服务器:调用Bootstrap的connect()方法连接到服务器。此时,客户端会尝试连接到指定的远程服务器,并返回一个ChannelFuture对象,用于异步等待连接的建立。
-
处理连接结果:通过ChannelFuture对象的addListener()方法添加一个监听器,监听连接操作的结果。一旦连接建立成功或失败,将执行相应的操作。
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class MyClient {public static void main(String[] args) throws Exception{EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new MyMessageEncoder());pipeline.addLast(new MyClientHandler());}});System.out.println("netty client start。。");ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();channelFuture.channel().closeFuture().sync();}finally {group.shutdownGracefully();}}
}
在这个示例中,我们创建了一个NIO的EventLoopGroup,使用NioSocketChannel作为客户端的Channel类型,设置了TCP连接的保持活动选项,并初始化ChannelPipeline。最后,通过connect()方法连接到远程服务器,并启动客户端。
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;public class MyClientHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {for(int i = 0; i< 2; i++) {String msg = "你好,我是artisan";//创建协议包对象MyMessageProtocol messageProtocol = new MyMessageProtocol();messageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length);messageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8));ctx.writeAndFlush(messageProtocol);}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}}
Netty 客户端创建流程
源码分析
入口
客户端建立连接
当客户端连接时,服务器端会监听到一个 OP_ACCEPT
事件。这是由于服务器端的 NIO 通道(通常是 ServerSocketChannel
)在接受客户端连接时,会触发 OP_ACCEPT
事件。这个事件通知服务器端,有一个新的连接已经准备好接受。
在 Netty 中,当服务器端监听到 OP_ACCEPT
事件时,会执行相应的处理逻辑。通常情况下,服务器端会执行以下步骤:
- 获取到服务器端的
Selector
对象。 - 通过
Selector
获取到发生事件的SelectionKey
。 - 从
SelectionKey
中获取到对应的通道(ServerSocketChannel
)。 - 调用
ServerSocketChannel
的accept()
方法,接受客户端的连接,返回一个新的SocketChannel
对象,表示与客户端建立的连接。 - 将新建立的
SocketChannel
注册到Selector
上,并注册OP_READ
事件,以便读取客户端发送的数据。 - 处理客户端连接成功的逻辑,如记录日志、发送欢迎消息等。
这样,服务器端就能够接受客户端的连接,并与之建立通信。
NioMessageUnsafe#read 处理 OP_ACCEPT
这段代码是 Netty 中用于处理读取数据的方法。以下是对代码的中文注释:
@Override
public void read() {assert eventLoop().inEventLoop(); // 断言当前线程处于事件循环中// 获取通道配置和管道final ChannelConfig config = config();final ChannelPipeline pipeline = pipeline();// 获取用于分配接收字节缓冲区的句柄final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config); // 重置句柄状态boolean closed = false; // 标志是否已关闭连接Throwable exception = null; // 异常信息try {try {do {int localRead = doReadMessages(readBuf); // 读取消息到缓冲区if (localRead == 0) { // 如果未读取到任何数据break;}if (localRead < 0) { // 如果读取到的数据小于0,表示连接已关闭closed = true;break;}allocHandle.incMessagesRead(localRead); // 增加读取的消息数量} while (allocHandle.continueReading()); // 循环读取数据,直到不再需要读取或者没有更多的数据} catch (Throwable t) {exception = t; // 记录异常信息}int size = readBuf.size(); // 获取读取到的数据数量for (int i = 0; i < size; i++) { // 遍历读取到的数据readPending = false; // 标记为未挂起状态pipeline.fireChannelRead(readBuf.get(i)); // 将读取到的数据传递给管道的下一个处理器}readBuf.clear(); // 清空读取缓冲区allocHandle.readComplete(); // 表示读取操作完成pipeline.fireChannelReadComplete(); // 通知管道读取操作已完成if (exception != null) { // 如果有异常发生closed = closeOnReadError(exception); // 根据异常情况判断是否需要关闭连接pipeline.fireExceptionCaught(exception); // 将异常信息传递给管道的异常处理器}if (closed) { // 如果连接已关闭inputShutdown = true; // 标记为输入流已关闭if (isOpen()) { // 如果通道仍然打开close(voidPromise()); // 关闭通道}}} finally {// 检查是否有未处理的读取挂起,可能的原因是:// * 用户在 channelRead(...) 方法中调用了 Channel.read() 或 ChannelHandlerContext.read()// * 用户在 channelReadComplete(...) 方法中调用了 Channel.read() 或 ChannelHandlerContext.read()// 详情参考:https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) { // 如果没有挂起的读取,并且未开启自动读取removeReadOp(); // 移除读取操作}}
}
这段代码负责从通道中读取数据,并将读取到的数据传递给管道中的下一个处理器。在读取数据的过程中,会处理可能发生的异常,并根据需要关闭连接。同时,还会处理是否需要继续读取数据,以及是否需要移除读取操作。
doReadMessages(readBuf)
io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
@Override
protected int doReadMessages(List<Object> buf) throws Exception {// 从底层 SocketChannel 接受新连接SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) { // 如果成功接受到新连接// 创建一个新的 NioSocketChannel 实例,并添加到List中buf.add(new NioSocketChannel(this, ch));return 1; // 返回成功接受到一个连接}} catch (Throwable t) {// 处理异常logger.warn("Failed to create a new channel from an accepted socket.", t);try {// 尝试关闭 SocketChannelch.close();} catch (Throwable t2) {// 处理关闭异常logger.warn("Failed to close a socket.", t2);}}return 0; // 返回没有接受到新连接
}
new NioSocketChannel(this, ch)
public NioSocketChannel(Channel parent, SocketChannel socket) {super(parent, socket);config = new NioSocketChannelConfig(this, socket.socket());}
接下来这段代码是 AbstractNioByteChannel
的构造函数,它调用了父类 AbstractNioChannel
的构造函数,并指定了感兴趣的事件为 SelectionKey.OP_READ
,表示该通道对读取事件感兴趣。让我们逐行解释:
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {// 调用父类 AbstractNioChannel 的构造函数super(parent, ch, SelectionKey.OP_READ);
}
这个构造函数的作用是初始化 AbstractNioByteChannel
,它接受一个 Channel
父类和一个 java.nio.channels.SelectableChannel
对象作为参数。在构造函数中,通过调用父类的构造函数,将 SelectableChannel
注册到了父类的 selector
中,并指定了对读取事件感兴趣。
剩下的逻辑如下
pipeline.fireChannelRead(readBuf.get(i))
客户端发送数据
当客户端向服务器端发送数据时,会触发 OP_READ
事件。这是由于服务器端的 NIO 通道在接收到客户端发送的数据时,会触发 OP_READ
事件。这个事件通知服务器端,有数据可读取。
在 Netty 中,当服务器端监听到 OP_READ
事件时,会执行相应的处理逻辑。通常情况下,服务器端会执行以下步骤:
- 获取到服务器端的
Selector
对象。 - 通过
Selector
获取到发生事件的SelectionKey
。 - 从
SelectionKey
中获取到对应的通道(SocketChannel
)。 - 从
SocketChannel
中读取数据,并将数据存储到缓冲区中。 - 处理从客户端接收到的数据,执行相应的业务逻辑,如解析请求、处理消息等。
- 如有必要,向客户端发送响应消息。
这样,服务器端就能够接收客户端发送的数据,并根据业务逻辑进行处理。
NioByteUnsafe#read 处理 OP_READ
这段代码是 Netty 中用于读取数据的方法。让我们逐行解释:
@Override
public final void read() {final ChannelConfig config = config(); // 获取通道配置if (shouldBreakReadReady(config)) { // 检查是否应该中断读取就绪状态clearReadPending(); // 清除读取挂起标志return;}final ChannelPipeline pipeline = pipeline(); // 获取通道管道final ByteBufAllocator allocator = config.getAllocator(); // 获取字节缓冲区分配器final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); // 获取接收缓冲区分配器的句柄allocHandle.reset(config); // 重置分配器ByteBuf byteBuf = null; // 字节缓冲区对象boolean close = false; // 是否关闭通道try {do {byteBuf = allocHandle.allocate(allocator); // 分配字节缓冲区allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 读取字节到缓冲区if (allocHandle.lastBytesRead() <= 0) { // 检查是否有字节被读取// 如果没有读取到任何字节,释放缓冲区byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0; // 检查是否需要关闭通道if (close) { // 如果需要关闭通道// 因为收到了 EOF,所以没有剩余可读取的数据。readPending = false; // 标记读取完成}break; // 跳出循环}allocHandle.incMessagesRead(1); // 增加已读消息数readPending = false; // 标记读取完成pipeline.fireChannelRead(byteBuf); // 将读取到的字节缓冲区传递给通道的处理器链byteBuf = null;} while (allocHandle.continueReading()); // 继续读取直到满足条件allocHandle.readComplete(); // 读取完成pipeline.fireChannelReadComplete(); // 通知通道读取完成if (close) { // 如果需要关闭通道closeOnRead(pipeline); // 关闭通道}} catch (Throwable t) {// 处理读取过程中的异常handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {// 检查是否有未处理的读取挂起if (!readPending && !config.isAutoRead()) {removeReadOp(); // 移除读取操作}}
}
从通道中读取数据,并将读取到的数据传递给通道的处理器链进行处理。在读取过程中可能会出现异常,需要进行相应的处理。最后,根据读取的结果来判断是否需要关闭通道。
里面的主要逻辑如下
源码图
图都给你画好了,戳这里