【源码】ByteToMessageDecoder对比自定义实现

news/2025/1/8 20:13:20/文章来源:https://www.cnblogs.com/longfurcat/p/18660188

前言

在上一篇随笔中,我们探讨了如何实现一套自定义通信协议,其中涉及到的粘包和拆包处理最初是完全自定义实现的,后来则改为了继承 ByteToMessageDecoder 来简化处理。

本篇将重点讨论这两种实现方式在缓存管理上的主要区别,并深入分析其中的不同之处以及值得借鉴的经验和技巧。

代码回顾

1)完全自定义实现

无缓存的情况

  • 反复从ByteBuf中提取完整的消息
  • 剩余的残缺消息写入缓存(会进行数据拷贝)

有缓存的情况

  • 将新收到的数据接入缓存
  • 反复从缓存中提取完整消息
  • 释放缓存内读取过的数据(会进行数据移动,导致拷贝)
public class EchoServerHandler extends ChannelInboundHandlerAdapter {private static final int HEADER_LENGTH = 4; //消息头部长度private ByteBuf buffer = Unpooled.buffer(1024); //缓存残缺消息
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf income = (ByteBuf) msg;//上一次有缓存存在,则本数据包不是消息头开头,if(buffer.readableBytes() > 0) {//进行必要的扩容,下面的readBytes不会自动扩容
            buffer.ensureWritable(income.readableBytes()); income.readBytes(buffer, income.readableBytes());readMsgFromBuffer(buffer);//剩下一点残缺消息if(buffer.readableBytes() > 0) {//保留剩下的数据,重置读索引为0System.out.println("缓存剩余字节:"+buffer.readableBytes());buffer.discardReadBytes();} else { //刚刚好,则清空数据
                buffer.clear();}} else {readMsgFromBuffer(income);//剩下的数据全部写入缓存if (income.readableBytes() >0) {System.out.println("剩余字节:"+income.readableBytes());income.readBytes(buffer, income.readableBytes());}}}//从字节数组中读取完整的消息private void readMsgFromBuffer(ByteBuf byteBuf) {//剩余可读消息是否包含一个消息头while(byteBuf.readableBytes() >= HEADER_LENGTH) {byteBuf.markReaderIndex(); //由于可能读不到完整的消息,所以读之前先标记索引位置,方便重置//读取消息头byte[] headerBytes = new byte[4];byteBuf.readBytes(headerBytes);//获取类型int type = headerBytes[0] & 0xFF;//获取消息体长度int bodyLength = ((headerBytes[1] & 0xFF) << 16) |((headerBytes[2] & 0xFF) << 8) |(headerBytes[3] & 0xFF);//不包含请求体if (byteBuf.readableBytes() < bodyLength) {byteBuf.resetReaderIndex(); //重置读索引到当前消息头位置break;}// 完整消息体已经接收,处理消息byte[] body = new byte[bodyLength];byteBuf.readBytes(body);//System.out.println("type:"+type+"||length:"+bodyLength+"||body:"+new String(body, CharsetUtil.UTF_8));if(type == 1) {try {HelloRequest request = HelloRequest.parseFrom(body);System.out.println("收到消息:"+request.toString());} catch (Exception e) {System.out.println("解析失败:"+new String(body, CharsetUtil.UTF_8));}} else {System.out.println("消息类型未知:"+type);}}}....
}

2)继承ByteToMessageDecoder的实现

使用ByteToMessageDecoder后,数据的解码变得更加简化。只需检查缓冲区是否有足够的数据来提取一个/多个完整的消息。

如果数据不足,解码过程就会结束,无需额外管理缓存。

public class MessageDecoder extends ByteToMessageDecoder {private static final int HEADER_LENGTH = 4; //消息头部长度
@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {// 检查是否足够的字节来读取一个消息头while (in.readableBytes() >= HEADER_LENGTH) {in.markReaderIndex(); // 标记当前读取位置,便于重置// 读取消息头部byte[] headerBytes = new byte[4];in.readBytes(headerBytes);// 获取类型int type = headerBytes[0] & 0xFF;// 获取消息体长度int bodyLength = ((headerBytes[1] & 0xFF) << 16) |((headerBytes[2] & 0xFF) << 8) |(headerBytes[3] & 0xFF);// 检查缓冲区中的数据是否足够读取整个消息体if (in.readableBytes() < bodyLength) {in.resetReaderIndex(); // 重置读指针,等待更多数据break;}// 读取消息体byte[] body = new byte[bodyLength];in.readBytes(body);// 处理消息try {Object msg = null;if(type == 1) {msg = HelloRequest.parseFrom(body);} else if(type == 2) {msg = HelloResponse.parseFrom(body);} else {System.out.println("未知消息:"+new String(body, CharsetUtil.UTF_8));}if(Objects.nonNull(msg)) {out.add(msg);}} catch (Exception e) {System.out.println("解析失败: " + new String(body, CharsetUtil.UTF_8));}}}
}

ByteToMessageDecoder源码

核心属性

    //缓存private ByteBuf cumulation;//累加器(用于拼接缓存和新到数据)private Cumulator cumulator = MERGE_CUMULATOR;//X次channelRead之后,释放已读数据private int discardAfterReads = 16;//累计channelRead次数(每次释放完会重置)private int numReads;

处理流程

1.新到数据存放到缓冲区(使用累加器Cumulator进行数据合并)

2.循环调用子类的decode方法,读取消息存入List,直到数据不足

3.遍历List,依次传递给下一个处理器

累加器

提供2种累加器实现,MERGE_CUMULATOR和COMPOSITE_CUMULATOR

1)MERGE_CUMULATOR(默认实现)

缓存存在的时候,直接进行数据拷贝,与缓存数据进行整合。

下面的代码可以看到,如果缓冲区空间不够,则会进行扩容操作。

跟自定义实现中的"buffer.ensureWritable(income.readableBytes())"一致。

整体思路跟自定义实现差不多,不过它多考虑了两种情况

  • 数据被共享:共享数据会被其他使用者影响,需排除影响
  • 数据只读:只读空间无法被写入,而缓冲区是需要写入新数据的
    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {//cumulation是上一次的缓存,in是新到的数据
        @Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {try {final ByteBuf buffer;if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {// Expand cumulation (by replace it) when either there is not more room in the buffer// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or// duplicate().retain() or if its read-only.//// See:// - https://github.com/netty/netty/issues/2327// - https://github.com/netty/netty/issues/1764buffer = expandCumulation(alloc, cumulation, in.readableBytes());} else {buffer = cumulation;}//新到数据写入缓存
                buffer.writeBytes(in);return buffer;} finally {// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw// for whatever release (for example because of OutOfMemoryError)
                in.release();}}};

2)COMPOSITE_CUMULATOR

 上面的处理,新到数据与缓存的合并是通过数据拷贝。而下面这种方式,则是使用组合(数据没有移动,只是提供一个整合后的视图)

  public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {@Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {ByteBuf buffer;try {if (cumulation.refCnt() > 1) {// Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the// user use slice().retain() or duplicate().retain().//// See:// - https://github.com/netty/netty/issues/2327// - https://github.com/netty/netty/issues/1764buffer = expandCumulation(alloc, cumulation, in.readableBytes());buffer.writeBytes(in);} else {CompositeByteBuf composite;if (cumulation instanceof CompositeByteBuf) {//上一次缓存已经是组合对象composite = (CompositeByteBuf) cumulation;} else {composite = alloc.compositeBuffer(Integer.MAX_VALUE);//缓存加入组合composite.addComponent(true, cumulation);}//新到数据加入组合composite.addComponent(true, in);in = null;buffer = composite;}return buffer;} finally {//由于使用组合方式,数据还在原来的地方。不能直接释放if (in != null) {// We must release if the ownership was not transferred as otherwise it may produce a leak if// writeBytes(...) throw for whatever release (for example because of OutOfMemoryError).
                    in.release();}}}};

主要方法——channelRead

在上述的自定义实现中,每次从缓冲区读取完数据,会释放掉已读数据,防止缓存数据无限增长。

buffer.discardReadBytes();

而这里做了优化,累积16次读取后,才会进行释放。(channelReadComplete的时候也会触发)

这样做的好处,就是可以减少数据拷贝的次数。(discard操作会把已读数据清空,重置读索引,然后把剩余数据往前挪)

    @Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//仅处理ByteBuf,其他消息直接传给下一个Handlerif (msg instanceof ByteBuf) {CodecOutputList out = CodecOutputList.newInstance();try {ByteBuf data = (ByteBuf) msg;first = cumulation == null;//缓冲区为空,直接赋值if (first) {cumulation = data;} else {//使用累加器进行数据合并cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);}//调用子类实现,从缓冲区中解析消息
                callDecode(ctx, cumulation, out);} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);} finally {if (cumulation != null && !cumulation.isReadable()) {//缓冲区数据刚好读完,清空缓冲区,清空已读次数numReads = 0;cumulation.release();cumulation = null;} else if (++ numReads >= discardAfterReads) {// We did enough reads already try to discard some bytes so we not risk to see a OOME.// See https://github.com/netty/netty/issues/4275//已读数达到限定次数(默认16),释放已读数据numReads = 0;discardSomeReadBytes();}int size = out.size();//是不是没解析到消息decodeWasNull = !out.insertSinceRecycled();//将解析出来的消息逐个传个下一个Handler
                fireChannelRead(ctx, out, size);//清空List,下次再用
                out.recycle();}} else {//直接丢给下一个Handler
            ctx.fireChannelRead(msg);}}

主要方法——callDecode

这里主要通过检查List结果集和数据读取情况,来判断要不要结束解码循环。

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {try {while (in.isReadable()) {//先读取List大小int outSize = out.size();//有数据,则先传给下一个Handlerif (outSize > 0) {fireChannelRead(ctx, out, outSize);out.clear();// Check if this handler was removed before continuing with decoding.// If it was removed, it is not safe to continue to operate on the buffer.//// See:// - https://github.com/netty/netty/issues/4635if (ctx.isRemoved()) {break;}outSize = 0;}//开始之前,先记录可读数据量int oldInputLength = in.readableBytes();//调用子类decode方法
                decodeRemovalReentryProtection(ctx, in, out);// Check if this handler was removed before continuing the loop.// If it was removed, it is not safe to continue to operate on the buffer.//// See https://github.com/netty/netty/issues/1664if (ctx.isRemoved()) {break;}//查看子类是否解析出数据if (outSize == out.size()) {//数据没被动过,说明没有可解析的数据,直接breakif (oldInputLength == in.readableBytes()) {break;} else { //数据有被动过,但还没解析出数据,继续执行continue;}}//List内有新数据,但是数据没有被读过,说明子类实现有问题,报错if (oldInputLength == in.readableBytes()) {throw new DecoderException(StringUtil.simpleClassName(getClass()) +".decode() did not read anything but decoded a message.");}//如果只解析一次,则直接结束if (isSingleDecode()) {break;}}} catch (DecoderException e) {throw e;} catch (Exception cause) {throw new DecoderException(cause);}}

总结

核心内容并无太大差异,但 Netty 提供的抽象类在实现上考虑了更多细节,并经过社区的不断演进,功能变得更加稳定和完善。

因此,推荐继承 ByteToMessageDecoder 来实现解码。

其中,减少释放次数的设计思想值得学习。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/866052.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025.1.8 鲜花

Nim 的变种Nim 的变种グランドエスケープ 空飛ぶ羽根と引き換えに 繋ぎ合う手を選んだ僕ら 没有选择飞翔的翅膀 而是选择十指相扣的我们 それでも空に魅せられて 夢を重ねるのは罪か 却仍然向往着天空 反复做着同样的梦 这有错吗 夏は秋の背中を見て その顔を思い浮かべる 夏…

CTF 之 Crypto (Cryptography) 学习笔记

CTF 之 Crypto (Cryptography) Chapter 0. 前置知识 群 (Group) 给定一个集合 \(G\neq \emptyset\) 以及二元代数运算 \(\circ\),若满足:封闭性 (Closure):\(\forall u,v\in G\),\(u\circ v\in G\); 结合律 (Associativity):\(\forall u,v,w\in G\),\((u\circ v)\circ w…

(2025自测有效!)全网最好的python配置教程【非常非常适合小白】

前几天我的电脑刚刚重装,把python重新配置了一下。 1.Python环境部署Python3 可应用于多平台包括 Windows、Linux 和 Mac OS X。 Python官网:https://www.python.org/ 进入官网在导航栏选择Dowmloads,选择所使用的系统(以Windows为例) 进入Windows下载页之后选择需要下载的…

写一个支持折叠、有缩进、代码高亮、离线的,方便部署的、易用的、优雅的json格式化查看工具(附html完整代码)

缘由 网上的在线json格式化有很多,但我是个有追求的人。在线的很难同时支持折叠、有缩进线、代码高亮、离线的,方便部署的、易用的、不请求后端(为了安全)的json格式化工具。 去Github上找项目,华而不实的东西占半个屏幕,格式化json要点好几下,一个json格式化工具npm安装…

AGC041F Histogram Rooks

我不知道啊,我只是觉得容斥很好玩。一个朴素的想法是容斥:考虑钦定 \(S\) 集合的位置没有被车覆盖,则答案是 \((-1)^{|S|}2^{c}\),其中 \(c\) 是可以放车的位置,可以直接 dp 做到 \(\mathrm{O}(2^n \text{poly}(n))\),但是难以优化。 延续容斥的想法,注意到钦定一个位置…

rust学习十六.1、并发-乱弹和一个简单并发例子

如书本作者所言,并发编程在绝大部分语言中,都是相对复杂和困难的。 所以,涉及的内容会相对多一些,所涵盖的内容绝对不是几篇文章所可以容纳的。 权当一个乱弹琴! 和此系列的其它文章一样,本文的内容绝大部分来自于相关书籍,本人做了一些摘裁的工作,取我所需!一、无畏并…

解决jenkins git 拉取代码超时问题

jenkins默认的是时间是10分钟,在git fetch时候超过10分钟了就报错失败了,可在项目源码管理 新增 advance clone behaviours

Java Bluetooth 蓝牙通讯 BlueCove 扫描附近的蓝牙设备

目录BlueCove项目概述BlueCove API架构API的设计原则和实现方式关键类和方法的功能描述测试代码获取本机(PC)蓝牙扫描蓝牙 BlueCove项目概述 BlueCove是一个开源的蓝牙协议栈实现,旨在为Java开发者提供一个全面的、易于使用的API,从而在应用程序中实现蓝牙功能。该项目支持多…

英伟达世界基础模型 Cosmos,教 AI 理解物理世界;阿里通义与雷鸟合作推出 AI 眼镜丨 RTE 开发者日报

开发者朋友们大家好:这里是 「RTE 开发者日报」 ,每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE(Real-Time Engagement) 领域内「有话题的新闻」、「有态度的 观点 」、「有意思的 数据 」、「有思考的 文章 」、「有看点的 会议 」,但内容仅代表编辑的…

.NET 窗口置于最顶层

本文介绍如何将窗口置于最顶层,以及解决在顶层显示时对锁屏登录界面的影响。用于实现类似Launcher、系统工具等应用需要窗口层级比Windows开始菜单以及置顶任务栏还要高的场景 一般情况下的窗口置顶,可以设置WPF窗口属性Topmost=true 也可以使用WIN32-SetWindowPos函数SetWin…

汽修行业的智能化转型:AI赋能员工培训SOP策略

随着智能化技术的飞速发展,汽修行业也迎来了前所未有的变革机遇。在这一背景下,如何构建高效、标准化的员工培训SOP(Standard Operating Procedure)策略,成为汽修企业提升竞争力、实现智能化转型的关键。本文将探讨智能AI在汽修行业员工培训SOP策略构建中的应用,特别是提…

智慧医疗新纪元:帮助中心引领的智能化转型之路

在科技日新月异的今天,智慧医疗已经成为医疗领域发展的重要趋势。智慧医疗不仅意味着医疗设备的智能化,更代表着整个医疗服务流程的数字化转型。在这一变革过程中,如何有效利用帮助中心驱动医疗机构的转型与升级,成为了一个值得深入探讨的课题。本文将探讨智慧医疗的智能跃…