Netty源码系列 之 ChannelPipeline IO处理回顾 源码

目录

ChannelPipeline【包含AbstractUnsafe.write的源码流程,比之前更加深化了,必看】

ChannelPipeline概念回顾

ChannelPipeline的创建

Inbound(输入Handler)所对应的事件传播

Outbound(输出Handler)所对应的事件传播【包含AbstractUnsafe.write的源码流程,比之前更加深化了,必看】


ChannelPipeline【包含AbstractUnsafe.write的源码流程,比之前更加深化了,必看】

在细致剖析ChannelPipeline之前,我们拿pipeline管道add的一个最常见的Handler:帧解码器类LineBasedFrameDecoder,来进行debug源码展示其流程,进而为后续清晰描绘ChannelPipeline做铺垫

所有Handler,无论是是自定义的Handler还是netty原生的Handler,都是通过ChannelPipeline.addLast进行添加的,LineBasedFrameDecoder也不例外。

pipeline.addLast(new LineBasedFrameDecoder(1024));

以如下案例进行debug源码演示其流程

  • 以下测试案例以及源码过程演示的是消息数据出现粘包现象时,帧解码器LineBasedFrameDecoder的处理
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;public class MyNettyClient1 {private static final Logger log = LoggerFactory.getLogger(MyNettyClient1.class);public static void main(String[] args) throws InterruptedException {log.debug("myNettyClientStarter------");EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);Bootstrap group = bootstrap.group(eventLoopGroup);//32 ---> 1 IO操作 31线程bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new StringEncoder());}});Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();//半包?粘包?  1  0//粘包 --->ByteBuf(1024) ---> socket 65535 --- serverchannel.writeAndFlush("sunshuai\nxiaohei\nxiaojr\n");
//        channel.writeAndFlush("sunshuaixiaohei666\n");}
}
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyServer1 {private static final Logger log = LoggerFactory.getLogger(MyNettyServer1.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(new NioEventLoopGroup());//接受socket缓冲区大小 等同于 滑动窗口的初始值 65535//serverBootstrap.option(ChannelOption.SO_RCVBUF, 100);//netty创建ByteBuf时 执行大小 默认1024 child ScoketChannel相关
//        serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16));serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Override//protected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//sunshuai\ni love you\n//xxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n//最大长度 指的是 如果超过最大长度,还没有发现分隔符,不处理。pipeline.addLast(new LineBasedFrameDecoder(1024));pipeline.addLast(new LoggingHandler());}});//serverBootstrap.bind(8000);}
}
  • debug源码流程 【之前总结过的源码流程一笔带过】

1.先debug启动服务端,完成服务端初始化操作后,会在NioEventLoop中Selector阻塞等待客户端连接,IO事件的触发

2.以运行方式进行启动客户端

3.服务端停止阻塞,开始处理Accept事件

虽然本次调用的也是read方法,但是触发的是Accept连接事件,客户端连接上来,服务端先处理连接,然后开启一个新NioEventLoop线程去完成客户端的注册和后续该SocketChannel所对应的IO事件的处理。

4.

5.客户端注册register的逻辑和服务端注册的逻辑是一致的

6.初始化工作

最终会回调到initChannel方法,完成自定义Handler的add

7.Accept连接事件处理完后,服务端接下来进行处理当前NioSocketChannel所对应的IO事件

服务端触发read事件,读取客户端发送过来的数据

前面的逻辑之前分析过,这里重点关注fireChannelRead中产生的逻辑:帧解码器Handler类的作用操作

回调帧解码器LineBasedFrameDecoder这一Handler的channelRead方法

ByteToMessageDecoder类:

callDecode方法:

一定注意一点:每一次通过帧解码器类进行解码消息,无论消息多长多短,一次只能解码出一条完整的消息,啥叫完整的消息?每遇见一个分隔符被称之为一条完整的消息数据

解码出一条完整的消息后:

直接看最后一条消息数据的处理

  • 修改一下测试案例,测试一下半包情况下,帧解码器的处理情况
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyServer1 {private static final Logger log = LoggerFactory.getLogger(MyNettyServer1.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(new NioEventLoopGroup());//接受socket缓冲区大小 等同于 滑动窗口的初始值 65535//serverBootstrap.option(ChannelOption.SO_RCVBUF, 100);//netty创建ByteBuf时 执行大小 默认1024 child ScoketChannel相关serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16));serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Override//protected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//sunshuai\ni love you\n//xxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n//最大长度 指的是 如果超过最大长度,还没有发现分隔符,不处理。pipeline.addLast(new LineBasedFrameDecoder(1024));pipeline.addLast(new LoggingHandler());}});//serverBootstrap.bind(8000);}
}
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;public class MyNettyClient1 {private static final Logger log = LoggerFactory.getLogger(MyNettyClient1.class);public static void main(String[] args) throws InterruptedException {log.debug("myNettyClientStarter------");EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);Bootstrap group = bootstrap.group(eventLoopGroup);//32 ---> 1 IO操作 31线程bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new StringEncoder());}});Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();//半包?粘包?  1  0//粘包 --->ByteBuf(1024) ---> socket 65535 --- server
//        channel.writeAndFlush("sunshuai\nxiaohei\nxiaojr\n");channel.writeAndFlush("sunshuaixiaohei666\n");}
}

改动之处:

服务端:

客户端:

  • debug测试

以同样的方式启动服务端与客户端

相同的逻辑不再记录:

return null:

由于在第一轮decode后没有找到分隔符,所以out中没有数据

为什么?因为我们设置ByteBuf的最大值 初始值 最小值都为16,当read读取的数据长度一次最多为16,并且在读取到的ByteBuf中,数据没有发现分隔符,所以out输出集合中就没有add。

由于客户端写过来的消息数据还没有read完,所以会再一次让NioEventLoop线程进行处理read这一IO

这一次才找到分隔符,这样才可以返回一条完整的消息数据,并且在out集合中也会add这一条完整的消息数据

ChannelPipeline概念回顾

回顾ChannelPipeline相关的概念:

1.每一个客户端(SocketChannel | 线程)都独立享有一套pipeline管道,这样以空间换安全的方式,避免了多线程共享临界区资源导致并发安全问题。

2.Pipeline管道维护的是一组addLast进来的Handler,Pipeline的结构为双向链表。

3.Pipeline所维护的Handler的类型有哪些?ChannelInbound(输入[读]类型),ChannelOutbound(输出[写]类型) 。注释:head,tail这两个Handler是Pipeline自带的Handler。

4.channel.writeAndFlush() :从tail尾部这一Handler往前找,一直找到第一个ChannelOutBoundHandler 进行写输出处理

ctx.writeAndFlush():从当前Handler往前找,一直找到第一个ChannelOutBoundHandler 进行写输出处理

5.每一个ChannelHandler(无论输入还是输出,只要是存在于Pipeline管道中的),这些Handler都存在于ChannelContext中。ChannelContext提供了ByteBuf的内存分配器,Handler事件传播功能等。

ChannelPipeline的创建

1.创建NioServerSocketChannel或NioSocketChannel时

2.

3.初始化ChannelPipeline管道,Pipeline管道中默认带有head,tail这两个内置的Handler。

  • 除了每一个ChannelPipeline默认自带的Handler:head,tail之外,我还可以手工Pipeline.addLast(xxxHandler)添加自定义的Handler
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;public class MyNettyClient1 {private static final Logger log = LoggerFactory.getLogger(MyNettyClient1.class);public static void main(String[] args) throws InterruptedException {log.debug("myNettyClientStarter------");EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);Bootstrap group = bootstrap.group(eventLoopGroup);//32 ---> 1 IO操作 31线程bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new StringEncoder());}});ByteBuf byteBuf = ByteBufAllocator.DEFAULT.directBuffer(10);Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();//半包?粘包?  1  0//粘包 --->ByteBuf(1024) ---> socket 65535 --- server
//        channel.writeAndFlush("sunshuai\nxiaohei\nxiaojr\n");channel.writeAndFlush("sunshuaixiaohei666sunshuaixiaohei666\n");}
}
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyServer1 {private static final Logger log = LoggerFactory.getLogger(MyNettyServer1.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(new NioEventLoopGroup());DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();//接受socket缓冲区大小 等同于 滑动窗口的初始值 65535//serverBootstrap.option(ChannelOption.SO_RCVBUF, 100);//netty创建ByteBuf时 执行大小 默认1024 child ScoketChannel相关serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,32,1024));serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Override//protected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//sunshuai\ni love you\n//xxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n//最大长度 指的是 如果超过最大长度,还没有发现分隔符,不处理。pipeline.addLast(defaultEventLoopGroup,"lineBased",new LineBasedFrameDecoder(1024));pipeline.addLast(new LoggingHandler());}});//serverBootstrap.bind(8000);}
}

debug追踪一下源码:【debug方式启动Server服务端,完成bind后再以运行的方式启动Client客户端】

1.pipeline.addLast添加自定义的Handler处理器类

(1)defaultEventLoopGroup:该参数传递的是额外创建的线程组。如果不传递该线程组呢?如果不传递该线程组,那么该Handler是由处理IO事件的线程去处理。但是假设说该自定义的Handler处理的逻辑过长,导致线程资源占用时间过久,那么是不是处理IO的线程资源就紧张了?对吧。所以我们额外补充了一组线程:defaultEventLoopGroup来用作该自定义Handler的逻辑处理。但是一定要显示配置呀。

defaultEventLoopGroup线程组和普通的Thread线程是一样的,只不过使用DefaultEventLoopGroup更加完美的和Netty体系相融合,共用同一份代码。

(2)lineBased:显示指定的Handler的名字。如果不指定,默认情况下Handler的名字为"类名#0"。比如这个Handler的name为:LineBasedFrameDecoder#0

(3)new LineBasedFrameDecoder(xxx):该参数传递的就是要添加到ChannelPipeline的Handler类啦

2.在客户端连接上服务端后,服务端会分配给客户端一个对应的NioSocketChannel,在NioSocketChannel初始化阶段会完成ChannelPipeline的创建和对应Handler的addLast添加【细致过程见之前的总结】

3.

4.checkMultiplicity方法:

当客户端连接数变多后,如果同一个Handler是非@Sharable 且 之前其他线程addLast添加过,那么直接抛出异常。避免了线程并发安全问题。

5.

Handler内部其实封装的有Context,Context真正的去封装一系列的参数:

filterName方法:

generateName方法:

构造名字name的时,啥时候容易会出现名字重复呢?

匿名内部类的情况很容易造成名字重复,因为匿名内部类都是一样的,为ChannelInbound HandlerAdapter,如果添加多个,会造成生成的名字重复,如果造成名字重复怎么办?

下面是解决方案:

如果说检测到name名字重复,假设重复的名字为ABCServer#0,那么重新生成的名字为ABCServer#1,但是会循环判断重新生成的名字是否还是重复的,直到最终context0(newName)==null退出循环为止。

context0方法:

6.addLast0方法:

7.callHandlerAddedInEventLoop(newCtx, executor)方法:

由于该Handler的线程组执行器使用的是自定义创建的DefaultEventLoopGroup,所以可能会之前IO事件处理线程组NioEventLoopGroup处理逻辑代码不太一样。其实DefaultEventLoopGroup更像是一种简化,因为Handler的逻辑处理视作是一种普通任务task的run执行,所以只把这部分的逻辑从NioEventLoop中抽离出来即可。

关键是这一个task的处理:

这不就和之前的逻辑重合到一起啦:

Inbound(输入Handler)所对应的事件传播

Inbound事件都对应有哪些?

其实说是事件,实际上各个事件方法也就是Handler的生命周期回调方法。当Handler执行到生命周期的某个步骤后,Handler就可以回调执行某一个事件方法。比如说:当创建完Channel管道后(完成生命周期中创建管道的阶段),那么就会回调ChannelActive这一方法。当Channel有输入流入时(完成生命周期中读入数据的阶段),就会回调ChannelRead这一方法。

图中所有的Handler:head,h1,h2,tail 都拥有输入Handler所对应的Inboud事件方法

head和tail所对应的Handler(Context)继承Inbound和Outbound,所以这俩具有输入,输出所对应的所有事件方法。

Inbound输入事件是通过什么api在多个Handler之间进行传播的?

ChannelPipeline通过一个双向链表的数据结构把多个Handler进行链接维护起来,但是msg数据,事件是如何在多个Handler之间传递的呢?是通过两类的API:1.ctx.fireChannelxxx(); 2.super.fireChannelxxx()

如果要传递channelActive事件,那么调用的方法就是:

1.ctx.fireChannelActive(); 2.super.fireChannelActive()

以channelRead为例:

channelRead事件的源头是什么?

NioByteUnsafe类的read方法--->pipeline.fireChannelRead()

debug流程如下:

1.

2.第一个Handler是HeadContext

3.

4.

5.

6.

从Head头handler开始往尾部方向去找,找到第一个发现的输入Inbound-Handler

本次找到的下一个输入handler为ServerBootstrapAcceptor#0

7.执行ServerBootstrapAcceptor#0这一输入handler,同理前面的流程即可。

8.

9.

注册完后,就会开始真正的读数据通信。过程很复杂,之前总结过,这里不再总结。参考之前的总结笔记去debug吧。

读数据通信时,首先是到HeadHandler#0,然后会把数据传递给LineBaseFrameDecoder这一封帧解码器去解码数据,并且解决半包粘包的问题。

下面简单记录一下:

首先会到HeadHandler#0:

接着会传递给LineBaseFrameDecoder所对应的Handler:

找到LineBaseFrameDecoder所对应的Handler去执行

【又回到解码器的流程啦,熟悉吧,之前总结过】

这个过程:在建立C-S连接,完成NioSocketChannel管道的注册后,开始read读通信操作,首先输入的数据会经过HeadHandler#0,其次读入的数据会传递给LineBasedFrameDecoder这一我们配置的解码Handler,无论是哪一个Handler都是回调其Handler重写的channelRead方法。在LineBasedFrameDecoder对应的Handler中,我们完成数据的封帧操作,解决半包粘包问题,并且完成解码操作。

封帧完毕后,把每一条完整的消息数据再一次通过fireChannelRead传递给下一个Handler:

会找到LoggingHandler#0完成对完整消息数据的控制台输出:

控制台输出:

onUnhandledInboundMessage方法:主要是完成ByteBuf资源的释放和循环再利用。如果不及时释放,那么内存溢出不是梦哈。

以下是释放ByteBuf内存的底层方法:

等所有Handler执行完毕后,NioEventLoop会再一次陷入阻塞等待:

Outbound(输出Handler)所对应的事件传播【包含AbstractUnsafe.write的源码流程,比之前更加深化了,必看】

Outbound事件都对应有哪些?

Outbound事件的传播的源头:

1.channel.writeAndFlush():

从tail这一Handler从后往前去找,直到找到第一个出现的Outbound-Handler

2.ctx.writeAndFlush():

从当前触发该操作的Handler往前寻找,直到找到第一个出现的Outbound-Handler

  • 测试案例
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyServer1 {private static final Logger log = LoggerFactory.getLogger(MyNettyServer1.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(new NioEventLoopGroup());DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();//接受socket缓冲区大小 等同于 滑动窗口的初始值 65535//serverBootstrap.option(ChannelOption.SO_RCVBUF, 100);//netty创建ByteBuf时 执行大小 默认1024 child ScoketChannel相关serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,32,1024));serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Override//protected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//sunshuai\ni love you\n//xxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n//最大长度 指的是 如果超过最大长度,还没有发现分隔符,不处理。pipeline.addLast(defaultEventLoopGroup,"lineBased",new LineBasedFrameDecoder(1024));pipeline.addLast(new LoggingHandler());}});//serverBootstrap.bind(8000);}
}
package com.messi.netty_source_03.Test06;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;public class MyNettyClient1 {private static final Logger log = LoggerFactory.getLogger(MyNettyClient1.class);public static void main(String[] args) throws InterruptedException {log.debug("myNettyClientStarter------");EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);Bootstrap group = bootstrap.group(eventLoopGroup);//32 ---> 1 IO操作 31线程bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {if(ctx.channel().isWritable()) {ctx.writeAndFlush("sunshuaixiaohei666sunshuaixiaohei666\n");}}});}});Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();//半包?粘包?  1  0//粘包 --->ByteBuf(1024) ---> socket 65535 --- server
//        channel.writeAndFlush("sunshuai\nxiaohei\nxiaojr\n");
//        channel.writeAndFlush("sunshuaixiaohei666sunshuaixiaohei666\n");}
}
  • 测试

1.

2.省略很多步骤

3.从write写的源头开始debug追踪

4.

5.

findContextOutbound方法:

从当前触发该操作的Handler往前寻找,直到找到第一个出现的Outbound-Handler。当前找到的handler为StringEncoder,该handler是对写出的数据进行编码操作,编码转换成ByteBuf格式。实际上编码这个过程就是把消息数据写到应用层缓冲区ByteBuf(实际上封装的是ByteBuffer,最终底层还是写到ByteBuffer中)

6.

7.

这一过程称之为编码Encoder:

8.继续向后依次唤醒pipeline双向链表的每一个Handler

9.唤醒LoggingHandler#0

10.

这一次向前找到HeadContext(Handler)

11.

12.当HeadContext执行的write操作,才是真正的unsafe.write,直接把ByteBuf中存储封装的消息数据写出到OutboundBuffer,OutboundBuffer是由一个双向链表结构,链表的每一个元素为Entry类型,msg消息数据封装到该Entry中,Entry还会封装一些其他的元数据信息。此时数据状态为unflush

filterOutboundMessage方法:

Entry.newInstance方法:

Entry是ChannelOutboundBuffer的一个内部类,为什么设置成一个内部类?因为Entry只在该类中使用,所以创建成一个内部类。Entry中封装的有ByteBuf类型的msg消息数据。还有一系列关于消息数据的元数据信息

incrementPendingOutboundBytes方法:

如果累计写出的消息数据大小超过了高水位线,那么设置为Unwritable不可写状态。

12.flush操作:

HeadContext的flush操作才是真正的把应用层缓冲区ChannelOutboundBuffer的数据写出到socket-send缓冲区。

以下过程同理write,都是会一个个迭代遍历所有Handler

直到最终找到Head-Handler,HeadHandler(Context)会把应用层缓冲区的数据真正的写到socket-send缓冲区

LoggingHandler处理完flush操作后,会继续往前传递寻找下一个写出-Handler执行flush操作:

找到HeadContext#0:

真正的AbstractUnsafe.flush:

flush0方法:


doWrite方法:

((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite():

获取到socket-send缓冲区的大小

in.nioBuffers(1024, maxBytesPerGatheringWrite):

通过ChannelOutboundBuffer(in),把封装在该buffer中的所有Entry转换一个个对应的ByteBuffer。但是最大不超过1024个。为什么要把一个个转换成一个个对应的ByteBuffer?因为为了便于操作,如果把所有的Entry都放在一个ByteBuffer中,也可以,但是需要操作读写指针的难度将会大幅度提升。

nioBuffers方法中使用到的internalNioBuffer方法:

这个方法就是提取出ByteBuf中的ByteBuffer对象

incompleteWrite(true):

当localWrittenBytes <= 0时,说明没有写出数据到socket-send缓冲区,此时需要一个兜底操作:让SelectionKey监控WRITE写事件,使得下一次写操作时可以及时监控到。

adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite)

实时的调整socket-send内核缓冲区大小

in.removeBytes(localWrittenBytes):

把已经发出去的数据从ChannelOutboundBuffer中更新删除

int writeSpinCount = config().getWriteSpinCount():

无论是读read还是write写,都会有一个次数限制,意思就是,占用线程资源的read或write操作最多连续执行16次。如果16次还没有完成读或写的任务时,线程就不会再被IO所占用,而是会切换到执行非IO的task,这是为了防止非IO-task被阻塞时间过长,可能非IO-task相对很快就执行完毕了,所以netty做了一个这样的设计。但是话说回来,16次循环IO,如果缓冲区设置的够大,可以实现16GB的IO转换,一般都是可以IO传输完毕的。

final int localWrittenBytes = ch.write(buffer):

拿到转换好的一个个的ByteBuffer数据,通过SocketChannel管道写出到socket-send内核缓冲区

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/461277.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flink CDC 与 Kafka 集成:Snapshot 还是 Changelog?Upsert Kafka 还是 Kafka?

博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维…

通过Demo学WPF—数据绑定(二)

准备 今天学习的Demo是Data Binding中的Linq&#xff1a; 创建一个空白解决方案&#xff0c;然后添加现有项目&#xff0c;选择Linq&#xff0c;解决方案如下所示&#xff1a; 查看这个Demo的效果&#xff1a; 开始学习这个Demo xaml部分 查看MainWindow.xaml&#xff1a; …

HiveSQL——条件判断语句嵌套windows子句的应用

注&#xff1a;参考文章&#xff1a; SQL条件判断语句嵌套window子句的应用【易错点】--HiveSql面试题25_sql剁成嵌套判断-CSDN博客文章浏览阅读920次&#xff0c;点赞4次&#xff0c;收藏4次。0 需求分析需求&#xff1a;表如下user_idgood_namegoods_typerk1hadoop1011hive1…

如何在vue中使用拖动排序组件sortablejs

效果图&#xff1a; 1.首先&#xff0c;我们需要在vue项目中安装依赖&#xff1a; npm install -save sortablejs2.创建demo文件>demoTest.vue&#xff0c;直接附上实例代码&#xff1a; <template><div><div id"table-names"><div class&…

RibbonOpenFeign源码(待完善)

Ribbon流程图 OpenFeign流程图

自然语言学习nlp 六

https://www.bilibili.com/video/BV1UG411p7zv?p118 Delta Tuning&#xff0c;尤其是在自然语言处理&#xff08;NLP&#xff09;和机器学习领域中&#xff0c;通常指的是对预训练模型进行微调的一种策略。这种策略不是直接更新整个预训练模型的权重&#xff0c;而是仅针对模型…

博主:今日无更

嘴馋吃坏东西了&#xff0c;今日断更一天 一会就删 发两次通知假装完成了任务&#xff08;bushi&#xff09;

win32编程系统BUG(Win32 API中的WM_SETTEXT消息)

由于频繁使用Win32 API中的WM_SETTEXT消息&#xff0c;导致内存占用直线上升。 暂未找到有效解决方案。

C#用Array类的Reverse方法反转数组中元素

目录 一、Array.Reverse 方法 1.重载 2.Reverse(Array, Int32, Int32) 3. Reverse(Array) 4.Reverse(T[]) 5. Reverse(T[], Int32, Int32) 二、实例 1.Array.Reverse 方法4种重载方法综合实例 2.Reverse(Array)方法的实例 一、Array.Reverse 方法 反转一维 Array 或部…

pythn-scipy 查漏补缺

1. 2. 3. 4. 5. 6. 7. 8. 9. 偏度 skewness&#xff0c;峰度 kurtosis

在本地运行大型语言模型 (LLM) 的六种方法(2024 年 1 月)

一、说明 &#xff08;开放&#xff09;本地大型语言模型&#xff08;LLM&#xff09;&#xff0c;特别是在 Meta 发布LLaMA和后Llama 2&#xff0c;变得越来越好&#xff0c;并且被越来越广泛地采用。 在本文中&#xff0c;我想演示在本地&#xff08;即在您的计算机上&#x…

教师题不会怎么搜答案?用这三款神器就够了!!! #笔记#知识分享#职场发展

在大学生的学习过程中&#xff0c;遇到难题和疑惑是常有的事情。然而&#xff0c;随着互联网的普及和技术的发展&#xff0c;搜题和学习软件成为了大学生们解决问题的利器。今天&#xff0c;我将向大家推荐几款备受大学生喜爱的搜题和学习软件&#xff0c;帮助我们更好地应对学…