C#中的Channel

news/2025/3/3 21:44:20/文章来源:https://www.cnblogs.com/axzxs2001/p/18746268

  在 .NET 的异步编程中,System.Threading.Channels 提供了一种强大的方式来处理生产者-消费者模式,尤其是当我们要在不同的任务或服务之间传递数据时。这篇文章我们就来聊聊 UnboundedChannelOptions 和 BoundedChannelOptions 这两个类,以及它们的使用场景和区别。


代码背景介绍

  我们先看一段代码,这是一个 ASP.NET Core 项目,它创建了一个 Channel<VectorData> 来存放消息,并通过 VectorService 来处理这些消息。

builder.Services.AddSingleton<Channel<VectorData>>(_ =>
{//return Channel.CreateUnbounded<VectorData>(new UnboundedChannelOptions//{//    SingleReader = true,//    AllowSynchronousContinuations = false,//});return Channel.CreateBounded<VectorData>(new BoundedChannelOptions(10){SingleReader = true,FullMode = BoundedChannelFullMode.Wait,AllowSynchronousContinuations = false,});
});

这里我们有两种选择:

  • UnboundedChannelOptions(无限容量通道)
  • BoundedChannelOptions(有限容量通道)

那么,这两者有什么区别呢?让我们深入解析一下。


UnboundedChannelOptions:无限容量通道

当你使用 Channel.CreateUnbounded<T>(new UnboundedChannelOptions {...}) 时,意味着创建了一个没有大小限制的通道。这意味着:

  1. 消息可以无限存入,不会因为容量问题被阻塞。
  2. 可能会导致内存占用过大,如果生产者速度远超消费者,数据会一直积累,可能会导致 OOM(内存溢出)。
  3. 适用于生产者和消费者速度匹配,或有足够资源支持的场景

关键属性解析:

  • SingleReader:是否只有一个消费者读取数据。设为 true,意味着不会有多个任务同时消费消息,提高读取效率。
  • AllowSynchronousContinuations:是否允许同步执行回调。如果设为 false,所有等待的任务都会被异步调度,避免线程被阻塞。

适用场景:

  • 生产者和消费者处理速度接近,不容易导致消息堆积。
  • 应用场景对内存不是特别敏感,允许暂时存放大量数据。
  • 任务执行时间较短,不容易积累大量待处理数据。

BoundedChannelOptions:有限容量通道

相对来说,BoundedChannelOptions 是容量受限的通道,它在创建时就指定了最大容量,如代码中的 new BoundedChannelOptions(10)

关键特性:

  1. 通道容量是有限的,超过 10 条消息时,生产者不能再写入(除非有消费者取走消息)。
  2. 可以控制通道满时的行为,通过 FullMode 指定:
    • BoundedChannelFullMode.Wait(默认):生产者等待,直到有空间可写入。
    • BoundedChannelFullMode.DropOldest:丢弃最早的数据,保证新数据能进入。
    • BoundedChannelFullMode.DropNewest:丢弃最新的数据,让旧数据保留。
    • BoundedChannelFullMode.DropWrite:直接丢弃当前要写入的数据。

关键属性解析:

  • SingleReader:同 UnboundedChannelOptions,控制是否只有一个消费者读取数据。
  • AllowSynchronousContinuations:是否允许同步执行任务,防止不必要的线程切换。

适用场景:

  • 生产者速度可能会远超消费者,需要限制消息堆积,避免内存占用过高。
  • 需要对通道满时的行为进行控制,例如优先保留新数据,或让生产者等待。

Unbounded vs Bounded:该如何选择?

特性UnboundedChannelOptionsBoundedChannelOptions
是否限制容量 ❌ 无限制 ✅ 受限(如 10 条消息)
适用于高吞吐 ✅ 适合 🚫 可能会影响
可能导致内存占满 ✅ 可能 🚫 不太可能
生产者快于消费者时 ❌ 可能导致 OOM ✅ 可以限制
配置通道满时行为 🚫 无法配置 ✅ 可配置 FullMode
适用场景 生产者、消费者速度匹配 生产者速度可能远超消费者

简单来说:

  • 如果你不确定生产者和消费者速度是否匹配,或者数据量不可控,建议用 BoundedChannelOptions
  • 如果你确定不会有大量数据积压,或者消费能力能跟上,UnboundedChannelOptions 也可以。

代码实践:修改为 UnboundedChannelOptions

如果我们改用 UnboundedChannelOptions,代码如下:

builder.Services.AddSingleton<Channel<VectorData>>(_ =>
{return Channel.CreateUnbounded<VectorData>(new UnboundedChannelOptions{SingleReader = true,AllowSynchronousContinuations = false,});
});

这里就没有了 BoundedChannelOptions 里的 FullMode,因为它本身不会满。

但是如果生产者不断写入,消费者来不及消费,可能会导致内存占用越来越大。


结论

  • UnboundedChannelOptions 适用于任务处理速度均衡的情况,但如果生产者远超消费者,会有 OOM 风险。
  • BoundedChannelOptions 可以防止内存爆炸,适用于高吞吐的生产者-消费者模式。
  • BoundedChannelOptions 可以通过 FullMode 控制通道满时的行为,而 UnboundedChannelOptions 没有这个选项。

如果你的数据量不确定,或者可能出现生产者>消费者的情况,建议优先选择 BoundedChannelOptions,以避免潜在的性能问题。


希望这篇文章能帮你理解这两种 Channel 选项的区别,让你在编写高并发、异步任务时能更好地选择合适的方案!

完整代码:

using System.Threading.Channels;var builder = WebApplication.CreateBuilder(args);builder.Services.AddHostedService<VectorService>();
builder.Services.AddSingleton<Channel<VectorData>>(_ =>
{//return Channel.CreateUnbounded<VectorData>(new UnboundedChannelOptions//{//    SingleReader = true,//    AllowSynchronousContinuations = false,//});return Channel.CreateBounded<VectorData>(new BoundedChannelOptions(10){SingleReader = true,FullMode = BoundedChannelFullMode.Wait,AllowSynchronousContinuations = false,});
});
var app = builder.Build();app.MapGet("/vector", async (Channel<VectorData> channel) =>
{await channel.Writer.WriteAsync(new VectorData($"这里是用户内容类信息,{DateTime.Now.ToString("yyyyMMddHHmmssfff")}"));return Results.Ok();
});app.Run();
public record VectorData(string content);
public class VectorService : BackgroundService
{private readonly Channel<VectorData> _channel;public VectorService(Channel<VectorData> channel){_channel = channel;}protected override async Task ExecuteAsync(CancellationToken stoppingToken){while (await _channel.Reader.WaitToReadAsync(stoppingToken)){var vectorData = await _channel.Reader.ReadAsync(stoppingToken);Console.WriteLine($"开始向量化处理:{vectorData.content}");await Task.Delay(3000, stoppingToken);Console.WriteLine($"向量化处理结束");}}
}

  文章来源微信公众号

  想要更快更方便的了解相关知识,可以关注微信公众号 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/892242.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kimi/DeepSeek最新论文MoBA与NSA阅读

From:https://www.big-yellow-j.top/posts/2025/02/21/Kimi-DS-Paper.html DeepSeek最新论文:Native Sparse Attention: Hardware-Aligned and Natively Trainable Sparse Attention以及 Kimi最新论文MOBA: MIXTURE OF BLOCK ATTENTION FOR LONG-CONTEXT LLMS这几篇文章都是针…

Windows 剪贴板 编程原理引入

前言 不得不说上三休四的生活就是舒服,我都有精力提升自己了。 本文将基于自己在生活中遇到的现象进行探索,因此问题引入对自己较为重要,读者可以跳过。 文章主要探讨剪贴板格式问题,即下面的链接。https://learn.microsoft.com/zh-cn/windows/win32/dataxchg/standard-cli…

.NET9中基于策略角色验证的包冲突

今天在.NET项目中,使用基于策略角色的鉴权时,遇到一个401的问题,场景如下:Program.cs代码如下:using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Options; using Microsoft.IdentityModel.Tokens; usin…

yolov5预处理

1.yolov5预处理流程1.等比缩放与填充:将输入图像等比缩放到目标尺寸(如640640),并在多余部分填充灰条,保持图像的宽高比不变。这一过程也被称为“letterbox”。  2.颜色空间转换:将图像从BGR格式转换为RGB格式(OpenCV默认读取为BGR)。  3.归一化:将像素值从[0, 25…

.NET9中使用Options

选项模式在 ASP.NET Core 中使用类来提供对相关配置设置的强类型访问。通过将配置设置隔离到单独的类,应用程序遵循封装和关注点分离的原则。封装确保依赖于配置的类仅依赖于其使用的设置;关注点分离则确保应用的不同部分的设置互不依赖或耦合。此外,选项模式还提供了验证配…

Semantic Kernel:OpenAPI的Plugin

SK的插件,能让AI功能如虎添翼。Plugin能让AI与本地功能和数据互动,使AI与应用的结合,并起到了很好的粘合剂作用。怎么能让Plugin本身和功能分离呢?调用API是一个很好的办法,比如下面是一个查询订单的功能,是一个mini API项目。using System.Text.Json.Serialization;var …

Semantic Kernel:Phi-4试用

微软在去年12月12日首次展示了Phi-4模型,该模型拥有140亿参数,但表现极为强大。在多项测试中表现优异:GPQA(研究生水平问答)和MATH数学基准测试中,Phi-4的表现超过了OpenAI的GPT-4o,并超越同类顶级开源模型Qwen 2.5 - 14B和Llama 3.3 - 70B。 在美国数学竞赛AMC测试中,…

2025/3/2 【栈与队列】LeetCode232. 用栈实现队列

232. 用栈实现队列 - 力扣(LeetCode) 代码随想录 (programmercarl.com) 思想:用下面的两个栈模拟队列 from collections import deque class MyQueue:def __init__(self):# in主要负责push,out主要负责popself.stackin = []self.stackout = []def push(self, x: int) -> …

Semantic Kernel:Process

Process(流程)是为实现特定业务目标而设计的步骤集合,通过提供服务或产品,为客户创造价值。每个流程由一系列有序的活动组成,这些活动共同完成一个整体目标,旨在提升效率、优化决策,并促进跨团队协作。 在Microsoft Semantic Kernel框架中,Process Framework是一种强大…

Semantic Kernel:新Agent代理

在之前的SemanticKernel中,有一篇关于Agent的文章,不过现在看来其中使用的包过时,所以这篇来更新一下。原文章如下:Semantic Kernel:Agent代理 桂素伟,公众号:桂迹Semantic Kernel:Agent代理原来项目引有的Nuget包如下,版本停留在了1.18.2,2024年9月4日   最新的Agen…

基于uniCloud开发的管理端部署

参考链接: https://doc.dcloud.net.cn/uniCloud/publish.html https://doc.dcloud.net.cn/uniCloud/hosting.html#使用 上传云函数前端网页托管勾选将编译后的资源部署到前端网页托管选项 配置自定义域名配置域名,根据提示在域名管理处添加解析将得到的cname值在域名解析出添…

UnitsNet 库简介

UnitsNet 是一个功能强大的 .NET 库,专为简化物理单位的处理而设计。它提供了丰富的单位类型及其转换功能,使开发人员能够在代码中方便地进行物理单位间的转换、计算和显示。UnitsNet 支持多种领域的物理单位,例如长度、质量、体积、温度、速度、面积等,极大地提高了开发效…