21.Netty源码之编码器


highlight: arduino-light

Netty如何实现自定义通信协议

在学习完如何设计协议之后,我们又该如何在 Netty 中实现自定义的通信协议呢?其实 Netty 作为一个非常优秀的网络通信框架,已经为我们提供了非常丰富的编解码抽象基类,帮助我们更方便地基于这些抽象基类扩展实现自定义协议。

首先我们看下 Netty 中常用的编解码器有哪些。

一次编解码器和二次编解码器

Netty中的编解码器分为一次编解码和二次编解码。

一次编解码器:MessageToByteEncoder、ByteToMessageDecoder/ReplyingDecoder

二次编解码器:MessageToMessageEncoder、MessageToMessageDecoder

以解码为例,一次解码器用于解决TCP拆包/粘包问题,解析得到字节数据。

如果需要对解析后的字节数据做对象转换,需要使用二次解码器。同理,编码器是相反过程。

为什么需要二次码/编码

假设我们把解决半包粘包问题的常用三种解码器叫一次解码器:

image.png

那么我们在项目中,除了可选的的压缩解压缩之外,还需要一层解码,因为一次解码的结果是字节,需要和项目中所使用的对象做转化,方便使用,这层解码器可以称为“二次解码器”。

相应的,对应的二次编码器是为了将 Java 对象转化成字节流方便存储或传输。

为什么不合并一次二次解码器

思考:是不是也可以一步到位? 合并 1 次解码(解决粘包、半包)和 2 次解码(解决可操作问题)

可以,但是不建议: •没有分层,不够清晰;分层可以组合。 •耦合性高,不容易置换方案。

常用的编解码方式

-Java 序列化

-Marshaling

-XML

-JSON

-MessagePack

-Protobuf

-其他

选择编解码方式的因素

-空间:编码后占用空间 -时间:编解码速度 -是否追求可读性 -是否支持多语言,例如msgpack的支持:Java\C\Python等

Protobuf

-Protobuf 是一个灵活的、高效的用于序列化数据的协议。

-相比较 XML 和 JSON 格式,Protobuf 更小、更快、更便捷。

-Protobuf 是跨语言的,并且自带了一个编译器(protoc),只需要用它进行编译,可以自动生成 Java、python、C++ 等代码,不需要再写其他代码。

Protobuf使用步骤

第1步:在Maven 项目中引入 Protobuf 坐标,下载相关的jar包。

在pom.xml中 添加依赖

xml <dependencies> <dependency>    <groupId>com.google.protobuf</groupId>    <artifactId>protobuf-java</artifactId>    <version>3.6.1</version> </dependency> </dependencies>

第 2 步: 编写proto文件:Student.proto。

Student.proto的内容

```java syntax = "proto3"; //版本 option javaouterclassname = "StudentPoJO"; //指定生成的Java类名

//内部类的名称,是真正的PoJo 类 message Student{ // message 的规定的   int32 id = 1; //PoJo 类的属性数据类型类型和 序号(不是属性值)   string name = 2; } ```

第 3 步:通过 protoc.exe 根据描述文件生成 Java 类。

说明:protoc-3.6.1-win32 是从网上下载的 google 提供的文件.

cmd执行命令生成StudentPoJO.java: C:\Users\Administrator\Desktop\Netty资料\我的\资料\protoc-3.6.1-win32\bin>protoc.exe --java_out=. Student.proto

第4步:把生成的 StudentPoJo.java 拷贝到自己的项目中打开。

第 5 步:在 Netty 中使用。

java ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(StudentPoJO.Student.getDefaultInstance())); ​ ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast(new ProtobufEncoder());

抽象编码类

所有的解码器都继承了ChannelInBoundHandler。因为解码是需要解码接收的数据。所以使用In。

所有的编码器都继承了ChannelOutBoundHandler。因为编码是需要将对外发送的数据编码。所以使用Out。

image.png

通过抽象编码类的继承图可以看出编码类是 ChanneOutboundHandler 的抽象类实现,具体操作的是 Outbound 出站数据。

常用编码器类型

  • MessageToByteEncoder 对象编码成字节流;
  • MessageToMessageEncoder 一种对象消息类型编码成另外一种对象消息类型。

使用一次编码器IntegerEncoder和二次编码器IntegerToStringEncoder,将消息从Integer编码为String。

java class IntegerEncoder extends MessageToByteEncoder<Integer> {    @Override    public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {        out.writeInt(msg);   } } ​ class IntegerToStringEncoder extends MessageToMessageEncoder<Integer> {    @Override    public void encode(ChannelHandlerContext ctx, Integer message, List<Object> out) throws Exception {        out.add(message.toString());   } }

使用一次编码器StringEncoder和二次编码器StringToIntegerEncoder,将消息从String编码为Integer。

class StringEncoder extends MessageToByteEncoder<String> {    @Override    public void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {        out.writeCharSequence(msg, Charset.defaultCharset());   } } ​ class StringToIntegerEncoder extends MessageToMessageEncoder<String> {    @Override    public void encode(ChannelHandlerContext ctx, String message, List<Object> out) throws Exception {        out.add(Integer.parseInt(message));   } }

编码器MessageToByteEncoder

MessageToByteEncoder用于将对象编码成字节流,MessageToByteEncoder 提供了唯一的 encode 抽象方法,我们需要实现encode 方法即可完成自定义编码。那么encode() 方法是在什么时候被调用的呢?

我们一起看下MessageToByteEncoder 的核心源码片段,如下所示。

MessageToByteEncoder继承自ChannelOutboundHandlerAdapter。

ChannelOutboundHandlerAdapter 实现了 ChannelOutboundHandler接口,重写了write方法。

这里使用了模板模式:encode方法交给具体的子类实现。

java public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter { ​ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {    ByteBuf buf = null;    try {   // 1.消息类型是否匹配 不匹配不会处理        // 即传入的是String        if (acceptOutboundMessage(msg)) {            //I是泛型            @SuppressWarnings("unchecked")            I cast = (I) msg;            // 2. 分配 ByteBuf 资源            buf = allocateBuffer(ctx, cast, preferDirect);            try {            // 3. 执行 encode 方法完成数据编码                encode(ctx, cast, buf);           } finally {                ReferenceCountUtil.release(cast);           }            if (buf.isReadable()) {            // 4. 向后传递写事件                ctx.write(buf, promise);           } else {                buf.release();                ctx.write(Unpooled.EMPTY_BUFFER, promise);           }            buf = null;       } else {            ctx.write(msg, promise);       }   } catch (EncoderException e) {        throw e;   } catch (Throwable e) {        throw new EncoderException(e);   } finally {        if (buf != null) {            buf.release();       }   } } } //供子类重写 protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

MessageToByteEncoder 重写了 ChanneOutboundHandler 的 write() 方法,其主要逻辑分为以下几个步骤:

  1. acceptOutboundMessage 判断是否有匹配的消息类型,如果匹配需要执行编码流程,如果不匹配直接继续传递给下一个 ChannelOutboundHandler。
  2. 分配 ByteBuf 资源,默认使用堆外内存。
  3. 调用子类实现的 encode 方法完成数据编码,一旦消息被成功编码,会通过调用ReferenceCountUtil.release(cast) 自动释放。
  4. 如果 ByteBuf 可读,说明已经成功编码得到数据,然后写入 ChannelHandlerContext 交到下一个节点。如果 ByteBuf 不可读,则释放 ByteBuf 资源,向下传递空的 ByteBuf 对象。

实现类:StringToByteEncoder

编码器实现非常简单,不需要关注拆包/粘包问题。

如下例子,展示了如何将字符串类型的数据写入到 ByteBuf 实例,ByteBuf 实例将传递给 ChannelPipeline 链表中的下一个 ChannelOutboundHandler。

java package io.netty.example.Encode; ​ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; ​ public class StringToByteEncoder extends MessageToByteEncoder<String> { ​    @Override    protected void encode(ChannelHandlerContext channelHandlerContext, String data, ByteBuf byteBuf) throws Exception {      byteBuf.writeBytes(data.getBytes());   } }

编码器MessageToMessageEncoder

https://www.javajike.com/book/essential-netty-in-action/chapter4/66cf00f545a4c73fa3fd2fad8d0b7a1d.html

MessageToMessageEncoder 与 MessageToByteEncoder 类似,同样只需要实现 encode 方法。

与 MessageToByteEncoder 不同的是,MessageToMessageEncoder 是将一种格式的消息转换为另外一种格式的消息。

其中第二个 Message 所指的可以是任意一个对象,如果该对象是 ByteBuf 类型,那么和 MessageToByteEncoder 的实现原理是一致的。

MessageToMessageEncoder继承自ChannelOutboundHandlerAdapter。

ChannelOutboundHandlerAdapter 实现了 ChannelOutboundHandler接口,重写了write方法。

这里使用了模板模式:encode方法交给具体的子类实现。

java    @Override    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        CodecOutputList out = null;        try {            // 1. 消息类型是否匹配 不匹配不会处理       // 即传入的是String            if (acceptOutboundMessage(msg)) {                out = CodecOutputList.newInstance();                 //I是泛型                @SuppressWarnings("unchecked")                I cast = (I) msg;                try {                    //执行子类encode完成具体编码操作                    encode(ctx, cast, out);               } finally {                    ReferenceCountUtil.release(cast);               } //如果输出结果是对象列表out是空                if (out.isEmpty()) {                    out.recycle();                    out = null;                    throw new EncoderException(" must produce at least one message.");               }           } else {                ctx.write(msg, promise);           }       } catch (EncoderException e) {            throw e;       } catch (Throwable t) {            throw new EncoderException(t);       } finally {            if (out != null) {                final int sizeMinusOne = out.size() - 1;                // sizeMinusOne等于0说明 out长度是1                if (sizeMinusOne == 0) {                    //写出去                    ctx.write(out.getUnsafe(0), promise);               } else if (sizeMinusOne > 0) {                    //遍历写出去                    if (promise == ctx.voidPromise()) {                        writeVoidPromise(ctx, out);                   } else {                        writePromiseCombiner(ctx, out, promise);                   }               }                //回收                out.recycle();           }       }   }

此外 MessageToByteEncoder 的输出结果是对象列表out,编码后的结果属于中间对象,最终仍然会转化成 ByteBuf 进行传输。

MessageToMessageEncoder 常用的实现子类有 StringEncoder、LineEncoder、Base64Encoder 等。以 StringEncoder 为例看下 MessageToMessageEncoder 的用法。

实现类:StringEncoder

java @Sharable public class StringEncoder extends MessageToMessageEncoder<CharSequence> { ​    // TODO Use CharsetEncoder instead.    private final Charset charset; ​    /**     * Creates a new instance with the current system character set.     */    public StringEncoder() {        this(Charset.defaultCharset());   } ​    /**     * Creates a new instance with the specified character set.     */    public StringEncoder(Charset charset) {        if (charset == null) {            throw new NullPointerException("charset");       }        this.charset = charset;   } ​    @Override    protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {        if (msg.length() == 0) {            return;       } //编码以后加入out列表 由父类写出即可        out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));   } } ​

思考:现在有1个Java对象要编码为json字符串后转换为byte传输 如何做呢?

1.继承MessageToMessageEncoder

2.重写encode方法

3.将对象序列化为json

4.将json转为ByteBuffer发送:ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(json), charset)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/52260.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt视频播放器

一、设置好ui界面二、打开文件槽函数1.QDir::homePath()作用介绍2.QFileDialog::getOpenFileName()介绍3.QFileInfo介绍4.player 指针解释5.打开文件槽函数完整代码 三、视频播放器初始化1.QMediaPlayer()函数2.设置时间间隔的作用3. QGraphicsScene介绍4.QGraphicsVideoItem介…

高性能网络框架笔记

目录 TCP粘包、分包惊群断开连接&#xff0c;TCP怎么检测的&#xff1f;大量的close wait&#xff0c;如何解 ?双方同时调用close水平触发和边沿触发的区别 TCP粘包、分包 解决&#xff1a;1.应用层协议头前面pktlen&#xff1b;2.为每一个包加上分隔符&#xff1b;(\r\n&…

Docker网络模型使用详解(2)Docker网络模式

安装Docker时会自动创建3个网络&#xff0c;可以使用docker network ls命令列出这些网络。 [rootlocalhost ~]# docker network ls NETWORK ID NAME DRIVER SCOPE ebcfad6f4255 bridge bridge local b881c67f8813 compose_lnmp_lnmp…

逆向时如何找到MingGW(GNU)编译程序的main函数

编译器是MingGW生成的可执行文件的显著特点是, 最终运行ZwContinue后程序就莫名其妙启动了, 也找不到main函数。 为了探究里面究竟怎么回事, 我找到了wrk-v1.2的源码, 其中包含了ZwContinue的实现, 首先先看一下注释, API界面包含了2个参数, 其中让人感兴趣的是PCONTEXT, 这是…

答辩PPT怎么做?在线PPT软件哪个好?

又是一年毕业季&#xff0c;相信很多毕业生都开始准备论文答辩&#xff0c;有些同学正在为论文奋夜苦战&#xff0c;有些则是为论文答辩PPT而烦恼。做PPT要用什么软件好呢&#xff1f;这篇文章就来告诉你。 当下有很多PPT制作工具&#xff0c;其中自然也包括Office三件套。这些…

ClickHouse目录结构

默认安装路径&#xff1a;/var/lib/clickhouse/ 目录结构&#xff1a; 主要介绍metadata和data metadata 其中的default、system及相应的数据库&#xff0c;.sql文件即数据库创建相关sql语句 进入default数据库&#xff08;默认数据库&#xff09;&#xff1a; 可以看到数据库…

redis数据未到过期时间被删除

1. 问题描述 使用了jeecgboot开发后端代码&#xff0c;代码设置的redis过期时间为24小时&#xff0c;部署使用的宝塔面板&#xff0c;在redis中看到的过期时间也是为24小时&#xff0c;但是并未到过期时间&#xff0c;数据就被删除。 2. 解决办法 观察了一下redis中的数据&a…

【C++】数据结构与算法:常用排序算法

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍常用排序算法。 学其所用&#xff0c;用其所学。——梁启超 欢迎来到我的博客&#xff0c;一起学习&#xff0c;共同进步。 喜欢的朋友可以关注一下&#xff0c;下次更新不迷路&#x1…

力扣 1049. 最后一块石头的重量 II

题目来源&#xff1a;https://leetcode.cn/problems/last-stone-weight-ii/description/ C题解&#xff08;思路来源代码随想录&#xff09;&#xff1a;本题其实就是尽量让石头分成重量相同的两堆&#xff0c;相撞之后剩下的石头最小&#xff0c;这样就化解成01背包问题了。 …

【JavaEE】Spring Boot - 项目的创建和使用

【JavaEE】Spring Boot 开发要点总结&#xff08;1&#xff09; 文章目录 【JavaEE】Spring Boot 开发要点总结&#xff08;1&#xff09;1. Spring Boot 的优点2. Spring Boot 项目创建2.1 下载安装插件2.2 创建项目过程2.3 加载项目2.4 启动项目2.5 删除一些没用的文件 3. Sp…

[腾讯云Cloud Studio实战训练营]无门槛使用GPT+Cloud Studio辅助编程完成Excel自动工资结算

目录 前言一、Cloud Studio产品介绍1.1 注册Cloud Studio 二、项目实验2.1 选择合适的开发环境2.2 实验项目介绍2.3 实验步骤三、总结 前言 chatgpt简单介绍: ChatGPT是一种基于GPT的自然语言处理模型&#xff0c;专门用于生成对话式文本。它是OpenAI于2021年发布的&#xff0…

#rust taur运行报错#

场景:在window11系统上运行 tauri桌面莹应用&#xff0c;提示错误。 Visual Studio 2022 生成工具 安装的sdk11 , rust运行模式是stable-x86_64-pc-window-gnu&#xff0c; 运行npm run tauir dev 一致失败&#xff0c;失败信息如下 原因&#xff1a;1&#xff1a;在window11系…