【编码】自定义通信协议——支持更多请求类型

news/2025/1/31 6:44:43/文章来源:https://www.cnblogs.com/longfurcat/p/18682229

前言

上一篇随笔"如何实现一套自定义网络协议",介绍了自定义协议的粘拆包的处理,消息体的编解码等。

其中包含一个案例,演示怎么实现一个RPC实现。

不过案例中的Request格式是固定的,实际应用中,RPC协议需要支持不同的API。

所以需要有一个字段来表示API类型,而字段的存放位置有两种选择

1)消息头的第一个字节

2)消息体

一个字节能表示值0~255。如果使用这个字节来表示请求类型,可能不够用。

所以本文具体的API定义在消息体中。

协议回顾

数据包=消息头(4字节)+消息体(n字节)

消息头=类型(1字节)+消息体长度(3字节)

类型值:

  • 1 => 请求
  • 2 => 响应
  • 3 => Ping
  • 4 => Pong

协议补充

以下格式设计针对“请求-响应”的消息体

1)请求

message Header {required string key = 1;required string value = 2;
}message BaseRequest {required int32 api = 1;required int32 version = 2;required int32 msgId = 3;repeated Header headers = 4;optional bytes data = 5;
}message HelloRequestData {required string content = 1;
}

固定部分

  • api:请求类型,int32
  • version:请求版本,int32
  • msgId:请求ID,int32

可变Header

  • headers:key-value

可变Data

  • bytes

解析方式:

1.处理粘拆包,得到完整消息体

2.使用protobuf解析消息体,读取消息体中的api字段

3.根据api字段知道请求类型,使用对应的protobuf解析data,得到请求payload对象

2)响应

message BaseResponse {required int32 msgId = 1;repeated Header headers = 2;optional bytes data = 3;
}message HelloResponseData {required int32 code = 1;required string content = 2;
}

固定部分

  • msgId: 请求ID,int32

可变Header部分

  • headers: key-value

可变Data

  • bytes

处理方式:

1.构建响应Data

2.构建BaseResponse

3.将Data转成ByteString塞入BaseResponse

基础建设——连接管理

连接管理包含两部分:

  • 存储Socket关联关系
    • 使用Map存储每个Socket的关联关系,便于后续引用。
  • Socket心跳检查
    • 通过Ping-Pong机制,定期检查客户端的连接状态,及时识别无响应的客户端并释放服务端资源。

释放什么资源?

socket需要占用服务端的内存资源(发送和接收缓冲区),还有文件描述符

客户端什么时候会出现无响应?

  • 客户端进程卡死,例如死锁或CPU饱和。
  • 客户端意外断开(如断电,系统崩溃),无法发送TCP的FIN包来关闭连接。服务端无法感知到连接关闭

ConnectionManager

public class ConnectionManager {private static final Timer timer = new HashedWheelTimer();private static final long HEARTBEAT_INTERVAL = 10;  // 10秒private static final long HEARTBEAT_TIMEOUT = 30;    // 30秒超时private final Map<String, ChannelHandlerContext> connectionMap = new ConcurrentHashMap<>();private final Map<String, HeartBeatTask> taskMap = new ConcurrentHashMap<>();public void addConnection(String id, ChannelHandlerContext ctx) {connectionMap.put(id, ctx);//创建心跳定时任务HeartBeatTask heartBeatTask = new HeartBeatTask(ctx.channel());timer.newTimeout(heartBeatTask, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);taskMap.put(id, heartBeatTask);}public ChannelHandlerContext getConnection(String id) {return connectionMap.get(id);}public void removeConnection(String id) {System.out.println("断开连接:"+id);connectionMap.remove(id);taskMap.remove(id);}public void onHeartbeatResponse(String id) {System.out.println("收到Pong:"+id);HeartBeatTask task = taskMap.get(id);if(Objects.nonNull(task)) {task.onHeartbeatReceived();}}public void clear() {connectionMap.clear();}public class HeartBeatTask implements TimerTask {private final Channel channel;private long lastHeartbeatTime;public HeartBeatTask(Channel channel) {this.channel = channel;this.lastHeartbeatTime = System.currentTimeMillis();}@Overridepublic void run(Timeout timeout) throws Exception {long now = System.currentTimeMillis();if(now - lastHeartbeatTime > HEARTBEAT_TIMEOUT * 1000) {System.out.println("Heartbeat timeout, disconnecting channel " + channel.id().asLongText());channel.close();return;}sendHeartbeat();}private void sendHeartbeat() {if(channel.isActive()) {System.out.println("发送Ping:"+channel.id().asLongText());channel.pipeline().writeAndFlush(new Ping());timer.newTimeout(this, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);}}public void onHeartbeatReceived() {lastHeartbeatTime = System.currentTimeMillis();}}
}
查看代码

ServerMessageHandler

   //连接建立后,塞入ConnectionManager 
   @Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);Container.getBean(ConnectionManager.class).addConnection(ctx.channel().id().asLongText(), ctx);}//连接断开后,从ConnectionManager移除//这里是TCP能检测到的断连。
    @Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);Container.getBean(ConnectionManager.class).removeConnection(ctx.channel().id().asLongText());}

ClientMessageHandler

@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {if(msg instanceof BaseResponse) {ClientApi clientApi = Container.getBean(ClientApi.class);if(Objects.nonNull(clientApi)) {clientApi.onResponse((BaseResponse) msg);}} else if(msg instanceof Ping) {//立即返回Pong包ctx.writeAndFlush(new Pong());} else {System.out.println("未知消息:"+msg);}}

其中Ping-Pong都是空对象,编码成报文的时候,只有消息头,也就是只有4个字节。

基础建设——Handler线程池

在现行的框架中,NIO网络线程都是只负责数据包的解析工作,其他的业务逻辑处理则交由独立的线程池进行。

为什么这样做呢?

这种设计的核心目的是避免阻塞NIO线程。如果将数据包的解析和后续的业务处理都放在NIO线程中进行,肯会导致该线程忙于处理当前的数据包,从而无法及时处理其他客户端的请求。这不仅会影响系统的响应性能,还可能导致Socket阻塞或线程饥饿,最终导致系统吞吐量下降,响应延迟增大。

处理模型

所以我们可以将解析出来的请求,放到一个请求队列,然后由线程池去异步处理。

RequestChannel

只有一个ArrayBlockingQueue队列

public class RequestChannel {private ArrayBlockingQueue<WrapRequest> queue = new ArrayBlockingQueue<>(50);public void addRequest(WrapRequest helloRequest) {try {queue.put(helloRequest);} catch (InterruptedException e) {throw new RuntimeException("写入队列失败");}}public WrapRequest getRequest(long timeout, TimeUnit timeUnit) throws InterruptedException {return queue.poll(timeout, timeUnit);}
}
View Code

RequestHandlerPool

多个处理线程并发从请求队列中拉取请求,然后根据API选择处理函数进行处理。

这里处理函数直接写在RequestHandlerPool,后续可以改成注册式的。一个API一个处理函数。

public class RequestHandlerPool {private final int threadSize;private List<Thread> workerThreads;private RequestChannel requestChannel;private ConnectionManager connectionManager;public RequestHandlerPool(int threadSize) {this.threadSize = threadSize;}public void start() {requestChannel = Container.getBean(RequestChannel.class);connectionManager = Container.getBean(ConnectionManager.class);workerThreads = new ArrayList<>(threadSize);for (int i = 0; i < threadSize; i++) {Thread worker = new Thread(this::pollAndHandle);worker.start();workerThreads.add(worker);}}public void pollAndHandle() {while(!Thread.interrupted()) {try {WrapRequest request = null;try {request = requestChannel.getRequest(5, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}if(Objects.isNull(request)) {continue;}ChannelHandlerContext ctx = connectionManager.getConnection(request.getChannelId());if(Objects.isNull(ctx)) {continue;}BaseRequest baseRequest = request.getRequest();int apiKey = baseRequest.getApi();switch (apiKey) {case ApiKeys.HELLO: handleHelloRequest(baseRequest, ctx);break;case ApiKeys.DOWNLOAD: handleDownloadRequest(baseRequest, ctx);break;default: break;}} catch (Exception e) {System.out.println("处理请求失败"+e);e.printStackTrace();}}}public void shutdown() {for (Thread workerThread : workerThreads) {workerThread.interrupt();}}
}
View Code

ServerMessageHandler

收到解析好的请求,包装后塞入队列

    @Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {if(msg instanceof BaseRequest) {System.out.println("收到消息:"+msg);BaseRequest baseRequest = (BaseRequest) msg;//塞入队列//需要包装一下,增加字段说明来自哪个channel,响应的时候知道如何处理//TODO也可增加入队时间,开始处理时间,完成处理时间。用于判断是否调整线程数量,以及是否丢弃超时请求Container.getBean(RequestChannel.class).addRequest(new WrapRequest(ctx.channel().id().asLongText(), baseRequest));} else if(msg instanceof Pong) {//交由连接管理器处理Container.getBean(ConnectionManager.class).onHeartbeatResponse(ctx.channel().id().asLongText());}}

案例——Hello请求

1)客户端

public class Test {public static void main(String[] args) {ClientApi clientApi = new ClientApi("127.0.0.1", 9090);for (int i = 0; i < 5; i++) {try {BaseRequest request = buildHelloRequest(1000+i);//同步请求,收到响应后才会发下一个请求
//                BaseResponse response = clientApi.sendRequest(request);
//                System.out.println("同步收到:" + response.getMsgId() + "\n解析内容:" + HelloResponseData.parseFrom(response.getData().toByteArray()));//异步请求,发送完成即可发送下一个请求clientApi.sendRequestAsync(request, response2 -> {try {System.out.println("异步收到:"+response2.getMsgId() + "\n解析内容:" + HelloResponseData.parseFrom(response2.getData().toByteArray()));} catch (InvalidProtocolBufferException e) {e.printStackTrace();}return true;});} catch (Exception e) {e.printStackTrace();}}}//构建请求public static BaseRequest buildHelloRequest(int id) {int randomStrLength = RandomUtil.randomInt(100, 200);String content = RandomUtil.randomString(randomStrLength);//构建请求内容protoHelloRequestData data = HelloRequestData.newBuilder().setContent(content).build();//构建请求proto,指定API,塞入请求内容BaseRequest request = BaseRequest.newBuilder().setApi(ApiKeys.HELLO).setVersion(1).setMsgId(id).setData(data.toByteString()).build();return request;}
}
View Code

2)服务端

    public void handleHelloRequest(BaseRequest baseRequest, ChannelHandlerContext ctx) throws Exception {byte[] data = baseRequest.getData().toByteArray();//解析请求内容protoHelloRequestData helloRequestData = HelloRequestData.parseFrom(data);String content = helloRequestData.getContent();//构建响应内容protoHelloResponseData helloResponseData = HelloResponseData.newBuilder().setCode(200).setContent("Handled:"+content).build();//构建响应proto,塞入响应内容BaseResponse response = BaseResponse.newBuilder().setMsgId(baseRequest.getMsgId()).setData(helloResponseData.toByteString()).build();ctx.writeAndFlush(response);}
View Code

客户端输出:

开始连接
发出消息:api: 1
version: 1
msgId: 1000
data: "\n\203\001orL4AaLqgytpVNPc1fmhGV5TpSw8z41FdsqUzG1tpXloxWkwpxFh9gf8UF51vysakK0KrJd9K6R7BJGIC7ha9UIjA1VjaaNmJ6Crx5zhPZHuCUM4byAiS6XDpnXDU2XCVxe"异步收到:1000
解析内容:code: 200
content: "Handled:orL4AaLqgytpVNPc1fmhGV5TpSw8z41FdsqUzG1tpXloxWkwpxFh9gf8UF51vysakK0KrJd9K6R7BJGIC7ha9UIjA1VjaaNmJ6Crx5zhPZHuCUM4byAiS6XDpnXDU2XCVxe"发出消息:api: 1
version: 1
msgId: 1001
data: "\n\230\001Wsq1EGAZgE9NmHFSOdRUloqshSeA0yx6ZUspKrsO6PGF4fN3oNk5CDzjsUKYFCMfCFBjHkdjPpnSHPRNUSqS1kexFKAs06UQ9v9J28sK7alUCigx0AtoyE1Cxa4bPYoaqixkcG0sCfShIK8zglWAgRma"发出消息:api: 1
version: 1
msgId: 1002
data: "\n\247\00187Eh4Cw8FE29xBOWnR3RIH2efvMxT2LzMkCvRHxthStqcfspRZLsEYfX5q9YHBzgCswX0ManKbQXxticCnArmuhPX9qgP8epuisGnnrPzVO7FbyowkWPFu5vLbgW2w5QIr4rdIVmRCQAQliLyS56EtEaRn10AB4wEDpgJWL"发出消息:api: 1
version: 1
msgId: 1003
data: "\n\203\001sWCpHDYZpReZvOypclgQz6PQs901kbCPyzkhSEeS6nwGHQ7tO5Lzl9pT2V5xm8Vb89G9oy5K7OWlrdrAPag5JZ9dZY40WEgvbc5W0UFgIqKeS95dHqeWT8K1ifUzpy23dLQ"异步收到:1002
解析内容:code: 200
content: "Handled:87Eh4Cw8FE29xBOWnR3RIH2efvMxT2LzMkCvRHxthStqcfspRZLsEYfX5q9YHBzgCswX0ManKbQXxticCnArmuhPX9qgP8epuisGnnrPzVO7FbyowkWPFu5vLbgW2w5QIr4rdIVmRCQAQliLyS56EtEaRn10AB4wEDpgJWL"异步收到:1001
解析内容:code: 200
content: "Handled:Wsq1EGAZgE9NmHFSOdRUloqshSeA0yx6ZUspKrsO6PGF4fN3oNk5CDzjsUKYFCMfCFBjHkdjPpnSHPRNUSqS1kexFKAs06UQ9v9J28sK7alUCigx0AtoyE1Cxa4bPYoaqixkcG0sCfShIK8zglWAgRma"发出消息:api: 1
version: 1
msgId: 1004
data: "\n\300\001EUTkCNd5PV7IMzvTlTKLoM65CMhYjKo4r9jAodOugWfvudBEIxHJnlDed3MwpiyYxzmnDkoUdJY1r2pe8BU97iprzuDpyuPQp80Ds8BkccGZP2nBllIR28epbY1Du3ZoYV552hGKucpSwysqgSFVfc7hmEuo4iKsaJ9yl807l91hr6jqWc7PGZ4iime6Xzpo"异步收到:1003
解析内容:code: 200
content: "Handled:sWCpHDYZpReZvOypclgQz6PQs901kbCPyzkhSEeS6nwGHQ7tO5Lzl9pT2V5xm8Vb89G9oy5K7OWlrdrAPag5JZ9dZY40WEgvbc5W0UFgIqKeS95dHqeWT8K1ifUzpy23dLQ"异步收到:1004
解析内容:code: 200
content: "Handled:EUTkCNd5PV7IMzvTlTKLoM65CMhYjKo4r9jAodOugWfvudBEIxHJnlDed3MwpiyYxzmnDkoUdJY1r2pe8BU97iprzuDpyuPQp80Ds8BkccGZP2nBllIR28epbY1Du3ZoYV552hGKucpSwysqgSFVfc7hmEuo4iKsaJ9yl807l91hr6jqWc7PGZ4iime6Xzpo"

基于此协议实现文件下载——零拷贝

//TBD

基于此协议实现Raft选主

//TBD

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/876472.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android Qcom board-id加载镜像学习

很早就听说过board-id能用来区分项目,没负责过这个,也一直没有时间去了解。board-id的可以通过gpio或者eeprom来存放,board-id也就是CDT中的部分内容,如果时gpio的方式,可配置的项目有些而且在主板上的都是hardcode,这样不利于维护。 XBL-CDT default: BOOT.XF.4.1/boot_i…

阿里云2025年免费领取300元无门槛优惠券

综合传送门详情免责声明 版权声明 交流群 公众hao服务器有什么用 服务器可以用于管理网络资源,比如控制网络访问、发送/接收电子邮件和 托管网站。服务器用于网站和大型数据库等应用,具有高速计算能力、长期可靠运行、强大的数据吞吐量、高可用性、可靠性、可扩展性和可管理性…

美团面试:MySQL为什么 不用 Docker部署?

本文原文链接 文章很长,且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录 博客园版 为您奉上珍贵的学习资源 : 免费赠送 :《尼恩Java面试宝典》 持续更新+ 史上最全 + 面试必备 2000页+ 面试必备 + 大厂必备 +涨薪必备 免费赠送 :《尼恩技术圣经+高并发系列PDF》 ,帮你 …

Java 异常

目录异常介绍异常概念异常体系异常分类异常的产生过程解析异常的处理抛出异常 throwObjects 非空判断声明异常 throws捕获异常 try…catchfinally 代码块异常注意事项自定义异常概述例 异常介绍 异常概念 异常,就是不正常的意思。在生活中:医生说,你的身体某个部位有异常,该部…

macOS Sequoia 15.3 (24D60) Boot ISO 原版可引导镜像下载

macOS Sequoia 15.3 (24D60) Boot ISO 原版可引导镜像下载macOS Sequoia 15.3 (24D60) Boot ISO 原版可引导镜像下载 iPhone 镜像、Safari 浏览器重大更新和 Apple Intelligence 等众多全新功能令 Mac 使用体验再升级 请访问原文链接:https://sysin.org/blog/macOS-Sequoia-bo…

新春“码”启 | Cocos 3D 开发微信小游戏(第3天):场景搭建与游戏链路基础开发

新春开发 Cocos 3D 微信小游戏计划的第3天,包括总体设计方案,包括关卡模式、时间限制、复活机制等。接着详细展示基础框架研发,如开始场景和游戏场景(关卡一)的开发,包括创建场景、画布、立方体、材质,以及按钮的功能实现和场景切换等……今天是实施新春小游戏计划的第 …

macOS Sonoma 14.7.3 (23H417) 正式版 ISO、IPSW、PKG 下载

macOS Sonoma 14.7.3 (23H417) 正式版 ISO、IPSW、PKG 下载macOS Sonoma 14.7.3 (23H417) 正式版 ISO、IPSW、PKG 下载 利用小组件进行个性化设置、令人眼前一亮的全新屏幕保护、Safari 浏览器和视频会议的重大更新 请访问原文链接:https://sysin.org/blog/macOS-Sonoma/ 查看…

macOS Sequoia 15.3 (24D60) 正式版 ISO、IPSW、PKG 下载

macOS Sequoia 15.3 (24D60) 正式版 ISO、IPSW、PKG 下载macOS Sequoia 15.3 (24D60) 正式版 ISO、IPSW、PKG 下载 iPhone 镜像、Safari 浏览器重大更新和 Apple Intelligence 等众多全新功能令 Mac 使用体验再升级 请访问原文链接:https://sysin.org/blog/macOS-Sequoia/ 查看…

macOS Sonoma 14.7.3 (23H417) Boot ISO 原版可引导镜像下载

macOS Sonoma 14.7.3 (23H417) Boot ISO 原版可引导镜像下载macOS Sonoma 14.7.3 (23H417) Boot ISO 原版可引导镜像下载 本站下载的 macOS 软件包,既可以拖拽到 Applications(应用程序)下直接安装,也可以制作启动 U 盘安装,或者在虚拟机中启动安装。另外也支持在 Windows…

CF750C

New Year and Rating 题目链接 题目样例 输入 3 -7 1 5 2 8 2输出 1907思路 二分 二分rating,从1到n遍历,若碰到不满足条件的:1却max小于1900,2却min大于1899,则直接返回,修改mid的区间重新取 若满足条件,则直接加减所给值 模拟 和上面一样,若不满足直接输出,满足则一…