netty的基本用法
完整的介绍了netty最基本的发送byte、string、byteBuf发送接收数据,以及拆包、粘包演示,拆包、粘包的解决方案。看netty的用法只看本篇足以,本篇只说用法,不讲底层原理。
详细demo的git地址
示例的通用代码
客户端
- 客户端
package com.example.netty.common.client;import cn.hutool.core.util.ObjectUtil;
import com.example.netty.common.NettyThreadFactory;
import com.example.netty.common.utils.NettyConstants;
import com.example.netty.delimiter.client.DelimiterClientInitializer;
import com.example.netty.fixlength.client.FixLengthClientInitializer;
import com.example.netty.lengthfield.client.LengthFieldClientInitializer;
import com.example.netty.linebase.client.LineBaseClientInitializer;
import com.example.netty.nbcb.client.NormalNBCBClientInitializer;
import com.example.netty.normal.bytebuf.client.NormalByteBufClientInitializer;
import com.example.netty.normal.bytes.client.NormalByteClientInitializer;
import com.example.netty.normal.string.client.NormalStringClientInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;/*** <p></p>** @author xin* @version 2023/11/2 15:39**/
@Component
public class NettyClient {private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);private final EventLoopGroup group = new NioEventLoopGroup(new NettyThreadFactory("client"));private final Bootstrap bootstrap = new Bootstrap();private ChannelFuture channelFuture;@Asyncpublic void init(String type) {bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true);if (NettyConstants.NORMAL_BYTE_BUF.equalsIgnoreCase(type)) {logger.debug("+++++++初始化normal byte buf client ChannelInitializer ++++++++++++");bootstrap.handler(new NormalByteBufClientInitializer());} else if (NettyConstants.NORMAL_BYTE.equalsIgnoreCase(type)) {logger.debug("+++++++初始化normal byte client ChannelInitializer ++++++++++++");bootstrap.handler(new NormalByteClientInitializer());} else if (NettyConstants.NORMAL_STRING.equalsIgnoreCase(type)) {logger.debug("+++++++初始化normal string client ChannelInitializer ++++++++++++");bootstrap.handler(new NormalStringClientInitializer());}else if (NettyConstants.NBCB.equalsIgnoreCase(type)) {logger.debug("+++++++初始化粘包、拆包 client ChannelInitializer ++++++++++++");bootstrap.handler(new NormalNBCBClientInitializer());}else if (NettyConstants.FIX_LENGTH.equalsIgnoreCase(type)) {logger.debug("+++++++初始化粘包、拆包 固定长度解决方案 client ChannelInitializer ++++++++++++");bootstrap.handler(new FixLengthClientInitializer());}else if (NettyConstants.LINE_BASE.equalsIgnoreCase(type)) {logger.debug("+++++++初始化粘包、拆包 通过在包尾添加回车换行符 \\r\\n 来区分整包消息解决方案 client ChannelInitializer ++++++++++++");bootstrap.handler(new LineBaseClientInitializer());}else if (NettyConstants.DELIMITER.equalsIgnoreCase(type)) {logger.debug("+++++++初始化粘包、拆包 特殊字符作为分隔符来区分整包消息解决方案 client ChannelInitializer ++++++++++++");bootstrap.handler(new DelimiterClientInitializer());}else if (NettyConstants.LENGTH_FIELD.equalsIgnoreCase(type)) {logger.debug("+++++++初始化粘包、拆包 指定长度来标识整包消息,通过在包头指定整包长度来约定包长。解决方案 client ChannelInitializer ++++++++++++");bootstrap.handler(new LengthFieldClientInitializer());}}public ChannelFuture connect(String host, int port) {doConnect(host, port);return this.channelFuture;}public void doConnect(String host, Integer port) {if (ObjectUtil.isEmpty(host) || ObjectUtil.isEmpty(port)) {throw new RuntimeException("IP host 为null");}try {ChannelFuture future = bootstrap.connect(host, port);//休眠1秒,保证通道建立成功Thread.sleep(100);channelFuture = future;logger.info("ChannelFuture 创建状态{}", channelFuture.isSuccess());} catch (Exception e) {logger.error("client start fail", e);}}public void disconnect(Channel channel) {if (ObjectUtil.isNotNull(channel)) {try {ChannelFuture channelFuture = channel.close();channel.disconnect();} catch (Exception e) {logger.error("关闭通道异常", e);}}}
}
- 客户端心跳监听
package com.example.netty.common.client;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** <p></p>** @author xin* @version 2023/11/2 16:23**/
public class ClientHeartBeatServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(ClientHeartBeatServerHandler.class);@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state().equals(IdleState.ALL_IDLE)) {logger.info("长时间没有进行过读写操作,发送查询状态的心跳");// 15 分钟没有进行读操作,给服务端发送心跳包
// ctx.channel().writeAndFlush(BACK_FLOW_ORDER_QUERY_STATUS);} else if (event.state().equals(IdleState.READER_IDLE)) {logger.info("长时间没有进行过读操作");} else if (event.state().equals(IdleState.WRITER_IDLE)) {logger.info("长时间没有进行过写操作");}}}
}
服务端
-
服务端
package com.example.netty.common.server;import com.example.netty.common.NettyThreadFactory; import com.example.netty.common.utils.NettyConstants; import com.example.netty.delimiter.server.DelimiterServerInitializer; import com.example.netty.fixlength.server.FixLengthServerInitializer; import com.example.netty.lengthfield.client.LengthFieldClientInitializer; import com.example.netty.lengthfield.server.LengthFieldServerInitializer; import com.example.netty.linebase.server.LineBaseServerInitializer; import com.example.netty.nbcb.server.NormalNBCBServerInitializer; import com.example.netty.normal.bytebuf.server.NormalByteBufServerInitializer; import com.example.netty.normal.bytes.server.NormalByteServerInitializer; import com.example.netty.normal.string.server.NormalStringServerInitializer; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component;/*** 启动主类** @author xin* @version Created by xin on 2021/4/30 10:35 上午*/ @Component public class NettyServer {private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);EventLoopGroup boss = null;EventLoopGroup worker = null;ChannelFuture channelFuture = null;@Asyncpublic void init(String host, int port, String type) {// 负责初始化netty服务器,并且开始监听端口的socket请求。ServerBootstrap serverBootstrap = new ServerBootstrap();// 做是否支持epoll轮询判断以获取更高性能 任务调度框架// boss 端口监听线程组boss = Epoll.isAvailable() ? new EpollEventLoopGroup(2,new NettyThreadFactory("boss")) : new NioEventLoopGroup(2, new NettyThreadFactory("boss"));// worker 消息处理线程组worker = Epoll.isAvailable() ? new EpollEventLoopGroup(2,new NettyThreadFactory("worker")) : new NioEventLoopGroup(2, new NettyThreadFactory("worker"));serverBootstrap.group(boss, worker).channel(Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).localAddress(host, port)// option 是针对于 boss 线程组// 连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。.option(ChannelOption.SO_KEEPALIVE, true)// 开启Nagle算法,(尽可能的发送大块数据避免网络中充斥着大量的小数据块).option(ChannelOption.TCP_NODELAY, true)// ByteBuf 分配器.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT).option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT).option(ChannelOption.SO_REUSEADDR, true);// 此处是为了演示用的,初始化不同的 childHandlerif (NettyConstants.NORMAL_BYTE_BUF.equalsIgnoreCase(type)) {logger.debug("+++++++初始化normal byte buf server ChannelInitializer ++++++++++++");serverBootstrap.childHandler(new NormalByteBufServerInitializer());} else if (NettyConstants.NORMAL_BYTE.equalsIgnoreCase(type)) {logger.debug("+++++++初始化normal byte server ChannelInitializer ++++++++++++");serverBootstrap.childHandler(new NormalByteServerInitializer());} else if (NettyConstants.NORMAL_STRING.equalsIgnoreCase(type)) {logger.debug("+++++++初始化normal string server ChannelInitializer ++++++++++++");serverBootstrap.childHandler(new NormalStringServerInitializer());} else if (NettyConstants.NBCB.equalsIgnoreCase(type)) {logger.debug("+++++++初始化 粘包、拆包 server ChannelInitializer ++++++++++++");serverBootstrap.childHandler(new NormalNBCBServerInitializer());} else if (NettyConstants.FIX_LENGTH.equalsIgnoreCase(type)) {logger.debug("+++++++初始化 粘包、拆包 固定长度解决方案 server ChannelInitializer ++++++++++++");serverBootstrap.childHandler(new FixLengthServerInitializer());} else if (NettyConstants.LINE_BASE.equalsIgnoreCase(type)) {logger.debug("+++++++初始化 粘包、拆包 通过在包尾添加回车换行符 \\r\\n 来区分整包消息解决方案 server ChannelInitializer ++++++++++++");serverBootstrap.childHandler(new LineBaseServerInitializer());} else if (NettyConstants.DELIMITER.equalsIgnoreCase(type)) {logger.debug("+++++++初始化 粘包、拆包 特殊字符作为分隔符来区分整包消息解决方案 server ChannelInitializer ++++++++++++");serverBootstrap.childHandler(new DelimiterServerInitializer());}else if (NettyConstants.LENGTH_FIELD.equalsIgnoreCase(type)) {logger.debug("+++++++初始化粘包、拆包 指定长度来标识整包消息,通过在包头指定整包长度来约定包长。解决方案 client ChannelInitializer ++++++++++++");serverBootstrap.childHandler(new LengthFieldServerInitializer());}// childOption 是针对于 worker线程组// 连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。可以将此功能视为TCP的心跳机制,需要注意的是:默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true)// 开启Nagle算法,(尽可能的发送大块数据避免网络中充斥着大量的小数据块)// TCP参数,立即发送数据,默认值为Ture(Netty默认为True而操作系统默认为False)。该值设置Nagle算法的启用,改算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量,如果需要发送一些较小的报文,则需要禁用该算法。Netty默认禁用该算法,从而最小化报文传输延时.childOption(ChannelOption.TCP_NODELAY, true)// ByteBuf 分配器.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT).childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT).childOption(ChannelOption.SO_REUSEADDR, true);// 负责绑定端口,当这个方法执行后,ServerBootstrap就可以接受指定端口上的socket连接了。一个ServerBootstrap可以绑定多个端口。try {channelFuture = serverBootstrap.bind().sync();logger.info("Netty 服务启动成功,端口:{}", channelFuture.channel().localAddress());} catch (Exception e) {logger.error("启动 Netty 服务时发生异常", e);}// 监听Channel关闭事件ChannelFuture closeFuture = channelFuture.channel().closeFuture();try {closeFuture.sync();} catch (InterruptedException e) {logger.error("关闭 Channel 发生异常", e);} finally {// 关闭线程组worker.shutdownGracefully();boss.shutdownGracefully();}} }
-
服务端心跳
package com.example.netty.common.server;import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.net.SocketAddress;/*** <p></p>** @author xin* @version 2023/11/2 16:23**/ public class ServerHeartBeatServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(ServerHeartBeatServerHandler.class);private static final int MIN_LOSS_CONNECT_COUNT = 12;private int lossConnectCount = 0;@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {logger.debug("已经5秒未收到客户端的消息了!");if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {lossConnectCount++;if (lossConnectCount > MIN_LOSS_CONNECT_COUNT) {logger.debug("这个不活跃通道!");// 如果有需要可以关闭不活跃的通道final SocketAddress socketAddress = ctx.channel().remoteAddress();ctx.channel().close().addListener((ChannelFutureListener) future -> {if (future.isSuccess()) {logger.info("close idle connect:" + socketAddress + " for " + event.state() + " done");} else {logger.info("close idle connect:" + socketAddress + " for " + event.state() + " fail");}});}}} else {super.userEventTriggered(ctx, evt);}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {lossConnectCount = 0;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();} }
-
连接监听
package com.example.netty.common.server;import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;/*** <p></p>** @author xin* @version 2023/11/2 16:21**/ @ChannelHandler.Sharable public class ConnectionCountHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(ConnectionCountHandler.class);/*** 每次过来一个新连接就对连接数加一** @param ctx ChannelHandlerContext*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {logger.debug("有新的链接加入:{}", ctx.channel().id().asLongText());super.channelActive(ctx);}/*** 断开的时候减一** @param ctx ChannelHandlerContext*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.debug("有连接断开:{}", ctx.channel().id().asLongText());super.channelInactive(ctx);} }
测试
package com.example.netty.controller;import cn.hutool.core.util.ByteUtil;
import com.example.netty.common.client.NettyClient;
import com.example.netty.common.utils.ByteUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;/*** <p></p>** @author xin* @version 2023/11/1 17:02**/
@RestController
public class TestClientController {private static final Logger logger = LoggerFactory.getLogger(TestClientController.class);private final NettyClient nettyClient;public TestClientController(NettyClient nettyClient) {this.nettyClient = nettyClient;}public static void main(String[] args) {System.out.println("ABCDE一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;".getBytes(StandardCharsets.UTF_8).length);}@GetMapping("/app/netty/normal/byteBuf")public String testNormalByteBuf() throws InterruptedException {logger.debug("发送正常情况下的 normal byte buf 模式");ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);for (int i = 0; i < 10; i++) {String msg = "一岁一枯荣,野火烧不尽;";ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();buffer.writeBytes(msg.getBytes(StandardCharsets.UTF_8));// 此处采用byte模式channelFuture.channel().writeAndFlush(buffer);Thread.sleep(1000);}nettyClient.disconnect(channelFuture.channel());return "normal byte buf 发送成功";}@GetMapping("/app/netty/normal/byte")public String testNormalByte() throws InterruptedException {logger.debug("发送正常情况下的 normal byte 模式");ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);for (int i = 0; i < 10; i++) {String msg = "一岁一枯荣,野火烧不尽;";channelFuture.channel().writeAndFlush(msg.getBytes(StandardCharsets.UTF_8));Thread.sleep(1000);}nettyClient.disconnect(channelFuture.channel());return "normal byte 发送成功";}@GetMapping("/app/netty/normal/string")public String testNormalString() throws InterruptedException {logger.debug("发送正常情况下的 string 模式");ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);for (int i = 0; i < 10; i++) {String msg = "一岁一枯荣,野火烧不尽;";channelFuture.channel().writeAndFlush(msg);Thread.sleep(1000);}nettyClient.disconnect(channelFuture.channel());return "string 发送成功";}@GetMapping("/app/netty/normal/nbcb")public String testNb() throws InterruptedException {logger.debug("验证粘包、拆包现象,客户端给服务端发送产生拆包、服务端给客户端发送产生 粘包");ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);StringBuilder msg = new StringBuilder("一岁一枯荣,野火烧不尽;");for (int i = 0; i < 10; i++) {msg.append(msg);}msg.append("end");ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();buffer.writeBytes(msg.toString().getBytes(StandardCharsets.UTF_8));channelFuture.channel().writeAndFlush(buffer);Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "string 发送成功";}@GetMapping("/app/netty/fixLength")public String testFixLength() throws InterruptedException {ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);// 此处是字节长度为 64,如果不够 64 则本次发送的消息收不到会暂存到缓存中,等凑够 64 才会监听到消息// 固定长度,如果长度不够且需要收到消息,通过补空格实现// 固定长度,如果 超过 64则只会 收 64 长度的数据,剩余的不够64 的暂存到缓存中,等凑够64 才会监听到消息// 建议 发送的消息为 设置长度 64 的倍数,否则会将字节截断产生乱码String s = "一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;";channelFuture.channel().writeAndFlush(s);Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "发送成功";}@GetMapping("/app/netty/lineBase")public String testLineBase() throws InterruptedException {ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);String s = "一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;" +"一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;\r\n";channelFuture.channel().writeAndFlush(s);Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "发送成功";}@GetMapping("/app/netty/delimiter")public String testDelimiter() throws InterruptedException {ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);String s = "一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;" +"一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;78B987";channelFuture.channel().writeAndFlush(s);Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "发送成功";}@GetMapping("/app/netty/lengthField")public String testLengthField() throws InterruptedException {ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);for (int i = 0; i < 150; i++) {//长度 64byte[] sendMsgBytes = ("一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;[" + i + "]").getBytes(StandardCharsets.UTF_8);//长度 5byte[] headerBytes = "ABCDE".getBytes(StandardCharsets.UTF_8);int bodyLength = sendMsgBytes.length + headerBytes.length;byte[] bytes = {};//长度 5bytes = ByteUtils.append(bytes, headerBytes);//长度 4bytes = ByteUtils.append(bytes, ByteUtil.intToBytes(bodyLength));//长度 64bytes = ByteUtils.append(bytes, sendMsgBytes);channelFuture.channel().writeAndFlush(bytes);Thread.sleep(1000);}nettyClient.disconnect(channelFuture.channel());return "发送成功";}
}
###
GET http://localhost:8080/app/netty/normal/byteBuf### 2 以字节发送、以字节接收,必须加 byte 解码器和编码器
GET http://localhost:8080/app/netty/normal/byte### 3 以字符串发送、以字符串接收,必须加 string 解码器和编码器
GET http://localhost:8080/app/netty/normal/string### 4 验证 粘包、拆包
GET http://localhost:8080/app/netty/normal/nbcb### 5 验证 粘包、拆包 固定长度解决方案
GET http://localhost:8080/app/netty/fixLength### 6 验证 粘包、拆包 通过在包尾添加回车换行符 \r\n 来区分整包消息
GET http://localhost:8080/app/netty/lineBase0### 7
GET http://localhost:8080/app/netty/lineBase1### 8
GET http://localhost:8080/app/netty/lineBase2### 9 验证 粘包、拆包 特殊字符作为分隔符来区分整包消息;
GET http://localhost:8080/app/netty/delimiter0### 10 验证 粘包、拆包 特殊字符作为分隔符来区分整包消息;
GET http://localhost:8080/app/netty/delimiter1### 11 验证 粘包、拆包 特殊字符作为分隔符来区分整包消息;
GET http://localhost:8080/app/netty/delimiter2### 12 验证 粘包、拆包 固定长度解决方案
GET http://localhost:8080/app/netty/fixLength0### 13 验证 粘包、拆包 固定长度解决方案
GET http://localhost:8080/app/netty/fixLength1### 14 验证 粘包、拆包 固定长度解决方案
GET http://localhost:8080/app/netty/fixLength2### 15 验证 粘包、拆包 特殊字符作为分隔符来区分整包消息;
GET http://localhost:8080/app/netty/lengthField0
启动
package com.example.netty;import com.example.netty.common.client.NettyClient;
import com.example.netty.common.server.NettyServer;
import com.example.netty.common.utils.NettyConstants;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.EnableAsync;import javax.annotation.Resource;@SpringBootApplication
@EnableAsync
public class NettyDemoApplication {@Resourceprivate NettyServer nettyServer;@Resourceprivate NettyClient nettyClient;public static void main(String[] args) {SpringApplication.run(NettyDemoApplication.class, args);System.out.println("+++++服务启动成功+++++");}@EventListener(ApplicationStartedEvent.class)public void init() {// 测试 normal byteBuf 包String type = NettyConstants.NORMAL_BYTE_BUF;// 测试 byte 包
// String type = NettyConstants.NORMAL_BYTE;// 测试String 包
// String type = NettyConstants.NORMAL_STRING;// 演示 拆包和粘包
// String type = NettyConstants.NBCB;// 粘包和拆包固定长度解决方案
// String type = NettyConstants.FIX_LENGTH;// 拆包、粘包 结尾\r\n 来区分整包解决方案
// String type = NettyConstants.LINE_BASE;// 拆包、粘包 特殊字符结尾来区分整包解决方案
// String type = NettyConstants.DELIMITER;// 头部设置 整包的长度来进行整包区分,每个包的长度放在头部
// String type = NettyConstants.LENGTH_FIELD;nettyServer.init("127.0.0.1", 9000, type);nettyClient.init(type);}
}
ByteBuf
netty最基本的用法,不设置decoder、encoder等,默认采用byteBuf 的方式进行发送和接收;
注意:如果发送方的编码格式和接收方的编码格式不一致,会导致接收不到消息,且不会报错
客户端
初始化handler 和Decoder、Encoder的配置,采用的是责任链模式
package com.example.netty.normal.bytebuf.client;import com.example.netty.common.client.ClientHeartBeatServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:06**/
@SuppressWarnings("all")
public class NormalByteBufClientInitializer extends ChannelInitializer<SocketChannel> {private static final String DECODER = "decoder";private static final String ENCODER = "encoder";/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ClientHeartBeatServerHandler()).addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS)).addLast(new NormalByteBufClientHandler());}
}
服务端
package com.example.netty.normal.bytebuf.server;import com.example.netty.common.server.ConnectionCountHandler;
import com.example.netty.common.server.ServerHeartBeatServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:04**/
@SuppressWarnings("all")
public class NormalByteBufServerInitializer extends ChannelInitializer<SocketChannel> {private static final String ENCODER = "encoder";private static final String DECODER = "decoder";/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS)).addLast(new NormalByteBufServerHandler()).addLast(new ConnectionCountHandler()).addLast(new ServerHeartBeatServerHandler());}
}
测试1
@GetMapping("/app/netty/normal/byteBuf")
public String testNormalByteBuf() throws InterruptedException {logger.debug("发送正常情况下的 normal byte buf 模式");ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);for (int i = 0; i < 10; i++) {String msg = "一岁一枯荣,野火烧不尽;";// 此处采用的是 byte形式进行发送消息,但是 initChannel没有配置 decoder 和encoder的转译,因此服务端收不到消息channelFuture.channel().writeAndFlush(msg.getBytes(StandardCharsets.UTF_8));Thread.sleep(1000);}nettyClient.disconnect(channelFuture.channel());return "normal byte buf 发送成功";
}
输出
只有创建连接的日志没有收到客户端消息的日志
测试2
@GetMapping("/app/netty/normal/byteBuf")
public String testNormalByteBuf() throws InterruptedException {logger.debug("发送正常情况下的 normal byte buf 模式");ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);for (int i = 0; i < 10; i++) {String msg = "一岁一枯荣,野火烧不尽;";ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();buffer.writeBytes(msg.getBytes(StandardCharsets.UTF_8));// 此处采用byte模式channelFuture.channel().writeAndFlush(buffer);Thread.sleep(1000);}nettyClient.disconnect(channelFuture.channel());return "normal byte buf 发送成功";
}
输出
Byte 模式
客户端
-
客户端配置
package com.example.netty.normal.bytes.client;import com.example.netty.common.client.ClientHeartBeatServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.bytes.ByteArrayDecoder; import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:06**/ @SuppressWarnings("all") public class NormalByteClientInitializer extends ChannelInitializer<SocketChannel> {/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;private static final String DECODER = "decoder";private static final String ENCODER = "encoder";@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ClientHeartBeatServerHandler())// 心跳监听配置.addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS))// 客户端向服务端发送消息时 采用 byte 编码方式,对应 NormalByteServerInitializer 中的decoder// TestClientController.testNormalByte 中发送时需要 以 字节方式进行发送.addLast(ENCODER, new ByteArrayEncoder())// 服务端向客户端发送消息,采用byte 编码方式,对应NormalByteServerInitializer 中的encoder// 需要对应 NormalByteServerHandler.channelRead ctx.channel().writeAndFlush(serverMsg.getBytes(StandardCharsets.UTF_8));.addLast(DECODER, new ByteArrayDecoder())// 用于监听消息、处理消息,如果 DECODER 设置的是字节,则channelRead 收到的msg 是字节.addLast(new NormalByteClientHandler());} }
-
客户端handler
package com.example.netty.normal.bytes.client;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.nio.charset.StandardCharsets;/*** <p></p>** @author xin* @version 2023/11/2 16:41**/
public class NormalByteClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(NormalByteClientHandler.class);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.info("接收到服务端的响应:{} ", ctx.channel().id().asLongText());String body = new String((byte[]) msg, StandardCharsets.UTF_8);System.out.println("-----client start------| " + body + " | ------client end------");}
}
服务端
-
服务端配置
package com.example.netty.normal.bytes.server;import com.example.netty.common.server.ConnectionCountHandler; import com.example.netty.common.server.ServerHeartBeatServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.bytes.ByteArrayDecoder; import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:04**/ @SuppressWarnings("all") public class NormalByteServerInitializer extends ChannelInitializer<SocketChannel> {private static final String ENCODER = "encoder";private static final String DECODER = "decoder";/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS))// 服务端向客户端发送消息,采用byte 编码方式,对应 NormalByteClientInitializer 中的decoder // 需要对应 NormalByteServerHandler.channelRead ctx.channel().writeAndFlush(serverMsg.getBytes(StandardCharsets.UTF_8));.addLast(ENCODER, new ByteArrayEncoder())// 客户端向服务端发送消息时 采用 byte 编码方式,对应 NormalByteClientInitializer 中的 encoder// TestClientController.testNormalByte 中发送时需要 以 字节方式进行发送.addLast(DECODER, new ByteArrayDecoder())// 监听消息,并接收消息,此处配置的是 字节,因此收到的消息 是字节,强转位 字节.addLast(new NormalByteServerHandler())// 监听 客户端连接.addLast(new ConnectionCountHandler())// 监听客户端心跳.addLast(new ServerHeartBeatServerHandler());} }
-
服务端handler
package com.example.netty.normal.bytes.server;import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.nio.charset.StandardCharsets;/*** <p></p>** @author xin* @version 2023/11/2 16:39**/
public class NormalByteServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(NormalByteServerHandler.class);@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.debug("收到客户端消息");try {// 因为TestClientController.testNormalByte 中发送时以字节发送// 对应NormalByteServerInitializer 的decoder ByteArrayDecoder 此类将 byteBuf 处理为了 bytes// NormalByteClientInitializer encoder // 因此此处可以强转为 byte String body = new String((byte[]) msg, StandardCharsets.UTF_8);System.out.println("-----server start------| " + body + " | ------server end------");//发送给客户端String serverMsg = "++++++++我是服务端给客户端的消息+++++++";// 此处需要对应 NormalByteServerInitializer 中的encoder 模式// 以及 NormalByteClientInitializer 中的decoder 模式,如果不匹配则客户端收不到消息// 因为 encoder 与 decoder 配置的是 ByteArrayDecoder、ByteArrayEncoder 因此此处只能发送字节ctx.channel().writeAndFlush(serverMsg.getBytes(StandardCharsets.UTF_8));} catch (Exception e) {logger.error("接收数据异常", e);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
测试
@GetMapping("/app/netty/normal/byte")
public String testNormalByte() throws InterruptedException {logger.debug("发送正常情况下的 normal byte 模式");ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);for (int i = 0; i < 10; i++) {String msg = "一岁一枯荣,野火烧不尽;";// 此处发送 字节还是字符串 需要对应 NormalByteClientInitializer 里面的encoder 配置// 以及NormalByteServerInitializer decoder 设置,如果不是 字节方式,则服务端收不到消息channelFuture.channel().writeAndFlush(msg.getBytes(StandardCharsets.UTF_8));Thread.sleep(1000);}nettyClient.disconnect(channelFuture.channel());return "normal byte 发送成功";
}
执行 TestClientController.http
### 2 以字节发送、以字节接收,必须加 byte 解码器和编码器
GET http://localhost:8080/app/netty/normal/byte
输出
String 模式
客户端发送的方式以及 initChannel 中的encoder 必须与 服务端的initChannel的decoder 以及接收方式匹配,如果匹配不上则有可能收不到消息;
客户端
-
客户端配置
package com.example.netty.normal.string.client;import com.example.netty.common.client.ClientHeartBeatServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.bytes.ByteArrayDecoder; import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:06**/ @SuppressWarnings("all") public class NormalStringClientInitializer extends ChannelInitializer<SocketChannel> {/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;private static final String DECODER = "decoder";private static final String ENCODER = "encoder";@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 心跳监听配置ch.pipeline().addLast(new ClientHeartBeatServerHandler())// 心跳监听配置.addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS))// 客户端向服务端发送消息时 采用 字符串 编码方式,对应 NormalStringServerInitializer 中的decoder// TestClientController.testNormalString 中发送时需要 以字符串方式进行发送// NormalStringServerHandler.channelRead 以字符串方式进行接收.addLast(ENCODER, new StringEncoder())// 服务端向客户端发送消息,采用字符串编码方式,对应 NormalStringServerInitializer 中的encoder// 需要对应 NormalStringServerHandler.channelRead ctx.channel().writeAndFlush(serverMsg);// NormalStringClientHandler.channelRead 通过字符串接收.addLast(DECODER, new StringDecoder())// 用于监听消息、处理消息,如果 DECODER 设置的是字节,则channelRead 收到的msg 是字符串.addLast(new NormalStringClientHandler());} }
-
客户端Handler
package com.example.netty.normal.string.client;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.nio.charset.StandardCharsets;/*** <p></p>** @author xin* @version 2023/11/2 16:41**/ public class NormalStringClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(NormalStringClientHandler.class);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.info("接收到服务端的响应:{} ", ctx.channel().id().asLongText());// 服务端向客户端发送消息,采用字符串编码方式,对应 NormalStringServerInitializer 中的encoder// NormalStringClientInitializer 的decoder StringDecoder 将字节转 字符串处理// 因此此处采用 字符串接收,String body = (String) msg;System.out.println("-----client start------| " + body + " | ------client end------");} }
服务端
-
服务端配置
package com.example.netty.normal.string.server;import com.example.netty.common.server.ConnectionCountHandler; import com.example.netty.common.server.ServerHeartBeatServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.bytes.ByteArrayDecoder; import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:04**/ @SuppressWarnings("all") public class NormalStringServerInitializer extends ChannelInitializer<SocketChannel> {private static final String ENCODER = "encoder";private static final String DECODER = "decoder";/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 心跳ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS))// 服务端向客户端发送消息,采用 字符串 编码方式,对应 NormalStringClientInitializer 中的decoder// 需要对应 NormalStringServerHandler.channelRead ctx.channel().writeAndFlush(serverMsg);// NormalStringClientHandler.channelRead 通过字符串接收.addLast(ENCODER, new StringEncoder())// 客户端向服务端发送消息时 采用 字符串 编码方式,对应 NormalStringClientInitializer 中的 encoder// TestClientController.testNormalString 中发送时需要 以 字符串 方式进行发送// NormalStringServerHandler.channelRead 以字符串方式进行接收.addLast(DECODER, new StringDecoder())// 监听消息,并接收消息,此处配置的是 字节,因此收到的消息 是字节,强转位 字节.addLast(new NormalStringServerHandler())// 监听 客户端连接.addLast(new ConnectionCountHandler())// 监听客户端心跳.addLast(new ServerHeartBeatServerHandler());} }
-
服务端Handler
package com.example.netty.normal.string.server;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.nio.charset.StandardCharsets;/*** <p></p>** @author xin* @version 2023/11/2 16:39**/ public class NormalStringServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(NormalStringServerHandler.class);@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.debug("收到客户端消息");try { // NormalStringClientInitializer 中的 encoder // NormalStringServerInitializer 中的 decoder StringDecoder 将字节转成了 字符串,可看源码 //String body = (String) msg;System.out.println("-----server start------| " + body + " | ------server end------");//发送给客户端 // 服务端向客户端发送消息,采用 字符串 编码方式,对应 NormalStringServerInitializer 中的 encoder // NormalStringClientInitializer 中的decoderString serverMsg = "++++++++我是服务端给客户端的消息+++++++";ctx.channel().writeAndFlush(serverMsg);} catch (Exception e) {logger.error("接收数据异常", e);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();} }
测试
@GetMapping("/app/netty/normal/string")
public String testNormalString() throws InterruptedException {logger.debug("发送正常情况下的 string 模式");ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);for (int i = 0; i < 10; i++) {String msg = "一岁一枯荣,野火烧不尽;";channelFuture.channel().writeAndFlush(msg);Thread.sleep(1000);}nettyClient.disconnect(channelFuture.channel());return "string 发送成功";
}
执行 TestClientController.http
### 3 以字符串发送、以字符串接收,必须加 string 解码器和编码器
GET http://localhost:8080/app/netty/normal/string
拆包、粘包复现
拆包、粘包测试客户端采用ByteBuf模式进行发送以及接收、服务端采用ByteBuf模式进行接收以及给客户端发送;
客户端
-
客户端配置
采用默认的byteBuf 模式,及没有配置任何编码器和解码器,默认采用byteBuf形式进行发送和接收
package com.example.netty.nbcb.client;import com.example.netty.common.client.ClientHeartBeatServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:06**/
@SuppressWarnings("all")
public class NormalNBCBClientInitializer extends ChannelInitializer<SocketChannel> {private static final String DECODER = "decoder";private static final String ENCODER = "encoder";/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ClientHeartBeatServerHandler()).addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS)).addLast(new NormalNBCBClientHandler());}
}
-
客户端Handler
package com.example.netty.nbcb.client;import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.nio.charset.StandardCharsets;/*** <p></p>** @author xin* @version 2023/11/2 16:41**/ public class NormalNBCBClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(NormalNBCBClientHandler.class);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.info("接收到服务端的响应");ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, StandardCharsets.UTF_8);System.out.println("-----client start------|\n " + body + " \n| ------client end------");} }
服务端
-
服务端配置
package com.example.netty.nbcb.server;import com.example.netty.common.server.ConnectionCountHandler; import com.example.netty.common.server.ServerHeartBeatServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:04**/ @SuppressWarnings("all") public class NormalNBCBServerInitializer extends ChannelInitializer<SocketChannel> {private static final String ENCODER = "encoder";private static final String DECODER = "decoder";/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS)).addLast(new NormalNBCBServerHandler()).addLast(new ConnectionCountHandler()).addLast(new ServerHeartBeatServerHandler());} }
-
服务端handler
package com.example.netty.nbcb.server;import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.nio.charset.StandardCharsets;/*** <p></p>** @author xin* @version 2023/11/2 16:39**/ public class NormalNBCBServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(NormalNBCBServerHandler.class);@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.debug("收到客户端消息");try {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, StandardCharsets.UTF_8);System.out.println("-----start------|\n " + body + " \n | ------end------");//发送给客户端for (int n = 0; n < 5; n++) {String serverMsg = "++++++++我是服务端给客户端的消息 【" + n + "】+++++++\n";// 初始化 容量,默认是1024ByteBuf serverBuffer = ctx.alloc().buffer(16);serverBuffer.writeBytes(serverMsg.getBytes(StandardCharsets.UTF_8));// 此处演示粘包现象,连续发送了多次,客户端将多次请求合并接收,可以看接收的序号ctx.channel().writeAndFlush(serverBuffer);}} catch (Exception e) {logger.error("接收数据异常", e);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();} }
测试
执行 TestClientController.http
### 4 验证 粘包、拆包
GET http://localhost:8080/app/netty/normal/nbcb
@GetMapping("/app/netty/normal/nbcb")
public String testNb() throws InterruptedException {logger.debug("验证粘包、拆包现象,客户端给服务端发送产生拆包、服务端给客户端发送产生 粘包");ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);StringBuilder msg = new StringBuilder("一岁一枯荣,野火烧不尽;");for (int i = 0; i < 10; i++) {msg.append(msg);}msg.append("end");ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();buffer.writeBytes(msg.toString().getBytes(StandardCharsets.UTF_8));// 演示拆包现象,此处 只发送了一次,客户端接收了多次channelFuture.channel().writeAndFlush(buffer);Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "string 发送成功";
}
输出
解决拆包粘包方案
处理 TCP 粘包的唯一方法就是制定应用层的数据通讯协议,通过协议来规范现有接收的数据是否满足消息数据的需要。
为了解决网络数据流的拆包粘包问题,Netty 为我们内置了如下的解码器:
- ByteToMessageDecoder:如果想实现自己的半包解码器,实现该类;
- MessageToMessageDecoder:一般作为二次解码器,当我们在 ByteToMessageDecoder 将一个 bytes 数组转换成一个 java 对象的时候,我们可能还需要将这个对象进行二次解码成其他对象,我们就可以继承这个类;
- LineBasedFrameDecoder:通过在包尾添加回车换行符 \r\n 来区分整包消息;
- StringDecoder:字符串解码器;
- DelimiterBasedFrameDecoder:特殊字符作为分隔符来区分整包消息;
- FixedLengthFrameDecoder:报文大小固定长度,不够空格补全;
- ProtoBufVarint32FrameDecoder:通过 Protobuf 解码器来区分整包消息;
- ProtobufDecoder: Protobuf 解码器;
- LengthFieldBasedFrameDecoder:指定长度来标识整包消息,通过在包头指定整包长度来约定包长。
Netty 还内置了如下的编码器:
- ProtobufEncoder:Protobuf 编码器;
- MessageToByteEncoder:将 Java 对象编码成 ByteBuf;
- MessageToMessageEncoder:如果不想将 Java 对象编码成 ByteBuf,而是自定义类就继承这个;
- LengthFieldPrepender:LengthFieldPrepender 是一个非常实用的工具类,如果我们在发送消息的时候采用的是:消息长度字段+原始消息的形式,那么我们就可以使用 LengthFieldPrepender。这是因为 LengthFieldPrepender 可以将待发送消息的长度(二进制字节长度)写到 ByteBuf 的前两个字节。
以下通过代码来具体说明如何用,测试采用String编码、解码器:
LineBased
LineBasedFrameDecoder
参数说明:
maxLength
一包的最大长度,如果包长大于这个最大长度则会抛出异常;stripDelimiter
解码后的消息是否去除分隔符; 此处是 通过 \n\r来进行区分,因此接收到消息后建议去掉分割符;failFast
为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异常;
客户端
-
客户端配置
package com.example.netty.linebase.client;import com.example.netty.common.client.ClientHeartBeatServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:06**/ @SuppressWarnings("all") public class LineBaseClientInitializer extends ChannelInitializer<SocketChannel> {private static final String DECODER = "decoder";private static final String ENCODER = "encoder";/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ClientHeartBeatServerHandler()).addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS))// 优先判断最大长度// stripDelimiter:解码后的消息是否去除分隔符 此处是 通过 \n\r来进行区分,因此接收到消息后需要去掉分割符// failFast = false, 那么会等到解码出完整消息才会抛出 TooLongException。 // Integer.MAX_VALUE 一包的最大长度,如果包长大于这个最大长度则会抛出异常.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE, true, false)).addLast(ENCODER, new StringEncoder()).addLast(DECODER, new StringDecoder()).addLast(new LineBaseClientHandler());} }
-
客户端Handler
package com.example.netty.linebase.client;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;/*** <p></p>** @author xin* @version 2023/11/2 16:41**/ public class LineBaseClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(LineBaseClientHandler.class);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.info("接收到服务端的响应");String body = (String) msg;System.out.println("-----client start------| \n " + body + " \n| ------client end------");} }
服务端
-
服务端配置
package com.example.netty.linebase.server;import com.example.netty.common.server.ConnectionCountHandler; import com.example.netty.common.server.ServerHeartBeatServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:04**/ @SuppressWarnings("all") public class LineBaseServerInitializer extends ChannelInitializer<SocketChannel> {private static final String ENCODER = "encoder";private static final String DECODER = "decoder";/*** 如果failFast=true,当超过maxLength后会立刻抛出TooLongFrameException,不再进行解码;* 如果failFast=false,那么会等到解码出一个完整的消息后才会抛出TooLongFrameException*/private final boolean failFast = false;/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS))// 优先判断最大长度// stripDelimiter:解码后的消息是否去除分隔符,此处是 通过 \n\r来进行区分,因此接收到消息后需要去掉分割符// failFast = false, 那么会等到解码出完整消息才会抛出 TooLongException。.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE, true, failFast)).addLast(ENCODER, new StringEncoder()).addLast(DECODER, new StringDecoder()).addLast(new ConnectionCountHandler()).addLast(new LineBaseServerHandler()).addLast(new ServerHeartBeatServerHandler());} }
-
服务端Handler
package com.example.netty.linebase.server;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.nio.charset.StandardCharsets;/*** <p></p>** @author xin* @version 2023/11/2 16:39**/ public class LineBaseServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(LineBaseServerHandler.class);@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.debug("收到客户端消息");try {String body = (String) msg;System.out.println("-----client start------| \n " + body + " \n| ------client end------");// 如果没有分割符 收不到消息;ctx.channel().writeAndFlush("我是服务端给客户端的消息;\r\n我是服务端给客户端的消息;\r\n");} catch (Exception e) {logger.error("接收数据异常", e);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}public static void main(String[] args) {System.out.println("我是服务端给客户端的消息;".getBytes(StandardCharsets.UTF_8).length);} }
测试0
执行 TestClientController.http
### 6 验证 粘包、拆包 通过在包尾添加回车换行符 \r\n 来区分整包消息
GET http://localhost:8080/app/netty/lineBase0
@GetMapping("/app/netty/lineBase0")
public String testLineBase0() throws InterruptedException {ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);// 服务端收不到条消息, 因为没有监听到分割符,因此服务端收不到消息String s = "一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;" +"一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;";channelFuture.channel().writeAndFlush(s);Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "发送成功";
}
输出
测试1
执行 TestClientController.http
###
GET http://localhost:8080/app/netty/lineBase1
@GetMapping("/app/netty/lineBase1")
public String testLineBase1() throws InterruptedException {ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);// 服务端收到一条消息String s = "发送一次,收到一次;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;" +"一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;\r\n";channelFuture.channel().writeAndFlush(s);Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "发送成功";
}
输出
测试2
执行 TestClientController.http
###
GET http://localhost:8080/app/netty/lineBase2
@GetMapping("/app/netty/lineBase2")
public String testLineBase2() throws InterruptedException {ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);// 此处因为 有两个 分割符号 \r\n 因此服务端收到两条消息String s = "发送一次,收到两次;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;\r\n" +"一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;\r\n";channelFuture.channel().writeAndFlush(s);Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "发送成功";
}
输出
DelimiterBased
此解码器是LineBasedFrameDecoder
升级版。可以自定义进行整包分割的字符;
需要注意的是 用来进行判断整包结尾的分割符务必不要和包体数据重复,否则容易将数据进行分割;
DelimiterBasedFrameDecoder
参数说明:
maxLength
一包的最大长度,如果包长大于这个最大长度则会抛出异常;stripDelimiter
解码后的消息是否去除分隔符;failFast
为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异常;delimiter
自定义分割符,是byteBuf 类型的。
客户端
-
客户端配置
package com.example.netty.delimiter.client;import com.example.netty.common.client.ClientHeartBeatServerHandler; import com.example.netty.common.utils.NettyConstants; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler;import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:06**/ @SuppressWarnings("all") public class DelimiterClientInitializer extends ChannelInitializer<SocketChannel> {private static final String DECODER = "decoder";private static final String ENCODER = "encoder";private static final String PING = "ping";/*** 如果failFast=true,当超过maxLength后会立刻抛出TooLongFrameException,不再进行解码;* 如果failFast=false,那么会等到解码出一个完整的消息后才会抛出TooLongFrameException*/private final boolean failFast = false;/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ClientHeartBeatServerHandler()).addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS))// 优先判断最大长度// stripDelimiter:解码后的消息是否去除分隔符 不去掉分割符// failFast = false, 那么会等到解码出完整消息才会抛出 TooLongException。// 此处采用 ; 进行分割 服务端给客户端发送消息,客户端收到消息根据 ; 字符判断是否为一包数据.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, false, false,Unpooled.copiedBuffer(NettyConstants.DELIMITER_SPLIT_CLIENT.getBytes(StandardCharsets.UTF_8)))).addLast(ENCODER, new StringEncoder()).addLast(DECODER, new StringDecoder()).addLast(new DelimiterClientHandler());} }
-
客户端Handler
package com.example.netty.delimiter.client;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;/*** <p></p>** @author xin* @version 2023/11/2 16:41**/ public class DelimiterClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(DelimiterClientHandler.class);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.info("接收到服务端的响应");String body = (String) msg;System.out.println("-----client start------| \n " + body + " \n| ------client end------");} }
服务端
-
服务端配置
package com.example.netty.delimiter.server;import com.example.netty.common.server.ConnectionCountHandler; import com.example.netty.common.server.ServerHeartBeatServerHandler; import com.example.netty.common.utils.NettyConstants; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler;import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:04**/ @SuppressWarnings("all") public class DelimiterServerInitializer extends ChannelInitializer<SocketChannel> {private static final String ENCODER = "encoder";private static final String DECODER = "decoder";/*** 如果failFast=true,当超过maxLength后会立刻抛出TooLongFrameException,不再进行解码;* 如果failFast=false,那么会等到解码出一个完整的消息后才会抛出TooLongFrameException*/private final boolean failFast = false;/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS))// 优先判断最大长度// stripDelimiter:解码后的消息是否去除分隔符// failFast = false, 那么会等到解码出完整消息才会抛出 TooLongException。// 此处通过 78B987 来进行分割是否为一包数据,服务端收到客户端消息;需要注意的是 用来进行判断整包结尾的分割符 务必不要和包体 数据重复.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, true, failFast,Unpooled.copiedBuffer(NettyConstants.DELIMITER_SPLIT_SERVER.getBytes(StandardCharsets.UTF_8)))).addLast(ENCODER, new StringEncoder()).addLast(DECODER, new StringDecoder()).addLast(new ConnectionCountHandler()).addLast(new DelimiterServerHandler()).addLast(new ServerHeartBeatServerHandler());} }
-
服务端Handler
package com.example.netty.delimiter.server;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.nio.charset.StandardCharsets;/*** <p></p>** @author xin* @version 2023/11/2 16:39**/ public class DelimiterServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(DelimiterServerHandler.class);@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.debug("收到客户端消息");try {String body = (String) msg;System.out.println("-----client start------| \n " + body + " \n| ------client end------");// 如果没有分割符 收不到消息;ctx.channel().writeAndFlush("我是服务端给客户端的消息;我是服务端给客户端的消息;");} catch (Exception e) {logger.error("接收数据异常", e);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}public static void main(String[] args) {System.out.println("我是服务端给客户端的消息;".getBytes(StandardCharsets.UTF_8).length);} }
测试0
执行 TestClientController.http
#### 9 验证 粘包、拆包 特殊字符作为分隔符来区分整包消息;
GET http://localhost:8080/app/netty/delimiter0
客户端给服务端发送消息,没有分割符,服务端收不到消息,直到遇到分割符,服务端才监听到消息;
@GetMapping("/app/netty/delimiter0")
public String testDelimiter0() throws InterruptedException {ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);// 没有分割符,服务端收不到消息String s = "一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;" +"一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;没有分割符";//本次发送没有分割符channelFuture.channel().writeAndFlush(s);Thread.sleep(1000);// 在发送一次channelFuture.channel().writeAndFlush("重新发送一次,发送分割符78B987");// 此处休眠 1 s 断开连接是为了收 服务端消息Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "发送成功";
}
服务端输出,服务端通过 78B987
分割符来进行区分整包消息;
客户端输出
测试1
执行 TestClientController.http
### 10 验证 粘包、拆包 特殊字符作为分隔符来区分整包消息;
GET http://localhost:8080/app/netty/delimiter1
@GetMapping("/app/netty/delimiter1")
public String testDelimiter1() throws InterruptedException {// 有一个分割符,服务端收到一条消息ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);String s = "一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;" +"一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;78B987";channelFuture.channel().writeAndFlush(s);Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "发送成功";
}
输出,服务端输出一条数据,因为客户端发送的时候只有一个分割符,且收到的消息删除分割符
测试2
执行 TestClientController.http
### 11 验证 粘包、拆包 特殊字符作为分隔符来区分整包消息;
GET http://localhost:8080/app/netty/delimiter2
@GetMapping("/app/netty/delimiter2")
public String testDelimiter2() throws InterruptedException {// 有两个分割符,服务端收到两条消息ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);String s = "一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;78B987" +"一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;78B987";channelFuture.channel().writeAndFlush(s);Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "发送成功";
}
输出,因为是两个分割符 78B987
,客户端发送了一次,服务端接收了两次
FixedLength
FixedLengthFrameDecoder
固定长度,如果长度不够,则服务端收不到消息,如果需要能够收到消息,则需要通过手动补空格,将长度补齐;
如果一次发送的长度超过设置的长度,则需要是设置长度的倍数,否则,收到的数据会产生乱码;
一般使用此策略的场景是 发送消息的长度是固定的,如果不固定,不建议使用。
客户端
-
客户端配置
package com.example.netty.fixlength.client;import com.example.netty.common.client.ClientHeartBeatServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:06**/ @SuppressWarnings("all") public class FixLengthClientInitializer extends ChannelInitializer<SocketChannel> {private static final String DECODER = "decoder";private static final String ENCODER = "encoder";private static final String PING = "ping";/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ClientHeartBeatServerHandler()).addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS))// 固定 长度, 每一包消息长度必须是 37 字节,只有达到 37 时才会被接收 // 所有的消息,必须是 37 的倍数,不然会截断产生乱码 // FixLengthServerHandler channelRead 给客户端发送消息.addLast(new FixedLengthFrameDecoder(37)).addLast(ENCODER, new StringEncoder()).addLast(DECODER, new StringDecoder()).addLast(new FixLengthClientHandler());} }
-
客户端Handler
package com.example.netty.fixlength.client;import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.nio.charset.StandardCharsets;/*** <p></p>** @author xin* @version 2023/11/2 16:41**/ public class FixLengthClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(FixLengthClientHandler.class);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.info("接收到服务端的响应");String body = (String) msg;System.out.println("-----client start------| \n " + body + " \n| ------client end------");} }
服务端
-
服务端配置
package com.example.netty.fixlength.server;import com.example.netty.common.server.ConnectionCountHandler; import com.example.netty.common.server.ServerHeartBeatServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:04**/ @SuppressWarnings("all") public class FixLengthServerInitializer extends ChannelInitializer<SocketChannel> {private static final String ENCODER = "encoder";private static final String DECODER = "decoder";/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS))// 固定 长度, 每一包消息长度必须是 64 字节,只有达到 64 时才会被接收 // 所有的消息,必须是 64 的倍数,不然会截断产生乱码, // 发送的时候 TestClientController.testFixLength0 需要发送 32 字节.addLast(new FixedLengthFrameDecoder(32)).addLast(ENCODER, new StringEncoder()).addLast(DECODER, new StringDecoder()).addLast(new ConnectionCountHandler()).addLast(new FixLengthServerHandler()).addLast(new ServerHeartBeatServerHandler());} }
-
服务端Handler
package com.example.netty.fixlength.server;import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.nio.charset.StandardCharsets;/*** <p></p>** @author xin* @version 2023/11/2 16:39**/ public class FixLengthServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(FixLengthServerHandler.class);@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.debug("收到客户端消息");try {String body = (String) msg;System.out.println("-----server start------| \n " + body + " \n| ------server end------");// 此处 我是服务端给客户端的消息; 字节长度是 37; FixLengthClientInitializer FixedLengthFrameDecoder 配置的是 37,因此客户端收到 两条数据ctx.channel().writeAndFlush("我是服务端给客户端的消息;我是服务端给客户端的消息;");} catch (Exception e) {logger.error("接收数据异常", e);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}public static void main(String[] args) {System.out.println("我是服务端给客户端的消息;".getBytes(StandardCharsets.UTF_8).length);} }
测试0
执行 TestClientController.http
### 12 验证 粘包、拆包 固定长度解决方案
GET http://localhost:8080/app/netty/fixLength0
@GetMapping("/app/netty/fixLength0")
public String testFixLength0() throws InterruptedException {ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);// 此处是字节长度为 32,如果不够 32 则本次发送的消息收不到会暂存到缓存中,等凑够 32 才会监听到消息// 固定长度,如果长度不够且需要收到消息,通过补空格实现// 固定长度,如果 超过 32 则只会 收 32 长度的数据,剩余的不够 32 的暂存到缓存中,等凑够 32 才会监听到消息// 建议 发送的消息为 设置长度 32 的倍数,否则会将字节截断产生乱码// 一个汉字 3 个字节,一个, ;一个字节String s = "一岁一枯荣,野火烧不尽;";channelFuture.channel().writeAndFlush(s);Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "发送成功";
}
服务端输出
客户端输出
测试1
执行 TestClientController.http
### 13 验证 粘包、拆包 固定长度解决方案
GET http://localhost:8080/app/netty/fixLength1
@GetMapping("/app/netty/fixLength1")
public String testFixLength1() throws InterruptedException {ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);// 此处是字节长度为 32,如果不够 32 则本次发送的消息收不到会暂存到缓存中,等凑够 32 才会监听到消息// 固定长度,如果长度不够且需要收到消息,通过补空格实现// 固定长度,如果 超过 32 则只会 收 32 长度的数据,剩余的不够 32 的暂存到缓存中,等凑够 32 才会监听到消息// 建议 发送的消息为 设置长度 32 的倍数,否则会将字节截断产生乱码// 一个汉字 3 个字节,一个, ;一个字节// 服务端 只能收到 一岁一枯荣,野火烧不尽;// 一岁一枯 需要等下一次发送的时候 凑够 32 个字节才会收到String s = "一岁一枯荣,野火烧不尽;一岁一枯";channelFuture.channel().writeAndFlush(s);Thread.sleep(1000);s = "一岁一枯荣,野火烧不尽;一岁一枯";channelFuture.channel().writeAndFlush(s);Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "发送成功";
}
服务端输出
测试2
执行 TestClientController.http
### 14 验证 粘包、拆包 固定长度解决方案
GET http://localhost:8080/app/netty/fixLength2
@GetMapping("/app/netty/fixLength2")
public String testFixLength2() throws InterruptedException {ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);// 此处是字节长度为 32,如果不够 32 则本次发送的消息收不到会暂存到缓存中,等凑够 32 才会监听到消息// 固定长度,如果长度不够且需要收到消息,通过补空格实现// 固定长度,如果 超过 32 则只会 收 32 长度的数据,剩余的不够 32 的暂存到缓存中,等凑够 32 才会监听到消息// 建议 发送的消息为 设置长度 32 的倍数,否则会将字节截断产生乱码// 一个汉字 3 个字节,一个, ;一个字节// 服务端 只能收到 一岁一枯荣,野火烧不尽; 服务端收到 4 条消息//String s = "一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;";channelFuture.channel().writeAndFlush(s);Thread.sleep(1000);s = "一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;";channelFuture.channel().writeAndFlush(s);Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "发送成功";
}
LengthFieldBase
LengthFieldBasedFrameDecoder
指定长度来标识整包消息,通过在包头指定整包长度来约定包长;
一般在我们自定义TCP协议的时候可以在包头部分设置整包数据的长度,根据整包数据的长度来校验是否收取到完整的包,此解码器属于常用模式。
LengthFieldPrepender
设置此参数默认会给数据包的头部加上当前数据包的长度;
LengthFieldBasedFrameDecoder
参数说明
-
lengthFieldOffset
从头部开始需要偏移多少个字节,才是表示长度的字节所在的起始位,如我们定义的TCP协议里面,长度所在 位置是 从头部开始的 第5个字节,则 此处偏移量是 5; -
lengthFieldLength
表示整包数据长度在 TCP 协议里面占用几个 字节,一般 int 型 占 4 个字节,short 型占用 两个字节;byte 型占用1 个字节,long 型占用8 个字节;注意设置 数据包整体长度的时候不应该包含
lengthFieldLength
的长度; -
lengthAdjustment
. 建议此值的设置 +lengthFieldOffset
=0 ,如lengthFieldOffset
=5 则此处设置为 -5 -
initialBytesToStrip
表示 需要忽略的字节,但是此值的设置依赖lengthAdjustment
的设置;当lengthFieldOffset
+lengthAdjustment
=0 时,此值设置的是几则解析数据的时候会跳过几个字节; -
byteOrder
解析长度的时候 采用 大端模式还是小端模式,默认采用大端; -
failFast
为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异常;
客户端
-
客户端配置
package com.example.netty.lengthfield.client;import com.example.netty.common.client.ClientHeartBeatServerHandler; import com.example.netty.common.utils.NettyConstants; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler;import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:06**/ @SuppressWarnings("all") public class LengthFieldClientInitializer extends ChannelInitializer<SocketChannel> {private static final String DECODER = "decoder";private static final String ENCODER = "encoder";private static final String PING = "ping";/*** 长度字段偏移位置为0表示从包的第一个字节开始读取;*/private final int lengthFieldOffset = 0;/*** 长度字段长为2,从包的开始位置往后2个字节的长度为长度字段;*/private final int lengthFieldLength = 4;/*** 解析的时候无需跳过任何长度;*/private final int lengthAdjustment = 0;/*** 无需去掉当前数据包的开头字节数, header + body*/private final int initialBytesToStrip = 4;/*** 如果failFast=true,当超过maxLength后会立刻抛出TooLongFrameException,不再进行解码;* 如果failFast=false,那么会等到解码出一个完整的消息后才会抛出TooLongFrameException*/private final boolean failFast = false;/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ClientHeartBeatServerHandler()).addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS))// 优先判断最大长度// stripDelimiter:解码后的消息是否去除分隔符 不去掉分割符// failFast = false, 那么会等到解码出完整消息才会抛出 TooLongException。// 对应的是server 的发送的配置,默认给前端添加了 4个字节作为长度,// 因此,长度起始位是 从0 开始,长度所占的字节是4个,解析的时候 需要 跳过 4个字节,将添加的长度去掉.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4,false)).addLast(ENCODER, new ByteArrayEncoder()).addLast(DECODER, new StringDecoder()).addLast(new LengthFieldClientHandler());} }
-
客户端Handler
package com.example.netty.lengthfield.client;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;/*** <p></p>** @author xin* @version 2023/11/2 16:41**/ public class LengthFieldClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(LengthFieldClientHandler.class);@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.info("接收到服务端的响应");String body = (String) msg;System.out.println("-----client start------| \n " + body + " \n| ------client end------");} }
服务端
服务端配置
package com.example.netty.lengthfield.server;import com.example.netty.common.server.ConnectionCountHandler;
import com.example.netty.common.server.ServerHeartBeatServerHandler;
import com.example.netty.common.utils.NettyConstants;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;/*** <p></p>** @author xin* @version 2023/11/2 16:04**/
@SuppressWarnings("all")
public class LengthFieldServerInitializer extends ChannelInitializer<SocketChannel> {private static final String ENCODER = "encoder";private static final String DECODER = "decoder";/*** 长度字段偏移位置为0表示从包的第一个字节开始读取;* 此处表示从头开始的第10个字节开始计算长度 长度域从第几个字节开始*/private final int lengthFieldOffset = 5;/*** 长度字段长为4,从包的开始位置往后4个字节的长度为长度字段; 长度域占了几个字节*/private final int lengthFieldLength = 4;/*** 解析时候跳过多少个长度;长度域的偏移补偿*/private final int lengthAdjustment = -5;/*** 解码出一个数据包之后,跳过前面的几个字节*/private final int initialBytesToStrip = 0;/*** 如果failFast=true,当超过maxLength后会立刻抛出TooLongFrameException,不再进行解码;* 如果failFast=false,那么会等到解码出一个完整的消息后才会抛出TooLongFrameException* 则表示读取到长度域,他的值的超过maxFrameLength,就抛出一个 TooLongFrameException,而为false表示只有当真正读取完长度域的值表示的字节之后,才会抛出 TooLongFrameException,默认情况下设置为true,建议不要修改,否则可能会造成内存溢出。*/private final boolean failFast = false;/*** 为读超时时间(即多长时间没有接受到客户端发送数据)*/private final long readerIdleTime = 0;/*** 为写超时时间(即多长时间没有向客户端发送数据)*/private final long writerIdleTime = 0;/*** 所有类型(读或写)的超时时间*/private final long allIdleTime = 0;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.SECONDS))// 优先判断最大长度// stripDelimiter:解码后的消息是否去除分隔符// failFast = false, 那么会等到解码出完整消息才会抛出 TooLongException。
// ByteOrder.LITTLE_ENDIAN 解析长度的时候用小端.addLast(new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, Integer.MAX_VALUE, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast))// 给客户端发送的时候 会默认的给 头部加 4 个字节,加上 当前包的长度.addLast(new LengthFieldPrepender(4)).addLast(ENCODER, new StringEncoder())// 与发送端有关系// 接收端需要 转成何总格式,与 channelRead 相关,此处是将 ByteBuf 转成byte[].addLast(DECODER, new ByteArrayDecoder()).addLast(new ConnectionCountHandler()).addLast(new LengthFieldServerHandler()).addLast(new ServerHeartBeatServerHandler());}
}
服务端Handler
package com.example.netty.lengthfield.server;import cn.hutool.core.util.ByteUtil;
import com.example.netty.common.utils.ByteUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;/*** <p></p>** @author xin* @version 2023/11/2 16:39**/
public class LengthFieldServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(LengthFieldServerHandler.class);@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.debug("收到客户端消息");try {
// ByteBuf buf = (ByteBuf) msg;
// byte[] bytes = new byte[buf.readableBytes()];
// buf.readBytes(bytes);byte[] bytes = (byte[]) msg;String body = new String(new byte[]{bytes[0], bytes[1], bytes[2], bytes[3], bytes[4]});body = body + ByteUtil.bytesToInt(new byte[]{bytes[5], bytes[6], bytes[7], bytes[8]});byte[] bodyBytes = new byte[bytes.length - 9];System.arraycopy(bytes, 9, bodyBytes, 0, bodyBytes.length);body = body + new String(bodyBytes);System.out.println("-----client start------| \n " + body + " \n| ------client end------");// 长度 5String sendMsg = "我是服务端给客户端的消息;";ctx.channel().writeAndFlush(sendMsg);} catch (Exception e) {logger.error("接收数据异常", e);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
测试
@GetMapping("/app/netty/lengthField0")
public String testLengthField0() throws InterruptedException {ChannelFuture channelFuture = nettyClient.connect("127.0.0.1", 9000);//长度 64byte[] sendMsgBytes = ("一岁一枯荣,野火烧不尽;一岁一枯荣,野火烧不尽;").getBytes(StandardCharsets.UTF_8);//长度 5byte[] headerBytes = "ABCDE".getBytes(StandardCharsets.UTF_8);int bodyLength = sendMsgBytes.length + headerBytes.length;byte[] bytes = {};//长度 5bytes = ByteUtils.append(bytes, headerBytes);//长度 4 ,设置总长度的时候 只进行 数据包长度的设置,不包含 当前长度域的长度bytes = ByteUtils.append(bytes, ByteUtil.intToBytes(bodyLength));//长度 64bytes = ByteUtils.append(bytes, sendMsgBytes);channelFuture.channel().writeAndFlush(bytes);Thread.sleep(1000);nettyClient.disconnect(channelFuture.channel());return "发送成功";
}
服务端输出
说明
- 客户端发送的时候,从第6个字节开始设置包体的长度,长度域所占字节为4,因此服务端接收时,
LengthFieldBasedFrameDecoder
的配置如下:lengthFieldOffset
= 5, 头部偏移 5 个字节;lengthFieldLength
= 4 ,长度域 占用 4 个字节;lengthAdjustment
= -5, 偏移补偿 设置为 -5; 补偿头部的 5 个字节,这样解析的时候就会从第 0 个字节解析; 此值建议按照 相加 等于0 来设置,如果不是这样,解析的数据会有问题。initialBytesToStrip
= 0 ,跳过头部几个字节,此处没有跳过,因此输出的是完整的值
客户端输出
服务端给客户端发送消息
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.debug("收到客户端消息");try {
// ByteBuf buf = (ByteBuf) msg;
// byte[] bytes = new byte[buf.readableBytes()];
// buf.readBytes(bytes);byte[] bytes = (byte[]) msg;String body = new String(new byte[]{bytes[0], bytes[1], bytes[2], bytes[3], bytes[4]});body = body + ByteUtil.bytesToInt(new byte[]{bytes[5], bytes[6], bytes[7], bytes[8]});byte[] bodyBytes = new byte[bytes.length - 9];System.arraycopy(bytes, 9, bodyBytes, 0, bodyBytes.length);body = body + new String(bodyBytes);System.out.println("-----client start------| \n " + body + " \n| ------client end------");// 长度 5// 服务端给客户端发送消息,以字符串 编码、解码方式String sendMsg = "我是服务端给客户端的消息;";ctx.channel().writeAndFlush(sendMsg);} catch (Exception e) {logger.error("接收数据异常", e);}}
服务端编码设置
客户端输出
说明
- LengthFieldServerInitializer 设置 .addLast(new LengthFieldPrepender(4)) 表示在发送字符串时主动在头部加 4 个字节,表示当前数据包的长度;
- LengthFieldClientInitializer 设置 LengthFieldBasedFrameDecoder 参数
lengthFieldOffset
= 0, 默认在头部加长度,因此不需要偏移;lengthFieldLength
= 4 ,长度域 占用 4 个字节;lengthAdjustment
= 0,此值建议按照lengthFieldOffset
+lengthAdjustment
相加 等于0 来设置,如果不是这样,解析的数据会有问题。initialBytesToStrip
= 4 ,跳过头部几个字节,此处跳过长度字节
设置不跳过前4 个字节,直接打印输出,乱码:
解析显示长度
参考文档
https://www.cnblogs.com/rickiyang/p/12904552.html
https://www.cnblogs.com/caoweixiong/p/14666400.html