【编码】自定义通信协议——实现零拷贝文件传输

news/2025/1/31 23:20:26/文章来源:https://www.cnblogs.com/longfurcat/p/18695795

前言

上一篇随笔,介绍了如何扩展自定义协议的请求类型。本篇随笔我将介绍如何基于这个自定义协议来实现文件传输,其中将涉及数据分片零拷贝

在设计自定义协议之前,我们首先了解一下HTTP协议是如何处理文件传输的。

HTTP协议的实现方式

在这里,我们主要讨论应用最广泛的HTTP/1.1协议

关于数据分片

HTTP 协议本身是一个纯文本协议,其中的 Content-Length 头部字段用于指定响应体(body)的内容长度。Content-Length 是纯文本格式,理论上没有长度限制,因此在大多数情况下,HTTP 协议可以一次性传输整个文件。

对于较大的文件,通常情况下,可以通过一个请求下载整个文件,这也是许多网站和服务的常见做法。但如果文件特别大,或者为了提高下载效率(例如支持断点续传、并行下载等),就需要在应用层处理文件的分片。例如,服务端可以先返回文件的分段信息,然后客户端逐个请求文件的不同部分。

关于零拷贝

HTTP 协议的客户端库通常不暴露底层的 socket 连接,导致上层应用无法直接操作 socket 进行零拷贝传输。

大多数情况下,数据需要先被拷贝到进程的内存中,再传输给 HTTP 客户端。

由于 HTTP 客户端库的限制,零拷贝技术在 HTTP 协议的应用中并不直接适用。

自定义协议

关于数据分片

在自定义协议中,我们可以更灵活地控制传输过程。例如,我们只使用 3 个字节来表示消息体的长度,因此协议的最大传输内容为 16MB(2^24 - 1 字节)。

对于超出该限制的内容,我们必须进行分块处理,确保每个数据块都符合协议的长度限制。

关于零拷贝

自定义协议可以引用到Socket,所以可以使用零拷贝,避免数据在内存和磁盘之间的多次拷贝,从而提高传输效率,减少 CPU 的负载。

初步设计

如何构建数据包?

消息体是一个完整的ProtoBuf BaseResponse消息

  • msgId:请求ID
  • headers:文件名+文件大小+分块数量+分块号
  • bytes:文件分块数据
message BaseResponse {required int32 msgId = 1;repeated Header headers = 2;optional bytes data = 3;
}

消息体分两部分发送

1.先发送元数据(BaseResponse的msgId+headers)

2.后发送文件数据

服务端:

1.截取文件范围得到chunkSize

2.构建BaseResponse(仅包含msgId和headers)

3.计算得到消息体Length = BaseResponse的大小+chunkSize

4.发出消息头

5.发出BaseResponse

6.零拷贝发出文件chunk

客户端:

1.将消息体作为一个完整的BaseResponse进行解析。

冲突?ProtoBuf与零拷贝

在处理过程中,我们会遇到一个问题:ProtoBuf 的解析过程需要特定的编码格式,拼接进去的文件内容无法直接作为 ProtoBuf 消息的一部分。

如果需要ProtoBuf能识别这个文件内容,则文件数据必须参与编码,要参与编码就得载入到进程内存中。这跟零拷贝是相悖的。

如何处理这个问题?

再加一个length!消息体分为三部分:

  • 2字节,作为proto消息的长度信息。(元数据字节数有限,2字节足够表示)
  • n字节,proto消息(msgId+headers)
  • n字节,文件chunk数据

处理逻辑

1)服务端代码
Java的零拷贝API是FileChannel.transferTo(long position, long count, WritableByteChannel)。
不过Netty的Channel不是WritableByteChannel的子类。要使用零拷贝,得用Netty提供的FileRegion。底层也是调用FileChannel的transferTo。

    public void handleDownloadRequest(BaseRequest baseRequest, ChannelHandlerContext ctx) throws Exception {File file = new File("F:\\redis.log");RandomAccessFile raf = new RandomAccessFile(file, "r");FileChannel fileChannel = null;long fileLength = raf.length();System.out.println("file length" + fileLength);long offset = 0;int chunkIndex = 0;int totalChunks = (int) Math.ceil((double) fileLength / MAX_CHUNK_SIZE);boolean firstPackage = true;while(offset < fileLength) {raf = new RandomAccessFile(file, "r");fileChannel = raf.getChannel();System.out.println("open:"+fileChannel.isOpen());//文件块大小long chunkSize = Math.min(MAX_CHUNK_SIZE, fileLength - offset);System.out.println("chunkSize:"+chunkSize);// 创建 FileRegion 来传输当前文件块FileRegion fileRegion = new DefaultFileRegion(fileChannel, offset, chunkSize);List<Header> headers = new ArrayList<>();if(firstPackage) {headers.add(Header.newBuilder().setKey("fileName").setValue(file.getName()).build());headers.add(Header.newBuilder().setKey("fileSize").setValue(String.valueOf(fileLength)).build());headers.add(Header.newBuilder().setKey("totalChunks").setValue(String.valueOf(totalChunks)).build());}headers.add(Header.newBuilder().setKey("chunkIndex").setValue(String.valueOf(chunkIndex)).build());//发送消息体的上半部分(msgId+headers)BaseResponse response = BaseResponse.newBuilder().setMsgId(baseRequest.getMsgId()).addAllHeaders(headers).build();byte[] payloadHeadBytes = response.toByteArray();long bodyLength = 2 + payloadHeadBytes.length + chunkSize; //两个字节byte[] lengthBytes = new byte[3];lengthBytes[0] = (byte) (bodyLength >> 16);lengthBytes[1] = (byte) (bodyLength >> 8);lengthBytes[2] = (byte) bodyLength;//protobuf长度long length2 = payloadHeadBytes.length;byte[] lengthBytes2 = new byte[2];lengthBytes2[0] = (byte) (length2 >> 8);lengthBytes2[1] = (byte) (length2);//发送消息头+消息体的上半部分ByteBuf byteBuf = Unpooled.copiedBuffer(new byte[]{5}, lengthBytes, lengthBytes2, payloadHeadBytes);ChannelFuture f1 = ctx.channel().writeAndFlush(byteBuf);f1.sync();
//            System.out.println("f1:"+f1.isSuccess());//零拷贝写出文件数据(文件内容无需进入用户区内存,直接拷贝到socket发送缓冲区)ChannelFuture f2 = ctx.writeAndFlush(fileRegion);f2.sync();
//            System.out.println("f2:"+f2.isSuccess());
firstPackage = false;// 更新偏移量offset += chunkSize;System.out.println("写出:"+bodyLength);raf.close();}}

2)客户端代码

public class DownloadManager {private Map<Integer, FileDownContext> waitingMap = new ConcurrentHashMap<>();public void addToMap(Integer msgId, CompletableFuture<String> waiter) {waitingMap.put(msgId, new FileDownContext(null, null, 0L, 0.0d, waiter));}public void onResponse(BaseResponse response) {
//        System.out.println("收到:"+response.getMsgId());Integer msgId = response.getMsgId();FileDownContext context = waitingMap.get(msgId);if(Objects.isNull(context)) {return;}//首包带有这两个信息for (Header header : response.getHeadersList()) {if(StrUtil.equals(header.getKey(), "fileName")) {context.setFileName(header.getValue());}if(StrUtil.equals(header.getKey(), "totalChunks")) {context.setTotalChunks(Long.parseLong(header.getValue()));}}//更新接收情况context.receivedChunks++;context.progress = (double)context.receivedChunks/context.totalChunks;try {//文件如果不存在,则创建Path filePath = Paths.get("F:\\clientDownload\\" + context.fileName);if(!Files.exists(filePath)) {Files.createFile(filePath);}//追加写入文件
            Files.write(filePath, response.getData().toByteArray(), StandardOpenOption.APPEND);} catch (IOException e) {e.printStackTrace();}//完成请求,释放Contextif(Objects.equals(context.receivedChunks, context.totalChunks)) {context.waiter.complete(context.fileName);waitingMap.remove(msgId);}}@Data@AllArgsConstructorclass FileDownContext {String fileName;Long totalChunks;Long receivedChunks;Double progress;CompletableFuture<String> waiter;}
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/877437.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

研发的护城河到底是什么?

0 你的问题,我知道! 和大厂朋友聊天,他感叹原来努力干活,做靠谱研发,积累职场经验,干下来,职业发展一般问题不大。而如今大厂“年轻化”,靠谱再不能为自己续航,企业似乎也不愿意持续为经验买单。 在这不确定时代,职业发展中有无硬通货? 更长远职业发展角度:要抓住机…

冶炼金属

暴力做法 #include<iostream> #include<vector> using namespace std;void solve() {int n; cin >> n;vector<int>a(n), b(n);for (int i = 0; i < n; i++){cin >> a[i] >> b[i];}for (int i = 1; i < 1e6; i++)//从小到大,找最小值…

昆明理工大学2025年硕士研究生调剂汇总表1月31日更新

这是今年昆明理工大学调剂信息,目前只更新了部分学院的部分专业,后续会持续更新。 【腾讯文档】昆明理工大学2025年硕士研究生调剂汇总表 https://docs.qq.com/sheet/DZERIbnpPb3JjeHFO

子串简写

二分法: 要求:pc2-pc1+1>=k 变形:i(pc2)-k+1>=pc1#include <iostream> #include <string> #include<vector> #define int long long using namespace std; void solve() {int k;cin >> k;char c1, c2;cin >> c1 >> c2;string s;…

傻瓜教程 一步一步把blazor项目发布到linux(debian12,nginx反向代理设置,net8,net9适用)

接触blazor有一段时间了,感觉非常好用,特别适合企业内部开发用。开发效率高,界面优美,重要得是会c#的朋友不用再去学习js等前端技术了,虽然平时也看得懂js,html,css,但要自己写还是需要从头去学习的,不想再浪费精力去学习,毕竟会的再多,杂而不精也没什么意义。而自己…

[Tools] 发布代码

我们已经将我们的代码开源到了 github 上面,但是如果是其他开发者想要使用我们的库,还需要去 github 上面手动下载下来,添加到他们的项目里面,这样是非常低效的一种方式。 npm 的出现解决了这个问题,npm 是前端领域非常出名的一个包的托管平台,提供了代码的托管和检索以及…

威海市,杨文召——老赖!!!

威海市,杨文召——老赖!!!

思科静态路由(包含小实验)

思科静态路由 路由:从源主机到目标主机的转发过程 路由器是根据路由表转发数据 路由表:路由器中维护的路由条目的集合 路由器根据路由表做路径选择 路由表的形成 直连网段 本地接口配置IP地址和子网掩码,端口开启状态,形成直连路由 非直连路由 不是本地接口配置IP地址和子网…

PKUWC2025 D2T1

其实是场上的想到的做法,但是当时被卡 corner case 了 QaQ。 注意到,我们其实可以 \(O(1)\) 次 query 求出 \(x\) 和 \(y\) 的距离。具体地,我们再找三个点,现在有 \(5\) 个点,\(10\) 个距离,而我们又可以 query \(10\) 次,正好可以解出两两距离。这里如果 \(n\le 4\) 特…

MATLAB程序测试

% Interference cancellation % 悦博特北京科技有限公司 lxdawn@163.com %clear all, close all, clctime = 0:0.1:10;r = sin(time*4*pi);% Random initialisation of the W weight and b biasR = length(time); % number of inputsS = R;% p parasite signalp = randn(size(r…

CF998

A link总共就三组,看一下每一组如果满足要让它是多少,看看最多有几个相同的,最多有几个相同的就满足这些,就可以满足这么多组。点击查看代码 #include<bits/stdc++.h>using namespace std;int a[10]; int c[5];void qwq(){cin >> a[1] >> a[2] >> …

计算机网络学习-HCIP-02

TCP、UDP TCP:可靠性高 适合对文件传输的完整性要求高,但是对延迟不敏感 电子邮件 游戏或者应用:更新客户端的时候,就是用TCP协议来更新 游戏更新完了,进去游戏开局了,这时候就用UDP协议 UDP:速度快 双十一、618,电商服务器如何顶住压力的? 使用多台服务器 cdn技术,不…