Netty核心技术九--TCP 粘包和拆包及解决方案

1. TCP 粘包和拆包基本介绍

  1. **TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。**这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的

  2. 由于TCP无消息保护边界, 需要在接收端处理消息边界问题,也就是我们所说的粘包拆包问题

  3. TCP粘包、拆包图解

    image-20230708102428452

    假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数是不定的,故可能存在以下四种情况:

    1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包
    2. 服务端一次接受到了两个数据包,D1和D2粘合在一起,称之为TCP粘包
    3. 服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这称之TCP拆包
    4. 服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包。这称之TCP拆包

2. TCP 粘包和拆包现象示例1

实例需求:

在编写Netty 程序时,如果没有做处理,就会发生粘包和拆包的问题

2.1 MyClient

package site.zhourui.nioAndNetty.netty.tcp;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;public class MyClient {public static void main(String[] args)  throws  Exception{EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new MyClientInitializer()); //自定义一个初始化类ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();channelFuture.channel().closeFuture().sync();}finally {group.shutdownGracefully();}}
}

2.2 MyClientInitializer

只在Initializer加入了MyClientHandler处理业务

package site.zhourui.nioAndNetty.netty.tcp;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;public class MyClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new MyClientHandler());}
}

2.3 MyClientHandler

  • 使用客户端发送10条数据 hello,server 编号
  • 回显客户端接收到消息
package site.zhourui.nioAndNetty.netty.tcp;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.nio.charset.Charset;public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {private int count;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//使用客户端发送10条数据 hello,server 编号for(int i= 0; i< 10; ++i) {ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));ctx.writeAndFlush(buffer);}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {byte[] buffer = new byte[msg.readableBytes()];msg.readBytes(buffer);String message = new String(buffer, Charset.forName("utf-8"));System.out.println("客户端接收到消息=" + message);System.out.println("客户端接收消息数量=" + (++this.count));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}

2.4 MyServer

package site.zhourui.nioAndNetty.netty.tcp;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class MyServer {public static void main(String[] args) throws Exception{EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定义一个初始化类ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();channelFuture.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

2.5 MyServerInitializer

只在Initializer加入了MyServerHandler处理业务

package site.zhourui.nioAndNetty.netty.tcp;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;public class MyServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new MyServerHandler());}
}

2.6 MyServerHandler

  • 打印服务器接收到数据
  • 服务器回送数据给客户端, 回送一个随机id
package site.zhourui.nioAndNetty.netty.tcp;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.nio.charset.Charset;
import java.util.UUID;public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf>{private int count;@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//cause.printStackTrace();ctx.close();}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {byte[] buffer = new byte[msg.readableBytes()];msg.readBytes(buffer);//将buffer转成字符串String message = new String(buffer, Charset.forName("utf-8"));System.out.println("服务器接收到数据 " + message);System.out.println("服务器接收到消息量=" + (++this.count));//服务器回送数据给客户端, 回送一个随机id ,ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", Charset.forName("utf-8"));ctx.writeAndFlush(responseByteBuf);}
}

2.7 测试

  1. 启动MyServer

    image-20230708105325765

  2. 启动一个MyClient

    image-20230708105358618

    image-20230708105413184

  3. 再启动一个MyClient

    客户端收到了4个服务端回复的消息

    image-20230708105538832

    服务端发送4次数据,但是

    image-20230708105506907

服务端发送的10个数据被分4次发送,有粘包情况,并且每次执行客户端被划分的个数都不一样

3. TCP 粘包和拆包解决方案

  1. 使用自定义协议 + 编解码器 来解决
  2. 关键就是要解决 服务器端每次读取数据长度的问题, 这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的TCP 粘包、拆包。

3.1 自定义协议 + 编解码器 解决TCP 粘包、拆包 示例2

实例要求:

image-20230708111517718

  1. 要求客户端发送 5 个 Message 对象, 客户端每次发送一个Message对象
  2. 服务器端每次接收一个Message, 分5次进行解码,每读取到一个Message,会回复一个Message 对象 给客户端.

示例1的代码基础上做扩充

3.1.1 自定义协议 MessageProtocol

  • len:每个数据包的长度
  • content:每个数据包的内容
package site.zhourui.nioAndNetty.netty.protocoltcp;//协议包
public class MessageProtocol {private int len; //关键private byte[] content;public int getLen() {return len;}public void setLen(int len) {this.len = len;}public byte[] getContent() {return content;}public void setContent(byte[] content) {this.content = content;}
}

3.1.2 对应协议的编码器MyMessageEncoder

  1. extends MessageToByteEncoder<MessageProtocol>,因为编码器接收到的数据可以确定为MessageProtocol
  2. 重写encode将我们的数据封装为MessageProtocol协议对象
package site.zhourui.nioAndNetty.netty.protocoltcp;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {@Overrideprotected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {System.out.println("MyMessageEncoder encode 方法被调用");out.writeInt(msg.getLen());out.writeBytes(msg.getContent());}
}

3.1.3 对应协议的解码器MyMessageDecoder

  • ReplayingDecoder<Void>:Void代表不需要状态管理(由ByteToMessageDecoder来帮我们自动识别并管理)
  • 将得到二进制字节码-> MessageProtocol 数据包(对象)
  • 封装成 MessageProtocol 对象,放入 out, 传递下一个handler业务处理
package site.zhourui.nioAndNetty.netty.protocoltcp;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;import java.util.List;public class MyMessageDecoder extends ReplayingDecoder<Void> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {System.out.println("MyMessageDecoder decode 被调用");//需要将得到二进制字节码-> MessageProtocol 数据包(对象)int length = in.readInt();byte[] content = new byte[length];in.readBytes(content);//封装成 MessageProtocol 对象,放入 out, 传递下一个handler业务处理MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(length);messageProtocol.setContent(content);out.add(messageProtocol);}
}

3.1.4 MyClient和MyServer没有变动

3.1.6 MyClientInitializer

在客户端业务handler之前加入了协议编码器和协议解码器

package site.zhourui.nioAndNetty.netty.protocoltcp;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;public class MyClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new MyMessageEncoder()); //加入编码器pipeline.addLast(new MyMessageDecoder());//加入解码器pipeline.addLast(new MyClientHandler());}
}

3.1.7 MyServerInitializer

在服务端业务handler之前加入了协议编码器和协议解码器

package site.zhourui.nioAndNetty.netty.protocoltcp;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;public class MyServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new MyMessageEncoder()); //加入编码器pipeline.addLast(new MyMessageDecoder());//加入解码器pipeline.addLast(new MyServerHandler());}
}

3.1.8 MyClientHandler

  • channelActive:channel激活的时候发送5条数据,将数据封装为MessageProtocol协议对象(这个对象会给到MyClientHandler)
  • 打印出服务器端发送的内容
package site.zhourui.nioAndNetty.netty.protocoltcp;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.nio.charset.Charset;public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {private int count;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//使用客户端发送10条数据 "今天天气冷,吃火锅" 编号for(int i = 0; i< 5; i++) {String mes = "今天天气冷,吃火锅";byte[] content = mes.getBytes(Charset.forName("utf-8"));int length = mes.getBytes(Charset.forName("utf-8")).length;//创建协议包对象MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(length);messageProtocol.setContent(content);ctx.writeAndFlush(messageProtocol);}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {int len = msg.getLen();byte[] content = msg.getContent();System.out.println("客户端接收到消息如下");System.out.println("长度=" + len);System.out.println("内容=" + new String(content, Charset.forName("utf-8")));System.out.println("客户端接收消息数量=" + (++this.count));System.out.println("" );}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("异常消息=" + cause.getMessage());ctx.close();}
}

3.1.9 MyServerHandler

  • 打印客户端发送内容
  • 每接收一次消息就回复客户端一个messageProtocol消息
package site.zhourui.nioAndNetty.netty.protocoltcp;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.nio.charset.Charset;
import java.util.UUID;public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol>{private int count;@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//cause.printStackTrace();ctx.close();}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {//接收到数据,并处理int len = msg.getLen();byte[] content = msg.getContent();System.out.println();System.out.println();System.out.println();System.out.println("服务器接收到信息如下");System.out.println("长度=" + len);System.out.println("内容=" + new String(content, Charset.forName("utf-8")));System.out.println("服务器接收到消息包数量=" + (++this.count));//回复消息String responseContent = UUID.randomUUID().toString();int responseLen = responseContent.getBytes("utf-8").length;byte[]  responseContent2 = responseContent.getBytes("utf-8");//构建一个协议包MessageProtocol messageProtocol = new MessageProtocol();messageProtocol.setLen(responseLen);messageProtocol.setContent(responseContent2);ctx.writeAndFlush(messageProtocol);}
}

3.1.10 测试

  1. 启动服务端

    image-20230708134722220

  2. 启动一个客户端

    1. 客户端发送了5个协议数据
    2. 服务端也收到5个协议数据
    3. 服务端在回送5个协议数据
    4. 客户端接收到5个协议数据

    image-20230708134812094

    image-20230708134835031

3.2 个人总结

个人总结

  1. 首先必须要熟悉handler调用链是怎么只是(即encode和decode方法在什么时候执行拿到的是什么数据需要传出给下一个handler的数据应该是什么类型)
  2. 客户端启动首先执行客户端的MyClientHandler的channelActive方法将数据封装为messageProtocol类型的协议对象
  3. 因为是入站那么handler调用链inbound方法的实现类即MyMessageEncoder的encode方法将协议对象转换为字节对象来进行网络传输
  4. 服务端因为是出站那么handler调用链outbound方法的实现类MyMessageDecoder的decode方法将字节对象封装为协议对象,然后将该对象放入out集合
  5. 服务端的MyMessageDecoder的下一个handler即MyServerHandler会拿到out,然后进行业务处理(这里就是打印并返回一个数据并封装为协议对象,因为是5个那么就会执行5次)
  6. 因为是入站那么handler调用链inbound方法的实现类即MyMessageEncoder的encode方法将协议对象转换为字节对象来进行网络传输
  7. 客户端因为是出站那么handler调用链outbound方法的实现类MyMessageDecoder的decode方法将字节对象封装为协议对象,然后将该对象放入out集合
  8. 服务端的MyMessageDecoder的下一个handler即MyClientHandler会拿到out,然后进行业务处理(这里就是打印因为是5个那么就会执行5次)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/12935.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

爆肝整理,Docker容器测试-常见问题+解决(汇总)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 问题1&#xff1a…

React + TypeScript 实践

主要内容包括准备知识、如何引入 React、函数式组件的声明方式、Hooks、useRef<T>、useEffect、useMemo<T> / useCallback<T>、自定义 Hooks、默认属性 defaultProps、Types or Interfaces、获取未导出的 Type、Props、常用 Props ts 类型、常用 React 属性类…

TensorFlow: mode.save()报错 non-trackable object: (None, None)

问题描述 环境&#xff1a;tensorflow2.2.0 执行model.save(), 报错 ...non-trackable object: (None, None)... 解决办法 搞了半天没有找到好的办法&#xff0c;只能通过升级搞定 pip uninstall tensorflow pip install tensorflow2.6.0 pip install --upgrade keras2.6…

数据结构--线性表(顺序表、单链表、双链表、循环链表、静态链表)

前言 学习所记录&#xff0c;如果能对你有帮助&#xff0c;那就泰裤辣。 目录 1.线性表概念 定义 基本操作 2.顺序表 定义 顺序表的实现--静态分配 动态分配 顺序表的特点 顺序表的插入和删除 顺序表的查找 按位查找 按值查找 3.单链表 定义 单链表的初始化 不带…

在Windows环境下安装Elasticsearch 8.8.2

Elasticsearch是一种开源的分布式搜索和分析引擎&#xff0c;被广泛应用于构建实时搜索、日志分析、数据可视化等应用。本文将详细介绍如何在Windows环境下安装和配置Elasticsearch 8。 安装Elasticsearch 步骤1&#xff1a;准备工作 在开始安装之前&#xff0c;确保已满足以…

gitee注册以及使用的简单教程

目录 1.gitee是什么&#xff1f; 2. gitee怎么注册? 3.gitee创建仓库 4.gitee怎么提交代码? 5. git的三板斧 1.gitee是什么&#xff1f; 基于Git的代码托管和研发协作平台上面可以托管个人或者公司的代码和开源项目。国外有github&#xff0c;国内有giteegithub经常出现…

docker安装postgresql

docker run --name postgres -e POSTGRES_PASSWORD123456 -p 5432:5432 -v /mydata/postgres/pgdata:/var/lib/postgresql/data -d postgres 修改postgresql最大连接数 vim /mydata/postgres/pgdata/postgresql.conf 附:常用连接数查看命令 -- 1.查看当前配置的最大连接数 s…

4通道AD采集子卡模块有哪些推荐?

FMC134是一款4通道3.2GSPS&#xff08;2通道6.4GSPS&#xff09;采样率12位AD采集FMC子卡模块&#xff0c;该板卡为FMC标准&#xff0c;符合VITA57.4规范&#xff0c;可以作为一个理想的IO模块耦合至FPGA前端&#xff0c;16通道的JESD204B接口通过FMC连接器连接至FPGA的高速串行…

为了实现上网自由,我做了一个多功能串口服务器

项目作者&#xff1a;小华的物联网嵌入式之旅 介绍&#xff1a;从事电气自动化行业&#xff0c;多次获得物联网设计竞赛&#xff0c;爱好嵌入式设计开发&#xff0c;物联网开发。 设计方案思路的由来&#xff0c;是因为我们现在的开发板基本需要通过串口与WIFI模组或以太网模…

Redis 从入门到精通【进阶篇】之Redis事务详解

文章目录 0.前言1.Redis 事务基本流程 1.事务详解1.1. 开始事务1.2. 命令入队1.3. 执行事务1.6. 带 WATCH 的事务1.7. WATCH 命令的实现1.8. WATCH 的触发1.9. 事务的 ACID 性质 2.总结2.1. 在事务和非事务状态下2.2. 小结2.3. 为什么Redis 的事务并不是真正的原子操作2.4. 为什…

SpringBoot学习——追根溯源servlet是啥,tomcat是啥,maven是啥 springBoot项目初步,maven构建,打包 测试

目录 引出追根溯源&#xff0c;过渡衔接servlet是啥&#xff1f;tomcat是啥&#xff1f; 前后端开发的模式1.开发模式&#xff1a;JavaWeb&#xff1a;MVC模型2.Web&#xff1a;Vue&#xff0c;MVVC模型3.后端相关3.1 同步与异步3.2 Controller层3.3 Service层&#xff1a;要加…

【C++初阶(三)】引用详解(对比指针)

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:C初阶之路⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习排序知识   &#x1f51d;&#x1f51d; 引用 1. 前言2. 引用的概念3. 引用的特性4. …