一、Channel
、ChannelPipeline
、ChannelHandler
、ChannelHandlerContext
之间的关系
在编写Netty
程序时,经常跟我们打交道的是上面这几个对象,这也是Netty
中几个重要的对象,下面我们来看看它们之间有什么样的关系。
Netty
中的Channel
是框架自己定义的一个通道接口,Netty
实现的客户端NIO
套接字通道是NioSocketChannel
提供的服务器端NIO
套接字通道是NioServerSocketChannel
。
1.1 ChannelPipeline
当服务端和客户端建立一个新的连接时, 一个新的Channel
将被创建,同时它会被自动地分配到它专属的ChannelPipeline
。
ChannelPipeline
是一个拦截流经Channel
的入站和出站事件的ChannelHandler
实例链,并定义了用于在该链上传播入站和出站事件流的API
。那么就很容易看出这些ChannelHandler
之间的交互是组成一个应用程序数据和事件处理逻辑的核心。
上图描述了IO
事件如何被一个ChannelPipeline
的ChannelHandler
处理的。
1.2 ChannelHandler
ChannelHandler
分为ChannelInBoundHandler
和ChannelOutboundHandler
两种,如果一个入站 IO 事件被触发,这个事件会从第一个开始依次通过ChannelPipeline
中的ChannelInBoundHandler
,先添加的先执行。
若是一个出站I/O
事件,则会从最后一个开始依次通过ChannelPipeline
中的ChannelOutboundHandler
,后添加的先执行,然后通过调用在ChannelHandlerContext
中定义的事件传播方法传递给最近的ChannelHandler
。
在ChannelPipeline
传播事件时,它会测试ChannelPipeline
中的下一个ChannelHandler
的类型是否和事件的运动方向相匹配。
如果某个ChannelHandler
不能处理则会跳过,并将事件传递到下一个ChannelHandler
,直到它找到和该事件所期望的方向相匹配的为止。
ChannelHandler
可以通过添加、删除或者替换其他的ChannelHandler
来实时地修改ChannelPipeline
的布局。
(它也可以将它自己从ChannelPipeline
中移除。)这是ChannelHandler
最重要的能力之一。
1.3 ChannelHandlerContext
ChannelHandlerContext
代表了ChannelHandler
和ChannelPipeline
之间的关联,每当有ChannelHandler
添加到ChannelPipeline
中时,都会创建ChannelHandlerContext
。
ChannelHandlerContext
的主要功能是管理它所关联的ChannelHandler
和在同一个ChannelPipeline
中的其他ChannelHandler
之间的交互。事件从一个ChannelHandler
到下一个ChannelHandler
的移动是由ChannelHandlerContext
上的调用完成的。
二、Netty
线程模型
在示例中我们程序一开始都会生成两个NioEventLoopGroup
的实例,为什么需要这两个实例呢?这两个实例可以说是Netty
程序的源头,其背后是由Netty
线程模型决定的。
Netty
线程模型是典型的Reactor
模型结构,其中常用的Reactor
线程模型有三种,分别为:Reactor
单线程模型、Reactor
多线程模型和主从Reactor
多线程模型。
而在Netty
的线程模型并非固定不变,通过在启动辅助类中创建不同的EventLoopGroup
实例并通过适当的参数配置,就可以支持上述三种Reactor
线程模型。
2.1 Reactor
单线程模型
Reactor
单线程模型指的是所有的IO
操作都在同一个NIO
线程上面完成。作为NIO
服务端接收客户端的TCP
连接,作为NIO
客户端向服务端发起TCP
连接,读取通信对端的请求或向通信对端发送消息请求或者应答消息。
由于Reactor
模式使用的是异步非阻塞IO
,所有的IO
操作都不会导致阻塞,理论上一个线程可以独立处理所有IO
相关的操作。
Netty
使用单线程模型的的方式如下:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup).channel(NioServerSocketChannel.class)
...
在实例化NioEventLoopGroup
时,构造器参数是1,表示NioEventLoopGroup
的线程池大小是1。
然后接着我们调用b.group(bossGroup)
设置了服务器端的EventLoopGroup
,因此bossGroup
和workerGroup
就是同一个NioEventLoopGroup
了。
bossGroup
用于接收客户端传过来的请求,接收到请求后将后续操作交由workerGroup
处理。
2.2 Reactor
多线程模型
对于一些小容量应用场景,可以使用单线程模型,但是对于高负载、大并发的应用却不合适,需要对该模型进行改进,演进为Reactor
多线程模型。
Rector
多线程模型与单线程模型最大的区别就是有一组NIO
线程处理IO
操作;
-
在该模型中有专门一个
NIO
线程 -Acceptor
线程用于监听服务端,接收客户端的TCP
连接请求;而 1 个NIO
线程可以同时处理N
条链路,但是1个链路只对应1个NIO
线程,防止发生并发操作问题。 -
网络
IO
操作-读、写等由一个NIO
线程池负责,线程池可以采用标准的JDK
线程池实现,它包含一个任务队列和N
个可用的线程,由这些NIO
线程负责消息的读取、解码、编码和发送。
Netty
中实现多线程模型的方式如下:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)...
bossGroup
中只有一个线程,而workerGroup
中的线程是CPU
核心数乘以 2,那么就对应Recator
的多线程模型。
2.3 主从Reactor
多线程模型
在并发极高的情况单独一个Acceptor
线程可能会存在性能不足问题,为了解决性能问题,产生主从Reactor
多线程模型。
主从Reactor
线程模型的特点是:服务端用于接收客户端连接的不再是1个单独的NIO
线程,而是一个独立的NIO
线程池。
Acceptor
接收到客户端TCP
连接请求处理完成后,将新创建的SocketChannel
注册到IO
线程池(sub reactor
线程池)的某个IO
线程上,由它负责SocketChannel
的读写和编解码工作。
Acceptor
线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端 subReactor
线程池的IO
线程上,由IO
线程负责后续的IO
操作。
根据前面所讲的两个线程模型,很容想到Netty
实现多线程的方式如下:
EventLoopGroup bossGroup = new NioEventLoopGroup(4);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)...
但是,在Netty
的服务器端的acceptor
阶段,没有使用到多线程, 因此上面的主从多线程模型在Netty
的实现是有误的。
服务器端的ServerSocketChannel
只绑定到了bossGroup
中的一个线程,因此在调用Java NIO
的 Selector.select
处理客户端的连接请求时,实际上是在一个线程中的,所以对只有一个服务的应用来说,bossGroup
设置多个线程是没有什么作用的,反而还会造成资源浪费。
至于Netty
中的bossGroup
为什么使用线程池,我在stackoverflow
找到一个对于此问题的讨论 。
the creator of Netty says multiple boss threads are useful if we share NioEventLoopGroup between different server bootstraps
2.4 EventLoopGroup
和 EventLoop
当系统在运行过程中,如果频繁的进行线程上下文切换,会带来额外的性能损耗。多线程并发执行某个业务流程,业务开发者还需要时刻对线程安全保持警惕,哪些数据可能会被并发修改,如何保护?这不仅降低了开发效率,也会带来额外的性能损耗。
为了解决上述问题,Netty
采用了串行化设计理念:
- 从消息的读取、编码以及后续
ChannelHandler
的执行,始终都由IO
线程EventLoop
负责,这就意外着整个流程不会进行线程上下文的切换,数据也不会面临被并发修改的风险; EventLoopGroup
是一组EventLoop
的抽象,一个EventLoopGroup
当中会包含一个或多个EventLoop
,EventLoopGroup
提供next
接口,可以从一组EventLoop
里面按照一定规则获取其中一个EventLoop
来处理任务。
在 Netty
服务器端编程中我们需要 BossEventLoopGroup
和 WorkerEventLoopGroup
两个 EventLoopGroup
来进行工作。
BossEventLoopGroup
通常是一个单线程的 EventLoop
,EventLoop
维护着一个注册了 ServerSocketChannel
的 Selector
实例,EventLoop
的实现涵盖IO
事件的分离,和分发(Dispatcher
),EventLoop
的实现充当 Reactor
模式中的分发(Dispatcher
)的角色。
所以通常可以将 BossEventLoopGroup
的线程数参数为 1。
BossEventLoop
只负责处理连接,故开销非常小,连接到来,马上按照策略将 SocketChannel
转发给 WorkerEventLoopGroup
,WorkerEventLoopGroup
会由 next
选择其中一个 EventLoop
来将这 个SocketChannel
注册到其维护的 Selector
并对其后续的 IO
事件进行处理。
ChannelPipeline
中的每一个 ChannelHandler
都是通过它的 EventLoop
(I/O
线程)来处理传递给它的事件的。所以至关重要的是不要阻塞这个线程,因为这会对整体的 I/O
处理产生严重的负面影响。但有时可能需要与那些使用阻塞 API
的遗留代码进行交互。
对于这种情况, ChannelPipeline
有一些接受一个 EventExecutorGroup
的 add()
方法。如果一个事件被传递给一个自定义的 EventExecutorGroup
, DefaultEventExecutorGroup
的默认实现。
就是在把 ChannelHanders
添加到 ChannelPipeline
的时候,指定一个 EventExecutorGroup
,ChannelHandler
中所有的方法都将会在这个指定的 EventExecutorGroup
中运行。
static final EventExecutor group = new DefaultEventExecutorGroup(16);
...
ChannelPipeline p = ch.pipeline();
pipeline.addLast(group, "handler", new MyChannelHandler());
2.5 总结
NioEventLoopGroup
实际上就是个线程池,一个EventLoopGroup
包含一个或者多个EventLoop
;- 一个
EventLoop
在它的生命周期内只和一个Thread
绑定; - 所有
EnventLoop
处理的I/O
事件都将在它专有的Thread
上被处理; - 一个
Channel
在它的生命周期内只注册于一个EventLoop
; - 每一个
EventLoop
负责处理一个或多个Channel
;
三、MQTT
协议介绍
MQTT
协议主要由三部分组成:
- 固定头(
MqttFixedHeader
):所有的MQTT
数据包都有,用于表示数据包类型及对应标识,还有数据包的大小; - 可变头(
variableHeader
):部分的MQTT
数据包中有,需要根据协议中具体类型来决定; - 消息体(
payload
):部分的MQTT
数据包中有,具体数据信息(关键真正业务用到的数据哦);
接下来我们以MQTT 3.1
版本为例进行分析介绍,并使用MQTTX
客户端进行测试。
3.1 连接服务器(CONNECT
)和确认连接请求(CONNACK
)
3.1.1 CONNECT
客户端到服务端的网络连接建立后,客户端发送给服务端的第一个报文必须是CONNECT
报文;
MqttConnectMessage[
fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=48], variableHeader=MqttConnectVariableHeader[name=MQIsdp, version=3, hasUserName=true, hasPassword=true, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=60],
payload=MqttConnectPayload[clientIdentifier=mqttx_d2adc46c, willTopic=null, willMessage=null, userName=ef_user, password=[112, 119, 100, 64, 50, 52, 54, 56, 48]]]
从CONNECT
报文中,我们可以看到很多的信息,协议标识,协议级别,会话,遗嘱,用户,密码等,我这里抓取的报文只是一个基础参考。
3.1.2 CONNACK
服务端发送CONNACK
报文响应从客户端收到的CONNECT
报文,服务端发送给客户端的第一个报文必须是CONNACK
,这里也只是一个参考,具体需要根据CONNECT
来返回报文;
MqttConnAckMessage[
fixedHeader=MqttFixedHeader[messageType=CONNACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], variableHeader=MqttConnAckVariableHeader[connectReturnCode=CONNECTION_ACCEPTED, sessionPresent=true],
payload=]
3.2 订阅主题(SUBSCRIBE
)和确认订阅(SUBACK
)
3.2.1 SUBSCRIBE
客户端向服务端发送SUBSCRIBE
报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题,为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH
报文给客户端,SUBSCRIBE
报文也(为每个订阅)指定了最大的QoS
等级,服务端根据这个发送应用消息给客户端,具体的应用,DEMO
里只是体现了订阅主题的过程,实际业务并不是如此简单,订阅主题的报文参考如下;
MqttSubscribeMessage[
fixedHeader=MqttFixedHeader[messageType=SUBSCRIBE, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=16], variableHeader=MqttMessageIdVariableHeader[messageId=17290], payload=MqttSubscribePayload[MqttTopicSubscription[topicFilter=testtopic/#, qualityOfService=AT_MOST_ONCE]
]]
3.2.2 SUBACK
服务端发送SUBACK
报文给客户端,用于确认它已收到并且正在处理SUBSCRIBE
报文,SUBACK
报文包含一个返回码清单,它们指定了SUBSCRIBE
请求的每个订阅被授予的最大QoS
等级,确认订阅的报文参考如下
MqttSubAckMessage[
fixedHeader=MqttFixedHeader[messageType=SUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=3], variableHeader=MqttMessageIdVariableHeader[messageId=25210], payload=MqttSubAckPayload[grantedQoSLevels=[0]]]
3.3 取消订阅(UNSUBSCRIBE
)和取消订阅确认(UNSUBACK
)
3.3.1 UNSUBSCRIBE
客户端发送UNSUBSCRIBE
报文给服务端,用于取消订阅主题,参考报文如下
MqttUnsubscribeMessage[
fixedHeader=MqttFixedHeader[messageType=UNSUBSCRIBE, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=15], variableHeader=MqttMessageIdVariableHeader[messageId=16150],
payload=MqttUnsubscribePayload[topicName = testtopic/#]
]
3.3.2 UNSUBSCRIBE
服务端发送UNSUBACK
报文给客户端用于确认收到UNSUBSCRIBE
报文,参考报文如下
MqttUnsubAckMessage[
fixedHeader=MqttFixedHeader[messageType=UNSUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], variableHeader=MqttMessageIdVariableHeader[messageId=16150],
payload=
]
3.4.心跳请求(PINGREQ
)和心跳响应(PINGRESP
)
3.4.1 PINGREQ
客户端发送PINGREQ
报文给服务端的,在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着,请求服务端发送响应确认它还活着,使用网络以确认网络连接没有断开,参考报文如下;
MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PINGREQ, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0],
variableHeader=,
payload=
]
3.4.2 PINGRESP
服务端发送PINGRESP
报文响应客户端的PINGREQ
报文,表示服务端还活着;
MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PINGRESP, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0],
variableHeader=,
payload=
]
3.5.断开连接(DISCONNECT
)客户端主动断开连接
DISCONNECT
报文是客户端发给服务端的最后一个控制报文,表示客户端正常断开连接,而服务端不需要返回消息了,处理业务逻辑便可。
MqttMessage[
fixedHeader=MqttFixedHeader[messageType=DISCONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0],
variableHeader=,
payload=
]
3.6 发布和订阅,根据(QoS
等级)
常用MQ
中不保证消息在整体链路的中质量,MQTT
协议对发布的消息约定了传输过程中的服务质量(QoS=0\1\2
)。
- 发布消息(
PUBLISH
):PUBLISH
控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息; - 发布确认(
PUBACK
):PUBACK
报文是对QoS 1
等级的PUBLISH
报文的响应; - 发布收到(
PUBREC
):PUBREC
报文是对QoS
等级2的PUBLISH
报文的响应,它是QoS 2
等级协议交换的第二个报文; - 发布释放(
PUBREL
):PUBREL
报文是对PUBREC
报文的响应,它是QoS 2
等级协议交换的第三个报文; - 发布完成(
PUBCOMP
):PUBCOMP
报文是对PUBREL
报文的响应,它是QoS 2
等级协议交换的第四个也是最后一个报文。
3.6.1 QoS0
-至多一次,最多一次
客户端->服务端PUBLISH
,服务端无需向客户端发送确认消息,这就是最多一次消息,参考报文:
MqttPublishMessage[
fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=31], variableHeader=MqttPublishVariableHeader[topicName=testtopic, packetId=-1], payload=PooledSlicedByteBuf(ridx: 0, widx: 20, cap: 20/20, unwrapped: PooledUnsafeDirectByteBuf(ridx: 33, widx: 33, cap: 448))]
其中payload
的数据可以用下面代码获取:
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel();
byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
mqttPublishMessage.payload().readBytes(headBytes);
String data = new String(headBytes);
3.6.2 QoS1
- 至少一次,服务器下发确认消息
客户端->服务端PUBLISH
,参考报文:
MqttPublishMessage[
fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=26],
variableHeader=MqttPublishVariableHeader[topicName=test/netty/get, packetId=4],
payload=PooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 480))]
服务端->客户端PUBACK
,参考报文:
MqttPubAckMessage[
fixedHeader=MqttFixedHeader[messageType=PUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2],
variableHeader=MqttMessageIdVariableHeader[messageId=4],
payload=]
3.6.3 QoS2
-刚好一次(共四个报文)
客户端->服务端 PUBLISH
第一个报文:
MqttPublishMessage[
fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=EXACTLY_ONCE, isRetain=false, remainingLength=28],
variableHeader=MqttPublishVariableHeader[topicName=test/netty/post, packetId=5],
payload=PooledSlicedByteBuf(ridx: 0, widx: 9, cap: 9/9, unwrapped: PooledUnsafeDirectByteBuf(ridx: 30, widx: 30, cap: 496))]
服务端->客户端 PUBREC
第二个报文:
MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PUBREC, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=2],
variableHeader=MqttMessageIdVariableHeader[messageId=5],
payload=]
客户端->服务端PUBREL
第三个报文:
MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PUBREL, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=2],
variableHeader=MqttMessageIdVariableHeader[messageId=5],
payload=]
服务端->客户端 PUBCOMP
第四个报文:
MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PUBCOMP, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2],
variableHeader=MqttMessageIdVariableHeader[messageId=5],
payload=]
参考文章
[1] Netty
实战入门详解——让你彻底记住什么是Netty
(看不懂你来找我)
[2] Springboot+Netty
搭建MQTT
协议的服务端(基础Demo
)
[3] 第一章 - MQTT
介绍 · MQTT
协议中文版
[4] Netty
实现高性能IOT
服务器(Groza
)之手撕MQTT
协议篇上