.NET 高性能缓冲队列实现 BufferQueue

news/2024/9/23 21:32:05/文章来源:https://www.cnblogs.com/eventhorizon/p/18331018

目录
  • 前言
  • 适用场景
  • 功能说明
  • 使用示例
  • BufferQueue 内部设计概述
    • Topic 的隔离
    • Partition 的设计
    • 对并发的支持
    • Partition 的动态扩容
    • Segment 的回收机制
  • Benchmark
    • 写入性能测试
    • 消费性能测试

前言

BufferQueue 是一个用 .NET 编写的高性能的缓冲队列实现,支持多线程并发操作。

项目是从 mocha 项目中独立出来的一个组件,经过修改以提供更通用的缓冲队列功能。

目前支持的缓冲区类型为内存缓冲区,后续会考虑支持更多类型的缓冲区。

适用场景

生产者和消费者之间的速度不一致,需要并发批量处理数据的场景。

功能说明

  1. 支持创建多个 Topic,每个 Topic 可以有多种数据类型。每一对 Topic 和数据类型对应一个独立的缓冲区。

BufferQueue

  1. 支持创建多个 Consumer Group,每个 Consumer Group 的消费进度都是独立的。支持多个 Consumer Group 并发消费同一个 Topic。

  2. 支持同一个 Consumer Group 创建多个 Consumer,以负载均衡的方式消费数据。

  3. 支持数据的批量消费,可以一次性获取多条数据。

  4. 支持 pull 模式和 push 模式两种消费模式。

  5. pull 模式下和 push 模式下都支持 auto commit 和 manual commit 两种提交方式。auto commit 模式下,消费者在收到数据后自动提交消费进度,如果消费失败不会重试。manual commit 模式下,消费者需要手动提交消费进度,如果消费失败只要不提交进度就可以重试。

需要注意的是,当前版本出于简化实现的考虑,暂不支持消费者的动态扩容和缩容,需要在创建消费者时指定消费者数量。

使用示例

安装 Nuget 包:

dotnet add package BufferQueue

项目基于 Microsoft.Extensions.DependencyInjection,使用时需要先注册服务。

BufferQueue 支持两种消费模式:pull 模式和 push 模式。


builder.Services.AddBufferQueue(options =>
{options.UseMemory(bufferOptions =>{// 每一对 Topic 和数据类型对应一个独立的缓冲区,可以设置 partitionNumberbufferOptions.AddTopic<Foo>("topic-foo1", partitionNumber: 6);bufferOptions.AddTopic<Foo>("topic-foo2", partitionNumber: 4);bufferOptions.AddTopic<Bar>("topic-bar", partitionNumber: 8);})// 添加 push 模式的消费者// 扫描指定程序集中的标记了 BufferPushCustomerAttribute 的类,// 注册为 push 模式的消费者.AddPushCustomers(typeof(Program).Assembly);
});// 在 HostedService 中使用 pull模式 消费数据
builder.Services.AddHostedService<Foo1PullConsumerHostService>();

pull 模式的消费者示例:

public class Foo1PullConsumerHostService(IBufferQueue bufferQueue,ILogger<Foo1PullConsumerHostService> logger) : IHostedService
{private readonly CancellationTokenSource _cancellationTokenSource = new();public Task StartAsync(CancellationToken cancellationToken){var token = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token).Token;var consumers = bufferQueue.CreatePullConsumers<Foo>(new BufferPullConsumerOptions{TopicName = "topic-foo1", GroupName = "group-foo1", AutoCommit = true, BatchSize = 100,}, consumerNumber: 4);foreach (var consumer in consumers){_ = ConsumeAsync(consumer, token);}return Task.CompletedTask;}public Task StopAsync(CancellationToken cancellationToken){_cancellationTokenSource.Cancel();return Task.CompletedTask;}private async Task ConsumeAsync(IBufferPullConsumer<Foo> consumer, CancellationToken cancellationToken){await foreach (var buffer in consumer.ConsumeAsync(cancellationToken)){foreach (var foo in buffer){// Process the foologger.LogInformation("Foo1PullConsumerHostService.ConsumeAsync: {Foo}", foo);}}}
}

push 模式的消费者示例:

通过 BufferPushCustomer 特性注册 push 模式的消费者。

push consumer 会被注册到 DI 容器中,可以通过构造函数注入其他服务,可以通过设置 ServiceLifetime 来控制 consumer 的生命周期。

BufferPushCustomerAttribute 中的 concurrency 参数用于设置 push consumer 的消费并发数,对应 pull consumer 的 consumerNumber。


[BufferPushCustomer(topicName: "topic-foo2",groupName: "group-foo2",batchSize: 100,serviceLifetime: ServiceLifetime.Singleton,concurrency: 2)]
public class Foo2PushConsumer(ILogger<Foo2PushConsumer> logger) : IBufferAutoCommitPushConsumer<Foo>
{public Task ConsumeAsync(IEnumerable<Foo> buffer, CancellationToken cancellationToken){foreach (var foo in buffer){logger.LogInformation("Foo2PushConsumer.ConsumeAsync: {Foo}", foo);}return Task.CompletedTask;}
}
[BufferPushCustomer("topic-bar","group-bar",100,ServiceLifetime.Scoped,2)]
public class BarPushConsumer(ILogger<BarPushConsumer> logger) : IBufferManualCommitPushConsumer<Bar>
{public async Task ConsumeAsync(IEnumerable<Bar> buffer, IBufferConsumerCommitter committer,CancellationToken cancellationToken){foreach (var bar in buffer){logger.LogInformation("BarPushConsumer.ConsumeAsync: {Bar}", bar);}var commitTask = committer.CommitAsync();if (!commitTask.IsCompletedSuccessfully){await commitTask.AsTask();}}
}

BufferQueue 内部设计概述

Topic 的隔离

BufferQueue 有以下的特性:

  • 同一个数据类型 下的 不同 Topic 的 BufferQueue 互不干扰。

  • 同一个 Topic 下的 不同数据类型 的 BufferQueue 互不干扰。

BufferQueue

这个特性是通过以下两层接口设计实现的:

  • IBufferQueue:根据 TopicName类型参数 T 将请求转发给具体的 IBufferQueue<T> 实现(借助 KeyedService 实现),其中参数 T 代表 Buffer 所承载的数据实体的类型。

  • IBufferQueue<T>:具体的 BufferQueue 实现,负责管理 Topic 下的数据。属于 Buffer 模块的内部实现,不对外暴露。

IBufferQueue

Partition 的设计

为了保证消费速度,BufferQueue 将数据划分为多个 Partition,每个 Partition 都是一个独立的队列,每个 Partition 都有一个对应的消费者线程。

Producer 以轮询的方式往每个 Partition 中写入数据。
Consumer 最多不允许超过 Partition 的数量,Partition 按平均分配到组内每个 Customer 上。
当一个 Consumer 被分配了多个 Partition 时,以轮训的方式进行消费。
每个 Partition 上会记录不同消费组的消费进度,不同组之间的消费进度互不干扰。

Partition

对并发的支持

Producer 支持并发写入。

Consumer 消费时是绑定 Partition 的,为保证能正确管理 Partition 的消费进度,Consumer 不支持并发消费。

如果要增加消费速度,需创建多个 Consumer。

Partition 的动态扩容

Partition 的基本组成单元是 Segment,Segment 代表保存数据的数组,多个 Segment 通过链表的形式组合成一个 Partition。

当一个 Segment 写满后,通过在其后面追加一个 Segment 实现扩容。

Segment 中用于保存数据的数组的每一个元素称为 Slot,每个 Slot 都有一个Partition 内唯一的自增 Offset。

Segment

Segment 的回收机制

每次在 Partition 中新增 Segment 时,会从头判断此前的 Segment 是否已经被所有消费组消费完,回收最后一个消费完的 Segment 作为新的 Segment 追加到 Partition 末尾使用。

SegmentRecycle

Benchmark

测试环境:Apple M2 Max 64GB

写入性能测试

与 BlockingCollection 对比并发,并发线程数为 CPU 逻辑核心数 12, partitionNumber 为 1 和 12。

测试结果
Benchmark

在并发写入时,BufferQueue 的写入性能明显优于 BlockingCollection。

消费性能测试

pull 模式 consumer 与 BlockingCollection 对比并发读取性能,并发线程数为 CPU 逻辑核心数 12,partitionNumber 为 12。

测试结果
Benchmark

在批量消费时,随着批量大小的增加,BufferQueue 的消费性能优势更加明显。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/773782.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

缓存优化(缓存击穿和缓存雪崩)

缓存优化(缓存击穿和缓存雪崩) 缓存击穿和缓存雪崩 缓存击穿缓存击穿是指用户查询的数据在缓存中不存在,但是后端数据库中却存在。 这种现象一般是由于缓存中的某个键过期导致的,比如一个热点数据键,它每时每刻都在接受大量的并发访问,如果某一刻这个键突然失效了,那么就…

【专题】2024家生活智能家居趋势报告合集PDF分享(附原数据表)

原文链接: https://tecdat.cn/?p=37146 近二十载间,中国消费市场见证了从产品创新到渠道创新的双重飞跃,无论是耐用消费品还是快速消费品,均在线上线下平台绽放出前所未有的丰富选择,多数行业已转型为以消费者为核心导向的买方市场格局。阅读原文,获取专题报告合集全文,…

c语言字符数组

字符数组与字符串,字符数据输出用%s表示 上面两种方式的区别:

mysql授权

mysql连接的两种方式 mysql服务端 10.0.0.51:3306 mysql -uroot -p密码 -h该账户允许登录的网段 -P实例端口第一种 基于ip:port的 网络链接形式,入口一 ,链接参数 ,-hlocahost -P3306 端口,窗口提供服务的入口windows机器,去链接 mysql服务端本质上是tcp的建立n…

DelphiJNI实际调试

1:下载 DelphiJNI:下载地址https://github.com/aleroot/DelphiJNI,版本比较老,没有找到其他,就用这个吧,如朋友有较新的pas文件,请留言下 2:下载jdk,这里下载JDK,这里使用javase-jkd18,也不知道这个版本要不要收费,这里学习用暂时不关新这个。 3:编写调用class的代…

playbook+roles安装nginx实战

基本目录结构host文件夹 用于存放主机清单文件 hosts文件 hosts文件内容如下:(仅供参考) [proxy] node2 [web] 192.168.xx.xxplaybook-all-roles.yml文件 用于指定执行哪个role的文件(命名可以自定义) 文件内容如下:(仅供参考) 因为roles文件夹下只有nginx一个文件夹,所…

2024夏中山集训第1周

【NOIP模拟一】20240729 C 注意到答案是s除以区间gcd。 裴蜀定理推广 D像这样建图,跑全源最短路。 在这张图上有 \(1\to 2\to 3\to 4\to 5\) 和 \(7\to 8\to 9\to 3\ to 10\ to 11\) 两条路径。把路径上的点看作车上的点,每个点本身看作车站。 可以发现在车(一条路径)上的点…

关于多模块开发各级目录的用途

参考苍穹外卖 项目整体结构如下各层的用途序号 名称 说明1 sky-take-out maven父工程,统一管理依赖版本,聚合其他子模块2 sky-common 子模块,存放公共类,例如:工具类、常量类、异常类等3 sky-pojo 子模块,存放实体类、VO、DTO等4 sky-server 子模块,后端服务,存放配置文…

WPF实现一个错误信息栏

实现结果一,首先建立一个UserControl 前台代码如下:点击查看代码 <UserControl x:Class="实现一个错误信息栏.ErrorLog"xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x="http://schemas.microsoft.com/winfx/2006/x…

c语言中数据的格式化输出

001、输出整型数据,直接输出[root@PC1 test]# ls test.c [root@PC1 test]# cat test.c #include <stdio.h>int main(void) {printf("[%d]\n", 123);return 0; } [root@PC1 test]# gcc test.c -o kkk [root@PC1 test]# ls kkk test.c [root@PC1 test]# ./kkk …

电脑技巧 | 你想拥有这样的自定义工具栏命令按钮吗?QTTabBar帮助你实现!

【电脑技巧】第90期:你想拥有这样的自定义工具栏命令按钮吗?QTTabBar帮助你实现!