Netty Review - 底层零拷贝源码解析

文章目录

  • Pre
  • 概述
  • 源码解析
    • 入口索引
    • AbstractNioByteChannel.NioByteUnsafe#read
    • allocHandle.allocate(allocator)
  • 小结
  • 传统的零拷贝

在这里插入图片描述

在这里插入图片描述


Pre

Netty Review - 直接内存的应用及源码分析


概述

Netty 的零拷贝技术是通过优化数据传输过程中的数据复制操作,以降低系统的开销和提高性能。

其原理主要涉及以下几个方面:

  1. 使用直接内存: Netty 利用 Java NIO 中的 ByteBuffer.allocateDirect() 方法来分配直接内存,直接内存的特点是可以直接被操作系统所管理,不受 Java 堆内存大小的限制,而且可以直接与操作系统进行数据交互,避免了数据在 Java 堆内存和操作系统之间的拷贝。

  2. 文件传输零拷贝: 在进行文件传输时,Netty 可以通过操作系统提供的零拷贝技术,直接将文件内容从磁盘读取到内核缓冲区,然后通过 DMA(Direct Memory Access)技术将数据直接传输到网络通道,避免了数据在用户空间和内核空间之间的拷贝。

  3. 内存池: Netty 使用内存池来管理直接内存的分配和释放,避免了频繁地申请和释放内存的开销,提高了内存的重复利用率。

  4. CompositeByteBuf: Netty 提供了 CompositeByteBuf 类来实现多个 ByteBuf 的组合,可以将多个缓冲区的内容合并为一个逻辑上的缓冲区,避免了数据在多个缓冲区之间的拷贝。

  5. 传输过程中的零拷贝: 在网络传输过程中,Netty 利用零拷贝技术将数据从应用程序的缓冲区直接传输到操作系统的网络缓冲区,避免了数据在用户空间和内核空间之间的拷贝,同时可以利用 scatter/gather I/O 操作一次性传输多个缓冲区的数据。

通过以上方式,Netty 实现了数据传输过程中的零拷贝,大大提高了系统的性能和吞吐量,特别是在高并发、大数据量的网络应用场景下,可以显著地降低系统的资源消耗和延迟。

在这里插入图片描述


源码解析

入口索引

结合我们的Netty线程模型源码图 ,找到入口 。

在这里插入图片描述

AbstractNioByteChannel.NioByteUnsafe#read

这段代码是 Netty 中的 read() 方法实现,用于从通道中读取数据并触发相应的事件到 ChannelPipeline 中。

@Override
public final void read() {final ChannelConfig config = config();  // 获取通道配置信息if (shouldBreakReadReady(config)) {  // 判断是否应该中断读就绪操作clearReadPending();  // 清除读等待标志return;}final ChannelPipeline pipeline = pipeline();  // 获取通道的管道final ByteBufAllocator allocator = config.getAllocator();  // 获取分配器final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();  // 获取接收字节缓冲区分配句柄allocHandle.reset(config);  // 重置分配句柄状态ByteBuf byteBuf = null;  // 字节缓冲区boolean close = false;  // 是否关闭标志try {do {byteBuf = allocHandle.allocate(allocator);  // 分配字节缓冲区allocHandle.lastBytesRead(doReadBytes(byteBuf));  // 读取数据到缓冲区if (allocHandle.lastBytesRead() <= 0) {// 如果没有读取到数据// 释放缓冲区byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;  // 是否关闭标志if (close) {// 如果收到 EOF,表示没有数据可读了readPending = false;  // 清除读等待标志}break;}allocHandle.incMessagesRead(1);  // 增加读取消息数readPending = false;  // 清除读等待标志pipeline.fireChannelRead(byteBuf);  // 触发通道读事件到管道byteBuf = null;} while (allocHandle.continueReading());  // 继续读取数据,直到不再需要读取为止allocHandle.readComplete();  // 读操作完成pipeline.fireChannelReadComplete();  // 触发通道读完成事件到管道if (close) {closeOnRead(pipeline);  // 如果需要关闭通道,执行关闭操作}} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);  // 处理读取异常} finally {// 检查是否有未处理的读等待操作// 这可能有两个原因:// 1. 用户在 channelRead(...) 方法中调用了 Channel.read() 或 ChannelHandlerContext.read()// 2. 用户在 channelReadComplete(...) 方法中调用了 Channel.read() 或 ChannelHandlerContext.read()// 详见 https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();  // 移除读操作}}
}

allocHandle.allocate(allocator)

在这里插入图片描述

@Override
public ByteBuf allocate(ByteBufAllocator alloc) {return alloc.ioBuffer(guess());
}

在给定的 ByteBufAllocator 上分配一个新的 ByteBuf 实例。

  • return alloc.ioBuffer(guess()): 使用给定的 ByteBufAllocator 对象调用 ioBuffer() 方法来分配一个新的 ByteBuf 实例。guess() 方法用于估算分配的字节数。

该方法的作用是在给定的 ByteBufAllocator 上分配一个新的 ByteBuf 实例,并返回分配的实例。


在这里插入图片描述

alloc.ioBuffer(guess())

@Override
public ByteBuf ioBuffer(int initialCapacity) {if (PlatformDependent.hasUnsafe()) { // 检查当前平台是否支持直接内存return directBuffer(initialCapacity); // 如果支持直接内存,则调用 directBuffer() 方法创建直接内存的 ByteBuf 实例}return heapBuffer(initialCapacity); // 如果不支持直接内存,则调用 heapBuffer() 方法创建堆内存的 ByteBuf 实例
}

该方法的作用是根据当前平台是否支持直接内存来选择合适的内存类型(堆内存或直接内存),并根据传入的初始容量参数创建相应类型的 ByteBuf 实例

PlatformDependent.hasUnsafe() ---- true

   @Override
public ByteBuf directBuffer(int initialCapacity) {return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY); // 调用重载方法 directBuffer(int initialCapacity, int maxCapacity),传入默认的最大容量值 DEFAULT_MAX_CAPACITY
}

directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);

@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {if (initialCapacity == 0 && maxCapacity == 0) { // 如果初始容量和最大容量都为0return emptyBuf; // 返回一个空的 ByteBuf 实例}validate(initialCapacity, maxCapacity); // 验证初始容量和最大容量的合法性return newDirectBuffer(initialCapacity, maxCapacity); // 创建一个新的直接内存的 ByteBuf 实例
}

在这里插入图片描述

newDirectBuffer(initialCapacity, maxCapacity)

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {// 获取当前线程的线程缓存PoolThreadCache cache = threadCache.get();// 获取直接内存池PoolArena<ByteBuffer> directArena = cache.directArena;final ByteBuf buf;if (directArena != null) { // 如果直接内存池可用// 从直接内存池中分配内存buf = directArena.allocate(cache, initialCapacity, maxCapacity);} else { // 如果直接内存池不可用// 使用平台相关的方式创建直接内存的 ByteBuf 实例buf = PlatformDependent.hasUnsafe() ?UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);}// 返回一个包装了泄漏感知器的 ByteBuf 实例return toLeakAwareBuffer(buf);
}

directArena.allocate(cache, initialCapacity, maxCapacity);

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {// 创建一个新的 PooledByteBuf 实例,其中 maxCapacity 为指定的最大容量PooledByteBuf<T> buf = newByteBuf(maxCapacity);// 使用指定的线程缓存和请求容量来分配内存给 ByteBufallocate(cache, buf, reqCapacity);// 返回分配的 ByteBufreturn buf;
}

这段代码实现了从线程缓存中分配内存给 ByteBuf,并返回分配的 ByteBuf 实例。


allocate(cache, buf, reqCapacity);

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {// 将请求容量规范化为标准容量final int normCapacity = normalizeCapacity(reqCapacity);// 如果容量小于页面大小,则分配小块或微型内存if (isTinyOrSmall(normCapacity)) {int tableIdx;PoolSubpage<T>[] table;boolean tiny = isTiny(normCapacity);if (tiny) { // < 512if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {// 从缓存中成功分配,则直接返回return;}tableIdx = tinyIdx(normCapacity);table = tinySubpagePools;} else {if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {// 从缓存中成功分配,则直接返回return;}tableIdx = smallIdx(normCapacity);table = smallSubpagePools;}final PoolSubpage<T> head = table[tableIdx];// 同步处理双向链表的头部synchronized (head) {final PoolSubpage<T> s = head.next;if (s != head) {// 分配内存assert s.doNotDestroy && s.elemSize == normCapacity;long handle = s.allocate();assert handle >= 0;s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);incTinySmallAllocation(tiny);return;}}// 没有可用的内存块,则进入分配普通内存块的逻辑synchronized (this) {allocateNormal(buf, reqCapacity, normCapacity);}incTinySmallAllocation(tiny);return;}// 大于页面大小的内存分配if (normCapacity <= chunkSize) {if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {// 从缓存中成功分配,则直接返回return;}synchronized (this) {allocateNormal(buf, reqCapacity, normCapacity);++allocationsNormal;}} else {// 大块内存分配allocateHuge(buf, reqCapacity);}
}

这段代码实现了根据请求的容量大小来分配不同大小的内存块,优先从缓存中分配,如果缓存中没有可用内存,则根据请求的大小分配不同大小的内存块。


小结

Netty的接收和发送ByteBuf采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。

如果使用传统的JVM堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才能写入Socket中。

JVM堆内存的数据是不能直接写入Socket中的。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。

在这里插入图片描述

在传统的Java IO中,使用堆内存(Heap Buffers)进行Socket读写时,数据需要先从堆内存中复制到直接内存(Direct Buffers),然后才能写入Socket。这样就导致了一次缓冲区的内存拷贝。

而在Netty中,采用堆外直接内存(Direct Buffers)进行Socket读写。这样,在数据传输过程中,就不需要进行额外的内存拷贝操作。消息可以直接从直接内存写入Socket中,从而避免了堆内存到直接内存的二次拷贝,提高了数据传输的效率。

使用堆外直接内存的优点包括:

  1. 减少了内存拷贝次数:消息可以直接从直接内存写入Socket中,避免了额外的内存拷贝操作,提高了数据传输的效率。
  2. 提高了IO性能:由于减少了内存拷贝操作,可以降低CPU的开销,提高IO性能。
  3. 更好地利用操作系统资源:堆外直接内存是由操作系统直接管理的,不受Java堆大小的限制,可以更好地利用操作系统的资源。

总的来说,Netty使用堆外直接内存进行Socket读写可以提高IO性能,并降低系统资源的开销,是一种更高效的IO模型。


传统的零拷贝

在这里插入图片描述

戳这里

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/472199.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

租赁香港服务器多少钱一个月?24元

阿里云香港服务器2核1G、30M带宽、40GB ESSD系统盘优惠价格24元/月&#xff0c;288元一年&#xff0c;每月流量1024GB&#xff0c;多配置可选&#xff0c;官方优惠活动入口 https://t.aliyun.com/U/bLynLC 阿里云服务器网aliyunfuwuqi.com分享阿里云香港服务器优惠活动、详细配…

嵌入式Qt Qt中的字符串类

一.Qt中的字符串类 QString vs string&#xff1a; QString在Qt库中几乎是无所不在的 所有的Qt图形用户组件都依赖于QString 实验1 &#xff1a;QString 初体验 #include <QDebug> void Sample_1() {QString s "add";s.append(" "); // &q…

Python 异常处理及程序调试

Python 是一门功能强大而又易于学习的编程语言&#xff0c;它提供了丰富的工具和库来帮助开发者编写高效、稳定的程序。然而&#xff0c;在编写复杂的应用程序时&#xff0c;错误和异常是难以避免的。本文将介绍 Python 中的异常处理机制以及程序调试技巧&#xff0c;帮助读者提…

开什么店最稳定轻松?适合一个人开的实体店推荐

在创业的道路上&#xff0c;很多人都希望找到一种稳定轻松的开店方式。 作为一名资深的鲜奶吧创业者&#xff0c;我将分享我的经验和见解&#xff0c;希望能给那些想开实体店的朋友们一些启示&#xff01;&#xff01; 我开鲜奶吧已经有 5 年时间了&#xff0c;目前经营的是鲜…

Chat with RTX 安装

1、Chat With RTX 介绍 Chat With RTX 是一个 Demo&#xff0c;用来将您自己的资料&#xff08;文档、笔记、视频或其他数据&#xff09;与大语言模型建立连接&#xff0c;从而令 LLM 更具个性化。利用检索增强生成 (RAG)、TensorRT-LLM 和 RTX 加速&#xff0c;您可以与自定义…

Java实现河南软件客服系统 JAVA+Vue+SpringBoot+MySQL

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 系统管理人员2.2 业务操作人员 三、系统展示四、核心代码4.1 查询客户4.2 新增客户跟进情况4.3 查询客户历史4.4 新增服务派单4.5 新增客户服务费 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpringBootMySQL的河…

2022年12月电子学会青少年软件编程 中小学生Python编程等级考试二级真题解析(选择题)

2022年12月Python编程等级考试二级真题解析 选择题&#xff08;共25题&#xff0c;每题2分&#xff0c;共50分&#xff09; 1、运行下列程序&#xff0c;最终输出的结果是 info {1:小明,2:小黄,3:小兰} info[4]小红 info[2]小白 print(info) A、{1:小明,2:小白,3:小红,4:小…

RapidMiner数据挖掘2 —— 初识RapidMiner

本节由一系列练习与问题组成&#xff0c;这些练习与问题有助于理解多个基本概念。它侧重于各种特定步骤&#xff0c;以进行直接的探索性数据分析。因此&#xff0c;其主要目标是测试一些检查初步数据特征的方法。大多数练习都是关于图表技术&#xff0c;通常用于数据挖掘。 为此…

2.14:二维数组、非函数实现strcat、strcmp、strcpy、strlen

1.编程实现二维数组的杨辉三角 程序代码&#xff1a; 1 #include<stdio.h>2 #include<string.h>3 #include<stdlib.h>4 int main(int argc, const char *argv[])5 {6 int n;7 printf("please enter n:");8 scanf("%d",&…

MySQL学习记录——십일 索引

文章目录 1、理解索引2、聚簇、非聚簇索引3、操作1、主键索引2、唯一键索引3、普通索引4、注意事项 4、全文索引 1、理解索引 MySQL服务器是在内存中的&#xff0c;所有数据库的CURD操作都是在内存中进行&#xff0c;索引也是如此。索引是用来提高性能的&#xff0c;它通过组织…

「计算机网络」数据链路层

数据链路层的地位&#xff1a;网络中的主机、路由器等都必须实现数据链路层信道类型 点对点信道&#xff1a;使用一对一的点对点通信方式广播信道 使用一对多的广播通信方式必须使用专用的共享信道协议来协调这些主机的数据发送 使用点对点信道的数据链路层 数据链路和帧 链…

java-8组合式异步编程

11.1 Future 接口 Future接口在Java5中被引人&#xff0c;设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算&#xff0c;返回一个执行运算结果的引用&#xff0c;当运算结束后&#xff0c;这个引用被返回给调用方。在Future中触发那些潜在耗时的操作把调用…