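Preface
The previous post, "How to Implement a Custom Network Protocol", covered how a custom protocol handles sticky and split packets and how it encodes and decodes the message body.
It included an example showing how to build a simple RPC on top of the protocol.
However, the Request format in that example was fixed, whereas a real RPC protocol needs to support many different APIs.
So a field is needed to identify the API type, and there are two places it could live:
1) The first byte of the message header
2) The message body
One byte can only represent the values 0 to 255. If that byte were used to identify the request type, it might not be enough.
So in this post the concrete API is defined in the message body.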
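Protocol recap
Packet = message header (4 bytes) + message body (n bytes)
Message header = type (1 byte) + message body length (3 bytes)
Type values:
- 1 => Request
- 2 => Response
- 3 => Ping
- 4 => Pong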
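As a rough illustration of the header layout above, the following sketch packs and unpacks the 4-byte header. The class name FrameHeader and its method names are hypothetical, and the byte order of the 3-byte length (big-endian here) is an assumption, since the recap does not state it.

```java
// Minimal sketch of the 4-byte header: 1 byte of type + 3 bytes of body length.
// FrameHeader is a hypothetical helper, not part of the original project.
final class FrameHeader {
    static final int HEADER_SIZE = 4;
    static final int MAX_BODY_LENGTH = 0xFFFFFF; // largest value that fits in 3 bytes

    // Assumption: the length is written big-endian (network byte order).
    static byte[] encode(int type, int bodyLength) {
        if (type < 0 || type > 0xFF) {
            throw new IllegalArgumentException("type out of range: " + type);
        }
        if (bodyLength < 0 || bodyLength > MAX_BODY_LENGTH) {
            throw new IllegalArgumentException("body length out of range: " + bodyLength);
        }
        return new byte[] {
            (byte) type,
            (byte) (bodyLength >>> 16),
            (byte) (bodyLength >>> 8),
            (byte) bodyLength
        };
    }

    static int type(byte[] header) {
        return header[0] & 0xFF;
    }

    static int bodyLength(byte[] header) {
        return ((header[1] & 0xFF) << 16) | ((header[2] & 0xFF) << 8) | (header[3] & 0xFF);
    }

    public static void main(String[] args) {
        byte[] request = encode(1, 300);   // type 1 = Request, 300-byte body
        System.out.println(type(request) + " " + bodyLength(request));
        byte[] ping = encode(3, 0);        // type 3 = Ping, empty body
        System.out.println(ping.length);
    }
}
```

With 3 bytes for the length, a single message body is limited to 16,777,215 bytes, so larger payloads would have to be split across several packets.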
Protocol additions
The formats below define the message body for "request-response" messages.
1) Request
    message Header {
        required string key = 1;
        required string value = 2;
    }

    message BaseRequest {
        required int32 api = 1;
        required int32 version = 2;
        required int32 msgId = 3;
        repeated Header headers = 4;
        optional bytes data = 5;
    }

    message HelloRequestData {
        required string content = 1;
    }
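Fixed part
- api: request type, int32
- version: request version, int32
- msgId: request ID, int32
Variable headers
- headers: key-value pairs
Variable data
- bytes
Parsing steps:
1. Handle sticky and split packets to obtain the complete message body.
2. Parse the message body with protobuf and read its api field.
3. Use the api field to determine the request type, then parse data with the matching protobuf message to obtain the request payload object.
2) Response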
    message BaseResponse {
        required int32 msgId = 1;
        repeated Header headers = 2;
        optional bytes data = 3;
    }

    message HelloResponseData {
        required int32 code = 1;
        required string content = 2;
    }
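Fixed part
- msgId: request ID, int32
Variable headers
- headers: key-value pairs
Variable data
- bytes
Processing steps:
1. Build the response Data.
2. Build the BaseResponse.
3. Convert the Data to a ByteString and put it into the BaseResponse.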
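Infrastructure: connection management
Connection management has two parts:
- Storing socket associations
  - A Map stores the association for each socket so that it can be looked up later.
- Socket heartbeat checks
  - A Ping-Pong mechanism periodically checks each client's connection state, so that unresponsive clients are detected promptly and server resources are released.
What resources are released?
Each socket occupies server memory (its send and receive buffers) as well as a file descriptor.
When does a client become unresponsive?
- The client process hangs, for example because of a deadlock or CPU saturation.
- The client disconnects unexpectedly (power loss, system crash) and cannot send a TCP FIN to close the connection, so the server has no way of noticing that the connection is gone.
ConnectionManager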
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.HashedWheelTimer;
    import io.netty.util.Timeout;
    import io.netty.util.Timer;
    import io.netty.util.TimerTask;

    import java.util.Map;
    import java.util.Objects;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;

    public class ConnectionManager {
        private static final Timer timer = new HashedWheelTimer();
        private static final long HEARTBEAT_INTERVAL = 10; // send a Ping every 10 seconds
        private static final long HEARTBEAT_TIMEOUT = 30;  // time out after 30 seconds without a Pong

        private final Map<String, ChannelHandlerContext> connectionMap = new ConcurrentHashMap<>();
        private final Map<String, HeartBeatTask> taskMap = new ConcurrentHashMap<>();

        public void addConnection(String id, ChannelHandlerContext ctx) {
            connectionMap.put(id, ctx);
            // Create the heartbeat timer task
            HeartBeatTask heartBeatTask = new HeartBeatTask(ctx.channel());
            timer.newTimeout(heartBeatTask, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
            taskMap.put(id, heartBeatTask);
        }

        public ChannelHandlerContext getConnection(String id) {
            return connectionMap.get(id);
        }

        public void removeConnection(String id) {
            System.out.println("Disconnected: " + id);
            connectionMap.remove(id);
            taskMap.remove(id);
        }

        public void onHeartbeatResponse(String id) {
            System.out.println("Received Pong: " + id);
            HeartBeatTask task = taskMap.get(id);
            if (Objects.nonNull(task)) {
                task.onHeartbeatReceived();
            }
        }

        public void clear() {
            connectionMap.clear();
            taskMap.clear(); // also drop the heartbeat tasks, not just the connections
        }

        public class HeartBeatTask implements TimerTask {
            private final Channel channel;
            // Written by the I/O thread (on Pong) and read by the timer thread, hence volatile
            private volatile long lastHeartbeatTime;

            public HeartBeatTask(Channel channel) {
                this.channel = channel;
                this.lastHeartbeatTime = System.currentTimeMillis();
            }

            @Override
            public void run(Timeout timeout) throws Exception {
                long now = System.currentTimeMillis();
                if (now - lastHeartbeatTime > HEARTBEAT_TIMEOUT * 1000) {
                    System.out.println("Heartbeat timeout, disconnecting channel " + channel.id().asLongText());
                    channel.close();
                    return;
                }
                sendHeartbeat();
            }

            private void sendHeartbeat() {
                // Once the channel is inactive the task is not rescheduled, so it stops by itself
                if (channel.isActive()) {
                    System.out.println("Sending Ping: " + channel.id().asLongText());
                    channel.pipeline().writeAndFlush(new Ping());
                    timer.newTimeout(this, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
                }
            }

            public void onHeartbeatReceived() {
                lastHeartbeatTime = System.currentTimeMillis();
            }
        }
    }
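To make the timing of the heartbeat check easier to reason about, the timeout rule used by HeartBeatTask can be isolated from Netty. The sketch below is a simplified stand-in: HeartbeatMonitor is a hypothetical class, and timestamps are passed in explicitly instead of being read from the system clock so that the behaviour can be replayed deterministically.

```java
// Simplified stand-in for the timeout rule in HeartBeatTask (hypothetical class).
final class HeartbeatMonitor {
    private final long timeoutMillis;
    // Updated when a Pong arrives, read when the periodic check runs.
    private volatile long lastPongMillis;

    HeartbeatMonitor(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastPongMillis = nowMillis; // a new connection counts as "just heard from"
    }

    void onPong(long nowMillis) {
        lastPongMillis = nowMillis;
    }

    // Same comparison as in the post: strictly greater than the timeout.
    boolean isExpired(long nowMillis) {
        return nowMillis - lastPongMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        // A client that connects at t=0 and never answers any Ping.
        HeartbeatMonitor silent = new HeartbeatMonitor(30_000, 0);
        // The check runs once per 10-second interval, as in ConnectionManager.
        for (long t = 10_000; t <= 40_000; t += 10_000) {
            System.out.println("t=" + t / 1000 + "s expired=" + silent.isExpired(t));
        }
    }
}
```

Because the check only runs once per interval and uses a strict comparison, a client that goes silent immediately after connecting is closed at the 40-second tick rather than at exactly 30 seconds. In general the detection delay falls somewhere between the timeout and the timeout plus one interval.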
ServerMessageHandler
    // Once the connection is established, register it with the ConnectionManager
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        Container.getBean(ConnectionManager.class).addConnection(ctx.channel().id().asLongText(), ctx);
    }

    // Once the connection is closed, remove it from the ConnectionManager.
    // This only covers disconnects that TCP itself can detect.
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        Container.getBean(ConnectionManager.class).removeConnection(ctx.channel().id().asLongText());
    }
ClientMessageHandler
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof BaseResponse) {
            ClientApi clientApi = Container.getBean(ClientApi.class);
            if (Objects.nonNull(clientApi)) {
                clientApi.onResponse((BaseResponse) msg);
            }
        } else if (msg instanceof Ping) {
            // Reply with a Pong immediately
            ctx.writeAndFlush(new Pong());
        } else {
            System.out.println("Unknown message: " + msg);
        }
    }
Ping and Pong are both empty objects. When encoded into a packet they consist of the message header alone, that is, just 4 bytes.
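Infrastructure: handler thread pool
In mainstream frameworks today, the NIO network threads are only responsible for parsing packets, while the remaining business logic is handed off to a separate thread pool.
Why do it this way?
The core goal of this design is to avoid blocking the NIO threads. If both packet parsing and the subsequent business processing ran on an NIO thread, that thread would be tied up with the current packet and could not serve other clients' requests in time. This hurts responsiveness and can also lead to stalled sockets or thread starvation, which ultimately lowers throughput and increases latency.
Processing model
So we can put each parsed request into a request queue and let a thread pool process the queue asynchronously.
RequestChannel
It contains nothing more than a single ArrayBlockingQueue.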
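    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class RequestChannel {
        private final ArrayBlockingQueue<WrapRequest> queue = new ArrayBlockingQueue<>(50);

        // Blocks the caller while the queue is full
        public void addRequest(WrapRequest helloRequest) {
            try {
                queue.put(helloRequest);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and keep the cause for diagnosis
                Thread.currentThread().interrupt();
                throw new RuntimeException("Failed to enqueue request", e);
            }
        }

        // Returns null if no request arrives within the timeout
        public WrapRequest getRequest(long timeout, TimeUnit timeUnit) throws InterruptedException {
            return queue.poll(timeout, timeUnit);
        }
    }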
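One thing worth noting about RequestChannel: addRequest uses put, which blocks the calling thread while the queue is full, and in this post the caller is the NIO thread. A possible alternative, sketched below under the assumption that rejecting a request is acceptable under overload, is to use the non-blocking offer and let the caller decide what to do on failure. BoundedRequestQueue and tryAdd are hypothetical names, not part of the original code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of RequestChannel that never blocks the producer.
final class BoundedRequestQueue<T> {
    private final BlockingQueue<T> queue;

    BoundedRequestQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Returns false immediately when the queue is full instead of waiting.
    boolean tryAdd(T request) {
        return queue.offer(request);
    }

    // Waits up to the timeout for a request; returns null if none arrives.
    T poll(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedRequestQueue<String> q = new BoundedRequestQueue<>(2);
        System.out.println(q.tryAdd("req-1")); // accepted
        System.out.println(q.tryAdd("req-2")); // accepted
        System.out.println(q.tryAdd("req-3")); // rejected: queue is full
        System.out.println(q.poll(1, TimeUnit.SECONDS));
    }
}
```

When tryAdd returns false, the handler could for example answer with an error response or close the connection. Which of these is appropriate depends on the application, so the sketch leaves that decision to the caller.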
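RequestHandlerPool
Multiple worker threads concurrently pull requests from the request queue and pick a handler function according to the API.
Here the handler functions are written directly in RequestHandlerPool. Later this could be changed to a registration-based design with one handler function per API.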
    import io.netty.channel.ChannelHandlerContext;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.TimeUnit;

    public class RequestHandlerPool {
        private final int threadSize;
        private List<Thread> workerThreads;
        private RequestChannel requestChannel;
        private ConnectionManager connectionManager;

        public RequestHandlerPool(int threadSize) {
            this.threadSize = threadSize;
        }

        public void start() {
            requestChannel = Container.getBean(RequestChannel.class);
            connectionManager = Container.getBean(ConnectionManager.class);
            workerThreads = new ArrayList<>(threadSize);
            for (int i = 0; i < threadSize; i++) {
                Thread worker = new Thread(this::pollAndHandle);
                worker.start();
                workerThreads.add(worker);
            }
        }

        public void pollAndHandle() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    WrapRequest request = requestChannel.getRequest(5, TimeUnit.SECONDS);
                    if (Objects.isNull(request)) {
                        continue; // nothing arrived within the timeout
                    }
                    ChannelHandlerContext ctx = connectionManager.getConnection(request.getChannelId());
                    if (Objects.isNull(ctx)) {
                        continue; // the connection is already gone, drop the request
                    }
                    BaseRequest baseRequest = request.getRequest();
                    int apiKey = baseRequest.getApi();
                    // The handleXxxRequest methods are omitted here
                    switch (apiKey) {
                        case ApiKeys.HELLO:
                            handleHelloRequest(baseRequest, ctx);
                            break;
                        case ApiKeys.DOWNLOAD:
                            handleDownloadRequest(baseRequest, ctx);
                            break;
                        default:
                            break;
                    }
                } catch (InterruptedException e) {
                    // shutdown() interrupted this worker. Catching the exception clears the
                    // interrupt flag, so restore it to let the loop condition end the thread.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    System.out.println("Failed to handle request: " + e);
                    e.printStackTrace();
                }
            }
        }

        public void shutdown() {
            for (Thread workerThread : workerThreads) {
                workerThread.interrupt();
            }
        }
    }
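The registration-based design mentioned above could look roughly like the following sketch. Everything in it is hypothetical: HandlerRegistry is not part of the original project, and handlers are modelled as plain functions from request bytes to response bytes so that the example runs without protobuf or Netty. The API value 1 for HELLO matches the "api: 1" seen in the client output of this post.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry: one handler per API, looked up by the api field.
final class HandlerRegistry {
    private final Map<Integer, Function<byte[], byte[]>> handlers = new ConcurrentHashMap<>();

    void register(int api, Function<byte[], byte[]> handler) {
        if (handlers.putIfAbsent(api, handler) != null) {
            throw new IllegalStateException("duplicate handler for api " + api);
        }
    }

    // Design choice of this sketch: unknown APIs fail loudly instead of being ignored.
    byte[] dispatch(int api, byte[] data) {
        Function<byte[], byte[]> handler = handlers.get(api);
        if (handler == null) {
            throw new IllegalArgumentException("unknown api: " + api);
        }
        return handler.apply(data);
    }

    public static void main(String[] args) {
        final int HELLO = 1; // assumed value of ApiKeys.HELLO
        HandlerRegistry registry = new HandlerRegistry();
        registry.register(HELLO, data ->
            ("Handled:" + new String(data, StandardCharsets.UTF_8)).getBytes(StandardCharsets.UTF_8));

        byte[] response = registry.dispatch(HELLO, "hi".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(response, StandardCharsets.UTF_8));
    }
}
```

With such a registry, the switch statement in pollAndHandle would shrink to a single dispatch call, and adding an API would no longer require touching the pool itself.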
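ServerMessageHandler
It receives the parsed request, wraps it, and puts it on the queue.
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof BaseRequest) {
            System.out.println("Received message: " + msg);
            BaseRequest baseRequest = (BaseRequest) msg;
            // Put the request on the queue.
            // It is wrapped first to record which channel it came from, so the worker knows where to send the response.
            // TODO: the enqueue time, processing start time and completion time could be recorded too,
            // to decide whether to adjust the thread count and whether to drop timed-out requests.
            Container.getBean(RequestChannel.class).addRequest(new WrapRequest(ctx.channel().id().asLongText(), baseRequest));
        } else if (msg instanceof Pong) {
            // Hand it over to the connection manager
            Container.getBean(ConnectionManager.class).onHeartbeatResponse(ctx.channel().id().asLongText());
        }
    }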
Example: Hello request
1) Client
    import com.google.protobuf.InvalidProtocolBufferException;

    public class Test {
        public static void main(String[] args) {
            ClientApi clientApi = new ClientApi("127.0.0.1", 9090);
            for (int i = 0; i < 5; i++) {
                try {
                    BaseRequest request = buildHelloRequest(1000 + i);
                    // Synchronous request: the next request is sent only after the response arrives
                    // BaseResponse response = clientApi.sendRequest(request);
                    // System.out.println("Sync received: " + response.getMsgId() + "\nParsed content: " + HelloResponseData.parseFrom(response.getData().toByteArray()));

                    // Asynchronous request: the next request can be sent as soon as this one is written
                    clientApi.sendRequestAsync(request, response2 -> {
                        try {
                            System.out.println("Async received: " + response2.getMsgId() + "\nParsed content: " + HelloResponseData.parseFrom(response2.getData().toByteArray()));
                        } catch (InvalidProtocolBufferException e) {
                            e.printStackTrace();
                        }
                        return true;
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        // Build a request
        public static BaseRequest buildHelloRequest(int id) {
            int randomStrLength = RandomUtil.randomInt(100, 200);
            String content = RandomUtil.randomString(randomStrLength);
            // Build the request payload proto
            HelloRequestData data = HelloRequestData.newBuilder().setContent(content).build();
            // Build the request proto: set the API and put the payload into data
            BaseRequest request = BaseRequest.newBuilder()
                    .setApi(ApiKeys.HELLO)
                    .setVersion(1)
                    .setMsgId(id)
                    .setData(data.toByteString())
                    .build();
            return request;
        }
    }
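The asynchronous call above relies on the client matching each response to its request by msgId, because requests handled by different worker threads may be answered in a different order than they were sent. ClientApi itself is not shown in this post, so the following is only a sketch of one common way to do that matching. PendingRequests is a hypothetical class, and it uses CompletableFuture rather than the callback style of sendRequestAsync.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper that correlates responses with in-flight requests by msgId.
final class PendingRequests<R> {
    private final Map<Integer, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    // Called before the request is written to the channel.
    CompletableFuture<R> register(int msgId) {
        CompletableFuture<R> future = new CompletableFuture<>();
        if (pending.putIfAbsent(msgId, future) != null) {
            throw new IllegalStateException("duplicate msgId: " + msgId);
        }
        return future;
    }

    // Called when a response arrives; returns false if nobody is waiting for it.
    boolean complete(int msgId, R response) {
        CompletableFuture<R> future = pending.remove(msgId);
        return future != null && future.complete(response);
    }

    int inFlight() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingRequests<String> requests = new PendingRequests<>();
        CompletableFuture<String> first = requests.register(1001);
        CompletableFuture<String> second = requests.register(1002);

        // Responses may arrive out of order; each one completes its own future.
        requests.complete(1002, "response for 1002");
        requests.complete(1001, "response for 1001");

        System.out.println(first.join());
        System.out.println(second.join());
    }
}
```

A real client would also need to expire entries whose response never arrives, for example when the connection drops. That part is left out here to keep the sketch short.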
2) Server
    public void handleHelloRequest(BaseRequest baseRequest, ChannelHandlerContext ctx) throws Exception {
        byte[] data = baseRequest.getData().toByteArray();
        // Parse the request payload proto
        HelloRequestData helloRequestData = HelloRequestData.parseFrom(data);
        String content = helloRequestData.getContent();
        // Build the response payload proto
        HelloResponseData helloResponseData = HelloResponseData.newBuilder()
                .setCode(200)
                .setContent("Handled:" + content)
                .build();
        // Build the response proto and put the payload into data
        BaseResponse response = BaseResponse.newBuilder()
                .setMsgId(baseRequest.getMsgId())
                .setData(helloResponseData.toByteString())
                .build();
        ctx.writeAndFlush(response);
    }
Client output:
    Connecting
    Sent message: api: 1
    version: 1
    msgId: 1000
    data: "\n\203\001orL4AaLqgytpVNPc1fmhGV5TpSw8z41FdsqUzG1tpXloxWkwpxFh9gf8UF51vysakK0KrJd9K6R7BJGIC7ha9UIjA1VjaaNmJ6Crx5zhPZHuCUM4byAiS6XDpnXDU2XCVxe"
    Async received: 1000
    Parsed content: code: 200
    content: "Handled:orL4AaLqgytpVNPc1fmhGV5TpSw8z41FdsqUzG1tpXloxWkwpxFh9gf8UF51vysakK0KrJd9K6R7BJGIC7ha9UIjA1VjaaNmJ6Crx5zhPZHuCUM4byAiS6XDpnXDU2XCVxe"
    Sent message: api: 1
    version: 1
    msgId: 1001
    data: "\n\230\001Wsq1EGAZgE9NmHFSOdRUloqshSeA0yx6ZUspKrsO6PGF4fN3oNk5CDzjsUKYFCMfCFBjHkdjPpnSHPRNUSqS1kexFKAs06UQ9v9J28sK7alUCigx0AtoyE1Cxa4bPYoaqixkcG0sCfShIK8zglWAgRma"
    Sent message: api: 1
    version: 1
    msgId: 1002
    data: "\n\247\00187Eh4Cw8FE29xBOWnR3RIH2efvMxT2LzMkCvRHxthStqcfspRZLsEYfX5q9YHBzgCswX0ManKbQXxticCnArmuhPX9qgP8epuisGnnrPzVO7FbyowkWPFu5vLbgW2w5QIr4rdIVmRCQAQliLyS56EtEaRn10AB4wEDpgJWL"
    Sent message: api: 1
    version: 1
    msgId: 1003
    data: "\n\203\001sWCpHDYZpReZvOypclgQz6PQs901kbCPyzkhSEeS6nwGHQ7tO5Lzl9pT2V5xm8Vb89G9oy5K7OWlrdrAPag5JZ9dZY40WEgvbc5W0UFgIqKeS95dHqeWT8K1ifUzpy23dLQ"
    Async received: 1002
    Parsed content: code: 200
    content: "Handled:87Eh4Cw8FE29xBOWnR3RIH2efvMxT2LzMkCvRHxthStqcfspRZLsEYfX5q9YHBzgCswX0ManKbQXxticCnArmuhPX9qgP8epuisGnnrPzVO7FbyowkWPFu5vLbgW2w5QIr4rdIVmRCQAQliLyS56EtEaRn10AB4wEDpgJWL"
    Async received: 1001
    Parsed content: code: 200
    content: "Handled:Wsq1EGAZgE9NmHFSOdRUloqshSeA0yx6ZUspKrsO6PGF4fN3oNk5CDzjsUKYFCMfCFBjHkdjPpnSHPRNUSqS1kexFKAs06UQ9v9J28sK7alUCigx0AtoyE1Cxa4bPYoaqixkcG0sCfShIK8zglWAgRma"
    Sent message: api: 1
    version: 1
    msgId: 1004
    data: "\n\300\001EUTkCNd5PV7IMzvTlTKLoM65CMhYjKo4r9jAodOugWfvudBEIxHJnlDed3MwpiyYxzmnDkoUdJY1r2pe8BU97iprzuDpyuPQp80Ds8BkccGZP2nBllIR28epbY1Du3ZoYV552hGKucpSwysqgSFVfc7hmEuo4iKsaJ9yl807l91hr6jqWc7PGZ4iime6Xzpo"
    Async received: 1003
    Parsed content: code: 200
    content: "Handled:sWCpHDYZpReZvOypclgQz6PQs901kbCPyzkhSEeS6nwGHQ7tO5Lzl9pT2V5xm8Vb89G9oy5K7OWlrdrAPag5JZ9dZY40WEgvbc5W0UFgIqKeS95dHqeWT8K1ifUzpy23dLQ"
    Async received: 1004
    Parsed content: code: 200
    content: "Handled:EUTkCNd5PV7IMzvTlTKLoM65CMhYjKo4r9jAodOugWfvudBEIxHJnlDed3MwpiyYxzmnDkoUdJY1r2pe8BU97iprzuDpyuPQp80Ds8BkccGZP2nBllIR28epbY1Du3ZoYV552hGKucpSwysqgSFVfc7hmEuo4iKsaJ9yl807l91hr6jqWc7PGZ4iime6Xzpo"
File download over this protocol: zero-copy
//TBD
Raft leader election over this protocol
//TBD