目录
25.自定义编解码器
25.1 自定义编解码器编码
25.2 自定义编解码器的总结和补充
26.自定义通信协议
26.1 关于通信协议的关注点
26.2 自定义通信协议的格式
26.3 编解码
25.自定义编解码器
有了上面这个大体框架的流程之后,我们来聊一个非常特殊的:
比如我们在客户端想把字符串"10-20"经过编码后转为long类型,然后转为二进制存储到ByteBuf中,通过网络IO发出去,服务端接收到ByteBuf后,解码为long类型
那么在netty中是没有这么个编解码器的,所以我们需要自己去实现这个编解码器。在框架中,你要扩展自定义一个组件也好,功能也罢,需要遵循它们的规范,也就是继承它的一些接口或者实现它的一些实现类。比如:spring中的Event那个事件通知。
你自定义的东西只有实现别的规范,这样才能被框架体系发现你自定义的类,进而你自定义的东西才能和框架融为一体,融为一体后才能实现你的功能,框架才会调用你。
而对于编码器,我们需要继承的是MessageToByteEncoder,对于解码器,我们需要继承的是ByteToMessageDecoder。然后在pipline中生效,这就是它的逻辑。
于是基于这个逻辑来实现我们的自定义编解码器
25.1 自定义编解码器编码
需求:
1.我们在客户端发送一个字符串"10~20",这样的数据,然后我们希望发送出去的时候是以分隔符为界限,发出去两个long类型的数据
2.服务端接收到之后把数据从ByteBuf中取出来
需求分析:
这就是需要一组编解码器,编码器把字符串分隔开,然后转为两个long发出去
解码器把接收到的ByteBuf数据拿到手,解码转为long类型处理
具体实现如下:
- 客户端实现
package com.messi.netty_core_02.netty10;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;import java.net.InetSocketAddress;public class MyNettyClient {public static void main(String[] args) throws InterruptedException {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);NioEventLoopGroup group = new NioEventLoopGroup();bootstrap.group(group);bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//日志输出的编码器pipeline.addLast(new LoggingHandler());//我们自定义的编码器pipeline.addLast(new MyLong2ByteEncoder());}});Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();channel.writeAndFlush("10-20");group.shutdownGracefully();}}
package com.messi.netty_core_02.netty10;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyLong2ByteEncoder extends MessageToByteEncoder<String> {private static final Logger log = LoggerFactory.getLogger(MyLong2ByteEncoder.class);/**** @param ctx 上下文对象* @param msg 等待编码的数据* @param out 编码后的数据为二进制字节流格式,都存储到out中交给Netty,netty负责把ByteBuf数据交给socket缓冲区* socket缓冲区会把数据交给网络IO通信发给服务端* @throws Exception*/@Overrideprotected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {log.info("MyLong2ByteEncoder.encode start ~~~ ") ;if (msg == null) {return;}String[] msgs = msg.split("-");for (String message : msgs) {long longMsg = Long.parseLong(message);//每一个long类型的数据都在ByteBuf中占用8字节的大小空间out.writeLong(longMsg);}}
}
- 服务端实现
package com.messi.netty_core_02.netty10;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyServer {private static final Logger log = LoggerFactory.getLogger(MyNettyServer.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(new NioEventLoopGroup(1),new NioEventLoopGroup(8));DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new LoggingHandler());pipeline.addLast(new MyByte2LongDecoder());pipeline.addLast(defaultEventLoopGroup,new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if(msg instanceof Long) {Long data = (Long) msg;log.info("得到的客户端输入为:{}",data) ;}}});}});serverBootstrap.bind(8000);}}
package com.messi.netty_core_02.netty10;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.List;public class MyByte2LongDecoder extends ByteToMessageDecoder {private static final Logger log = LoggerFactory.getLogger(MyByte2LongDecoder.class);/**** @param ctx 上下文对象* @param in 网络IO传输过来的数据,存储到ByteBuf类型的in中* @param out 解码处理完后,把每一个"Message"都存储到out这一集合中,便于后续Netty遍历该out集合进行执行pipeline流水线的Handler操作* Netty是以消息Message为单位进行处理数据的,有多少Message,就执行多少次pipeline* ---->* 如果字节数不超过滑动窗口,socket缓冲区,ByteBuf缓冲区,客户端只发一次数据即可,为什么服务端能划分出多个Message呢?* 原因很简单:每调用一次decode进行处理ByteBuf,那么就会封装成一个Message。* 如果服务端一次没有处理完ByteBuf,那么会进行第二次调用decode方法的处理,则第二次处理ByteBuf,则又会生成一个新的Message* 以此类推,直到ByteBuf中的数据都处理完。* 调用n次decode方法,处理ByteBuf n次,存储n个Message到out集合中* ---->* 啥时候意味着ByteBuf中的数据都处理完?* 其实还是指针的移动,思考一下之前NIO的各种指针,当读指针到达limit指针边界后就意味着ByteBuf处理完毕* @throws Exception*/@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {log.info("MyByte2LongDecoder解码器start~~~");//获得ByteBuf中读写指针之间可读的数据长度int readableBytes = in.readableBytes();//如果客户端写的数据是long占8字节,这里是判断长度字节够不够,如果不够,那么说明有问题if (readableBytes >= 8) {//发送的是"10-20",第一次拿到的是10L(long类型),第二次是20Llong reciveLong = in.readLong();out.add(reciveLong);}}
}
- 测试输出
客户端输出:
由于自定义编码中去除了分隔符"-",所以只发送16字节的数据给服务端。注意:是一次性发送16字节的数据给服务端
服务端输出:
如下图所示,服务端自定义解码器调用了两次,为什么?
因为在自定义解码器的逻辑中,一次只处理了ByteBuf中8字节的数据大小,所以一次处理不完ByteBuf,所以会多次处理,多次处理会多次调用decode方法,会多次加入"Message"到out这一List集合中。由于out这一集合中有多个Message,所以pipeline流水线的Handler会执行多次。
总结:
首先明确一点,客户端对于这两个long类型的数字,10和20,他是只发送了一次的,因为我们首先看到这个数据的结构,其次日志只输出了一次。
再次我们来看服务端日志,数据是处理了两次的,因为输出了两次,解码器处理了两次。那么这是为
啥。
1、首先不是半包黏包,因为我们的缓冲区不至于连两个long都拿不到。
2、因为是两个消息,我们的数据在被解码器处理之后客户端那里其实是把数据处理位两个long发过来
的,服务端这里解码处理其实得到两个消息message,添加到out集合里面是两个消息。然后就处理了两次,这里有个问题就是,当你的ByteBuf的数据一次没处理完,那就会继续调用decode方法,进行再次处理,因为我们的数据是16字节,第一次值处理了八个字节,因为我们读取的就是一个readLong,long就是读取读写指针的前八个字节,所以处理了两次。
- 为了验证上述结论,我们更改一下服务端以及自定义解码器,其余的代码不修改:
package com.messi.netty_core_02.netty10;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyServer {private static final Logger log = LoggerFactory.getLogger(MyNettyServer.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(new NioEventLoopGroup(1),new NioEventLoopGroup(8));DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new LoggingHandler());pipeline.addLast(new MyByte2LongDecoder());pipeline.addLast(defaultEventLoopGroup,new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// if(msg instanceof Long) {
// Long data = (Long) msg;
// log.info("得到的客户端输入为:{}",data) ;
// }//验证读取n次ByteBuf,调用n次decode方法,对应有n个Message,会添加n个Message到out集合//netty拿到该out集合,会遍历并且让每一个Message都执行一遍pipeline流水线log.info("得到的客户端输入为:{}",msg.toString());}});}});serverBootstrap.bind(8000);}}
package com.messi.netty_core_02.netty10;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.List;public class MyByte2LongDecoder extends ByteToMessageDecoder {private static final Logger log = LoggerFactory.getLogger(MyByte2LongDecoder.class);/**** @param ctx 上下文对象* @param in 网络IO传输过来的数据,存储到ByteBuf类型的in中* @param out 解码处理完后,把每一个"Message"都存储到out这一集合中,便于后续Netty遍历该out集合进行执行pipeline流水线的Handler操作* Netty是以消息Message为单位进行处理数据的,有多少Message,就执行多少次pipeline* ---->* 如果字节数不超过滑动窗口,socket缓冲区,ByteBuf缓冲区,客户端只发一次数据即可,为什么服务端能划分出多个Message呢?* 原因很简单:每调用一次decode进行处理ByteBuf,那么就会封装成一个Message。* 如果服务端一次没有处理完ByteBuf,那么会进行第二次调用decode方法的处理,则第二次处理ByteBuf,则又会生成一个新的Message* 以此类推,直到ByteBuf中的数据都处理完。* 调用n次decode方法,处理ByteBuf n次,存储n个Message到out集合中* ---->* 啥时候意味着ByteBuf中的数据都处理完?* 其实还是指针的移动,思考一下之前NIO的各种指针,当读指针到达limit指针边界后就意味着ByteBuf处理完毕* @throws Exception*/@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {log.info("MyByte2LongDecoder解码器start~~~");//获得ByteBuf中读写指针之间可读的数据长度int readableBytes = in.readableBytes();//如果客户端写的数据是long占8字节,这里是判断长度字节够不够,如果不够,那么说明有问题if (readableBytes >= 8) {//发送的是"10-20",第一次拿到的是10L(long类型),第二次是20L
// long reciveLong = in.readLong();
// out.add(reciveLong);//如果一次读取16字节,是不是可以一次读完ByteBuf,那么就只会添加一个Message到out集合中,那么只会调用一次pipeline流水线out.add(in.readBytes(16));}}
}
测试输出:
客户端:客户端还是只发了一次数据就完事了
服务端:
接收到客户端一次发送的数据后存储到ByteBuf中,但是会多次解析读取该ByteBuf,多次调用decode方法,把每一次decode解码的Message存储到out集合中,把out交给netty,之后会遍历out集合,让每一个Message都对应调用一遍pipeline流水线。pipeline流水线的Handler拿到的数据就是out集合中解码后的数据
,然后我们就可以根据该解码后的数据去做一系列的操作了。
为什么只执行一次?
因为ByteBuf被一次读取完了,哈哈哈,那么就肯定只执行一次decode方法了。所以out集合中只会添加一个Message,那么只会调用一遍pipeline流水线的Handler。
25.2 自定义编解码器的总结和补充
1.自定义编码器
继承 MessageToByteEncoder,自定义实现其encode方法的逻辑
2.自定义解码器
继承 ByteToMessageDecoder,自定义实现其decode方法的逻辑
其实还有一种设计方法,我们前面说过存在一个集合体,拥有编解码器的能力的一个类。
ByteToMessageCodec,于是我们可以继承这个类,实现两个方法一个是decode一个是encode。
这种方式就可以替代上述两种自定义编码器和解码器,操作步骤如下:
- MyLongCodec类的设计
package com.messi.netty_core_02.netty10;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.List;public class MyLongCodec extends ByteToMessageCodec<String> {private static final Logger log = LoggerFactory.getLogger(MyLongCodec.class);@Overrideprotected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {log.info("MyLong2ByteEncoder.encode start ~~~ ") ;if (msg == null) {return;}String[] msgs = msg.split("-");for (String message : msgs) {long longMsg = Long.parseLong(message);//每一个long类型的数据都在ByteBuf中占用8字节的大小空间out.writeLong(longMsg);}}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {log.info("MyByte2LongDecoder解码器start~~~");//获得ByteBuf中读写指针之间可读的数据长度int readableBytes = in.readableBytes();//如果客户端写的数据是long占8字节,这里是判断长度字节够不够,如果不够,那么说明有问题if (readableBytes >= 8) {//发送的是"10-20",第一次拿到的是10L(long类型),第二次是20L
// long reciveLong = in.readLong();
// out.add(reciveLong);//如果一次读取16字节,是不是可以一次读完ByteBuf,那么就只会添加一个Message到out集合中,那么只会调用一次pipeline流水线out.add(in.readBytes(16));}}
}
- 客户端与服务端的设计如下
package com.messi.netty_core_02.netty10;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;import java.net.InetSocketAddress;public class MyNettyClient {public static void main(String[] args) throws InterruptedException {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);NioEventLoopGroup group = new NioEventLoopGroup();bootstrap.group(group);bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//日志输出的编码器pipeline.addLast(new LoggingHandler());//我们自定义的编码器
// pipeline.addLast(new MyLong2ByteEncoder());pipeline.addLast(new MyLongCodec());}});Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();channel.writeAndFlush("10-20");group.shutdownGracefully();}}
package com.messi.netty_core_02.netty10;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MyNettyServer {private static final Logger log = LoggerFactory.getLogger(MyNettyServer.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(new NioEventLoopGroup(1),new NioEventLoopGroup(8));DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new LoggingHandler());
// pipeline.addLast(new MyByte2LongDecoder());pipeline.addLast(new MyLongCodec());pipeline.addLast(defaultEventLoopGroup,new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// if(msg instanceof Long) {
// Long data = (Long) msg;
// log.info("得到的客户端输入为:{}",data) ;
// }//验证读取n次ByteBuf,调用n次decode方法,对应有n个Message,会添加n个Message到out集合//netty拿到该out集合,会遍历并且让每一个Message都执行一遍pipeline流水线log.info("得到的客户端输入为:{}",msg.toString());}});}});serverBootstrap.bind(8000);}}
测试输出:
客户端:
服务端:
测试通过,成功。
其实还有一种解码器ReplayingDecoder
这个解码器是netty封装的,这个解码器比较强大,我们在使用这个解码器的时候是不需要做一些安全性校验的。比如我们之前的解码器,做了一个长度判断,如下伪代码所示:
// 获得byteBuf中读写指针之间可读的数据长度
int readableBytes = in.readableBytes();
// 我们客户端写的数据是long占八字节,所以这里判断接收到的数据长度是不是够
if(readableBytes >= 8){
这个检验是因为我们担心服务端接收到的长度不够,丢失数据精度或者是异常抛出等等。如果不做长度判断,那么这一次读取错了,以后只会越读越错误。
甚至更有可能:接收到的根本不是这个类型的数据!比如说发送过来的是:两个整型或8个Byte类型,这一共也是8字节,所以肯定会有问题。
如果出现以上问题,你读错误一次,就算你发现该错误或抛出异常,也无法挽回,为什么无法挽回呢?因为你读取ByteBuf的数据是会改变读指针的,一旦改变指针,你就无法再往回读了。
所以这里我们可以使用到之前NIO总结的一种回滚指针的解决思路,当然netty中也有对应的回滚指针解决方案,修改代码如下所示:
package com.messi.netty_core_02.netty10;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.List;public class MyByte2LongDecoder2 extends ByteToMessageDecoder {private static final Logger log = LoggerFactory.getLogger(MyByte2LongDecoder2.class);@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {log.info("MyByte2LongDecoder2解码器执行");int readableBytes = in.readableBytes();if (readableBytes >= 8) {//标记读指针此时的位置,错误时回滚到该位置in.markReaderIndex();try {//发过来的是"10-20",第一次读取的是20L,第二次读取的是20Llong reciveLong = in.readLong();out.add(reciveLong);} catch (Exception e) {//发生异常时,回滚到标记位置in.resetReaderIndex();}}}
}
但是你要是继承实现了ReplayingDecoder,这个解码器,那你那些安全校验,全部取消,变为如下这样。
代码如下:
package com.messi.netty_core_02.netty10;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.ReplayingDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.List;public class MyByte2LongDecoder3 extends ReplayingDecoder {private static final Logger log = LoggerFactory.getLogger(MyByte2LongDecoder2.class);@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {log.info("MyByte2LongDecoder2解码器执行");//发过来的是"10-20",第一次读取的是20L,第二次读取的是20Llong reciveLong = in.readLong();out.add(reciveLong);}}
原理很简单:底层帮你处理了所有的安全问题,重置了ByteBuf,传递给你的是一个安全的处理过的ByteBuf,不管你什么异常什么问题,都会给你解决。
# 所以你的netty编程,要考虑如下方面:
1. ByteBuf的读写指针要考虑是不是正确,错了是不是能回滚。
2. ByteBuf是不是合理释放了。
26.自定义通信协议
26.1 关于通信协议的关注点
网络传输有很多协议了,像HTTP这种都是应用层协议,但是有时候我们在做网络通信的时候,需要自己定义一些协议,我们能够自定义出来的协议都是属于上层(应用层)的协议,因为都是基于底层封装好的TCP/UDP网络协议栈的基础协议。我们自定义出一些协议,来约束双方的接收和发送,这种时候需要我们来自定义通信协议,也就是消息的格式。
说白了就是客户端与服务端传输的格式,样式
在这个数据传递的过程中,你需要注意的就是除了基本数据的定义格式,还需要注意传递数据的长度,不然会出现半包粘包问题。
而且除了你的业务数据,还需要一些元数据(如:数据长度等),这些都要考虑,来举一个登录的例子:
版本1:
我们设计一个消息结构,就是json格式的内容:{"name":"xxx","password":"xxx"}
登录信息,无外乎就是账号密码。我们这么设计没毛病,但是没意义,首先没有数据长度,会出现半包粘包问题。
于是我们需要增加一个数据长度
版本2:
协议正文,或者叫做协议体,消息正文依然是账号密码
{"name":"xxx","password":"123456"}
正文长度:告诉服务端你一次读取多少数据,避免半包粘包
魔数:每个系统之间的一个读取标识,比如用来标识我们是一个集团或系统的,你就需要携带一个CAFEBABY过来,我就知道你的消息是我们系统的了,不携带这个魔数一律认为不合规,就丢弃掉。这是一个牌号,校验使用的,不然你随意一个系统知道ip,端口号就都能给我发数据,这不就乱了。
协议版本号:协议可能升级,告诉服务端当前协议是哪一个版本,服务端就采用对应的逻辑操作
指令类型:业务操作的编号,比如1是登录,2是注册,3是注销等等,告诉服务端具体使用的是哪种操作,http设计的时候实际上不需要这个,因为它没有什么业务操作,就是收发数据,然后就没了。但是你使用http协议想要做类型区别的话,那么你要做类型区别那就在请求里面加你的参数,然后服务端后台去判断。比如你url里面是delete那么就走去delete删除业务
26.2 自定义通信协议的格式
json序列化方式:
在有了上面的考虑点,我们现在可以定义一个我们消息的格式了,在定义格式之前,我们先考虑以什么形式构建,我们最熟悉的格式就是json,用json格式来组织起来上面的注意点。我们来看一个现在的结构。
大体上我们在客户端组织的消息格式就是这样的,然后我们经过封装为ByteBuf发送出去。
在服务端,用new JsonObjectDecode完成解码,进行后续的handler业务处理。
json的设计格式还行,json的字段本身就是能随便加的,但是这个格式有问题,就是他的长度太大了,因为我们这个json格式他全是字符串,在传输过程中不如二进制,或者其他的编码格式更加精简和体量小。但是json也有他的好处,就是可读性强,内部处理很方便。所以我们要是非要用json,建议除了消息体其他都用二进制。
- 但是一般在高并发开发中,并且如果是抛开SpringCloud-Web那一套Http协议,在netty原生开发时,我们可以自定义协议自定义序列化方式的话,我们有如下序列化方式的选择建议:
头信息(头信息是指除了消息正文content这一字段外的其他字段数据):一般都使用二进制byte直接存储
消息正文:json/java序列化/protobuf
为什么头信息不也直接使用java序列化或protobuf呢?java序列化或protobuf不也都是序列化成二进制?
答案其实很简单,直接写入二进制byte不带有对存储数据的额外描述信息。java序列化或protobuf把数据序列化成二进制,但是会带有许多额外的描述信息来保证可以反序列化成功,相比直接写入byte 占用的空间肯定要大很多。
为什么要使用java序列化或protobuf序列化取代json?
json序列化后的数据格式带有太多无用的符号,并且json序列化后的是字符串,所有的数据都是字符格式,所谓字符格式,一个普通字符占2字节,一个汉字占用3字节。而你protobuf或java序列化后的数据是二进制格式,8个二进制等于1字节,高下立判?
并且序列化后需要进行统一编码成二进制字节流存储到ByteBuf或其他应用缓冲区中,便于后续发给socket内核缓冲区。如果采用json方式,字符编码成二进制字节格式,又是很大的性能消耗。但是如果采用java序列化或protobuf方式,本身序列化后就成为了二进制字节格式,不存在字符解析的性能消耗。是不是也是很大的性能优化?
补充:但是一般来说,我们使用的protobuf方式,而不采用java默认的序列化方式,因为protobuf性能更高。
- 由于序列化方式的变化选择,我们得出一种最终方案
使用netty做自定义协议时:
头信息:
客户端编码使用二进制直接写入,服务端收到消息后解码直接读取二进制
消息正文:
客户端使用protobuf方式序列化原生消息成二进制数据,服务端使用protobuf反序列化二进制数据成原生消息数据
- 由于序列化方式选择的不确定性,所以客户端发消息的时候要多加一个"序列化的方式"字段
该字段是用来标识选择的序列化方式是啥?
于是协议的内容就变成如下这样:
协议正文(或叫做协议体):
消息正文依然是账号密码,{"name":"xxx","password","123456"}
正文长度:
告诉服务端:我发的数据长度。所以服务端知道一次读取多少,避免半包粘包问题的发生
魔数:
每个系统之间的一个读取标识,比如用来标识我们是一个集团或系统的,你就携带一个CAFEBABY过来,我就知道你的消息是我们系统的了,不携带这个魔数则一律认为该请求消息是不合理的,就丢弃掉,这是一个牌号,校验使用的。不然你随意一个系统要是知道服务端的ip,端口的话,那岂不是都能给服务端发数据,那么不是乱了。
协议版本号:
协议可能升级,告诉服务端当前协议是哪一个版本,服务端采用对应的逻辑操作
指令类型:
业务操作的编号,比如1是登录,2是注册,3是注销等等,告诉服务端具体使用哪种操作。http设计的时候实际上不需要这个,因为他没什么业务操作,就是收发数据,没了。但是你使用http协议想要做类型区别的话,那么你要做类型区别那就在请求里面加你的参数,然后服务端后台去判断。比如你url里面是delete那么就走去delete删除业务
序列化的方式:
比如:1代表json 2代表protobuf 3代表Hessian 4代表java默认的序列化方式
26.3 编解码
在设计好了编码格式之后,我们就可以来考虑编解码了。
在这个过程中,我们的数据被封装成ByteBuf就发送出去了,但是思考一个问题,作为客户端你是写业务代码的,你发出去的应该就是一个纯净的对象,你不能在对象数据的属性中加上一个魔数,版本号什么的。你需要把对象数据之外的元数据加在对象外面并且协同对象一起发送过去,这个封装的过程我们就需要交给客户端对应的编码器去做。ofcourse,解析这个封装好的消息数据的过程肯定是交给接收消息的服务端解码器去做,服务端解析得到消息正文以及元数据,便于做之后的一系列业务操作。
编码过程如下
- 编写消息类Message
package com.messi.netty_core_02.netty11;import java.io.Serializable;public class Message implements Serializable {private String username;private String password;public Message() {}public Message(String username,String password) {this.username = username ;this.password = password ;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}@Overridepublic String toString() {return "Message{" +"username='" + username + '\'' +", password='" + password + '\'' +'}';}
}
- 客户端自定义编码器
package com.messi.netty_core_02.netty11;import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.nio.charset.Charset;public class MyMessage2ByteEncoder extends MessageToByteEncoder<Message> {private static final Logger log = LoggerFactory.getLogger(MyMessage2ByteEncoder.class);@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {log.info("MyMessage2ByteEncoder.encode");//1.魔数 8字节out.writeBytes("leomessi".getBytes());//2.版本 1字节out.writeByte(1);//3.序列化方式 1字节 1代表json 2代表protobu 3代表hessianout.writeByte(1);//4.指令功能 1字节 1代表登录 2代表注册out.writeByte(1);//5.正文长度 4字节 使用int整型标识ObjectMapper objectMapper = new ObjectMapper();String jsonContent = objectMapper.writeValueAsString(msg);out.writeInt(jsonContent.length());//6.正文,直接写char序列out.writeCharSequence(jsonContent, Charset.defaultCharset());}
}
- 客户端
package com.messi.netty_core_02.netty11;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;import java.net.InetSocketAddress;public class MyNettyClient {public static void main(String[] args) throws InterruptedException {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);NioEventLoopGroup group = new NioEventLoopGroup();bootstrap.group(group);bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new LoggingHandler());pipeline.addLast(new MyMessage2ByteEncoder());}});Channel channel = bootstrap.connect(new InetSocketAddress(8000)).sync().channel();channel.writeAndFlush(new Message("leo","101010"));System.out.println("MyNettyClient.main");group.shutdownGracefully();}}
- 服务端自定义解码器
package com.messi.netty_core_02.netty11;import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.nio.charset.Charset;
import java.util.List;public class MyByte2MessageDecoder extends ByteToMessageDecoder {private static final Logger log = LoggerFactory.getLogger(MyByte2MessageDecoder.class);@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {log.info("MyByte2MessageDecoder.decode");//魔数ByteBuf magic = in.readBytes(8);log.info("魔数为:{}",magic);//协议版本号byte version = in.readByte();log.info("协议版本号为:{}",version);//序列化方式byte serializableType = in.readByte();log.info("序列化方式为:{}",serializableType) ;//指令功能byte funcNo = in.readByte();log.info("指令功能为:{}",funcNo) ;//正文长度int contentLength = in.readInt();log.info("正文长度为:{}",contentLength) ;Message message = null ;if (serializableType == 1 ){//说明序列化方式为json,此时服务端就要使用json反序列化方式进行反序列化ObjectMapper objectMapper = new ObjectMapper();message = objectMapper.readValue(in.readCharSequence(contentLength, Charset.defaultCharset()).toString(),Message.class);}out.add(message);}
}
- 服务端
package com.messi.netty_core_02.netty11;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** @Description TODO* @Author etcEriksen* @Date 2023/12/20 9:43* @Version 1.0*/
public class MyNettyServer {private static final Logger log = LoggerFactory.getLogger(MyNettyServer.class);public static void main(String[] args) {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(new NioEventLoopGroup(1),new NioEventLoopGroup(8));DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();/*** maxFrameLength:数据包最大长度为1024字节* lengthFieldOffset:数据长度字段前面有多少字节的偏移?* lengthAdjustment:数据长度字段与真实数据体之间有多少距离?* initialBytesToStrip:最终服务端输出的数据去除前面多少字节的长度?* 具体见:Netty应用04-Netty这一笔记*/pipeline.addLast(new LoggingHandler());pipeline.addLast(new LengthFieldBasedFrameDecoder(1024,11,4,0,0));pipeline.addLast(new MyByte2MessageDecoder());pipeline.addLast(defaultEventLoopGroup,new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Message message = (Message) msg ;log.info("服务端接收到客户端的数据为:{}",message) ;}});}});serverBootstrap.bind(8000);System.out.println("MyNettyServer.main");}}
- 输出
客户端输出:
服务端输出:
- 补充
总结: