如何用rust实现一个异步channel

目录

  • 前言
  • 思路
    • 实现功能
    • 代码实现
  • 测试
    • 先引测试版包
    • 测试代码
    • 结果与分析
    • 思考
  • 尾语

前言

使用通信来共享内存,而不是通过共享内存来通信

上面这句话,是每个go开发者在 处理多线程通信时 的座右铭,go甚至把实现这个理念的channel直接焊在编译器里,几乎所有的go程序里都有channel的身影。
rust的异步和go的goroutine有异曲同工之妙,甚至可以把 tokio::spawn 理解为go关键字。但在rust中好像并没有异步channel的实现。本着求人不如求己的原则,决定diy一个类似go的channel。

思路

先看一下发送流程

唤醒发送任务
缓存已满,添加任务到
插入成功
重新发送
发送消息
尝试添加到缓存中
缓存空出
发送队列
唤起接收任务
End

再看一下接收流程

唤醒接受任务
缓存为空,添加任务至
取值成功
重新取值
接收消息
尝试从缓存中取消息
缓存入值
接收队列
唤起发送任务
End

总体来说流程清晰易懂,不管接收还是发送,都是先尝试从缓存队列中操作值,不成功则加入到对应队列,等待再次执行。反之则唤起相关任务,结束操作。

实现功能

  1. 首先需要实现一个存放值的环形缓冲区,并且每个单元应该是单独加锁的,从而避免全局锁。
  2. 需要两个任务队列,用来存放在饥饿模式(从缓存操作失败)下的 发送任务和接受任务。
  3. 按照rust习惯,将发送者和接受者拆开,并各自实现future
  4. 因为唤醒不是同步的,需要通过一个唤醒器来唤醒沉默的任务。
  5. 使用原子操作替代锁

代码实现

具体的就不写了,放在github上了
github地址:https://github.com/woshihaoren4/wd_tools/tree/main/src/channel

测试

这里主要和async-channel测试一下

  • async-channel 是最常见的异步channel,在crateio上有两千万的下载。

先引测试版包

cargo.toml

[dependencies]
tokio = {version = "1.22.0",features=["full"]}
wd_tools = {version = "0.8.3",features = ["sync","chan"]}
async-channel = "1.8.0"
  • wd_tools 是我们的channel,这里引用的sync chan两个feature,前者用于测试,后者是chan实现。

测试代码

测试场景:设置缓存长度为10,发100万数据,接100万数据。在1发送者1接受者,1发送者10接受者,10发送者1接受者,10发送者10接受者四种情况下的收发性能。

use std::fmt::Debug;
use wd_tools::channel as wd;
use async_channel as ac;#[tokio::main]
async fn main(){let ts = TestService::new(10);println!("test start ------------> wd_tools");ts.send_to_recv("1-v-1",true,100_0000,1,100_0000,1,|x|x).await;ts.send_to_recv("1-v-10",true,100_0000,1,10_0000,10,|x|x).await;ts.send_to_recv("10-v-1",true,10_0000,10,100_0000,1,|x|x).await;ts.send_to_recv("10-v-10",true,10_0000,10,10_0000,10,|x|x).await;println!("wd_tools <------------- test over");println!("test start ------------> async-channel");ts.send_to_recv("1-v-1",false,100_0000,1,100_0000,1,|x|x).await;ts.send_to_recv("1-v-10",false,100_0000,1,10_0000,10,|x|x).await;ts.send_to_recv("10-v-1",false,10_0000,10,100_0000,1,|x|x).await;ts.send_to_recv("10-v-10",false,10_0000,10,10_0000,10,|x|x).await;println!("async-channel <------------ test over");
}struct TestService<T>{wd_sender : wd::Sender<T>,wd_receiver : wd::Receiver<T>,ac_sender : ac::Sender<T>,ac_receiver : ac::Receiver<T>
}impl<T:Unpin+Send+Sync+Debug+'static> TestService<T>{pub fn new(cap:usize)->TestService<T>{let (wd_sender,wd_receiver) = wd::Channel::new(cap);let (ac_sender,ac_receiver) = ac::bounded(cap);TestService{wd_sender,wd_receiver,ac_sender,ac_receiver}}pub fn send<G:Fn(usize)->T+Send+Sync+'static>(&self,wg:wd_tools::sync::WaitGroup,is_wd:bool,max:usize,generater:G){let wd_sender = self.wd_sender.clone();let ac_sender = self.ac_sender.clone();wg.defer_args1(|is_wd|async move{for i in 0..max {let t = generater(i);if is_wd {wd_sender.send(t).await.expect(" 发送失败");}else{ac_sender.send(t).await.expect(" 发送失败");}}},is_wd);}pub fn recv(&self,wg:wd_tools::sync::WaitGroup,is_wd:bool,max:usize){let wd_receiver = self.wd_receiver.clone();let ac_receiver = self.ac_receiver.clone();wg.defer_args1(|is_wd|async move{for _i in 0..max {if is_wd {wd_receiver.recv().await.expect(" 接收失败");}else{ac_receiver.recv().await.expect(" 接收失败");}}},is_wd);}pub async fn send_to_recv<G:Fn(usize)->T+Send+Sync+Clone+'static>(&self,info:&'static str, is_wd:bool, sbase:usize, sgroup:usize, rbase:usize, rgroup:usize, generater:G){let now = std::time::Instant::now();let wg = wd_tools::sync::WaitGroup::default();let wg_send = wd_tools::sync::WaitGroup::default();let wg_recv = wd_tools::sync::WaitGroup::default();for _ in 0..sgroup{self.send(wg_send.clone(),is_wd,sbase,generater.clone());}for _ in 0..rgroup{self.recv(wg_recv.clone(),is_wd,rbase);}wg.defer(move ||async move{let now = std::time::Instant::now();wg_send.wait().await;println!("test[{}] ---> send use time:{}ms",info,now.elapsed().as_millis());});wg.defer(move ||async move{let now = std::time::Instant::now();wg_recv.wait().await;println!("test[{}] ---> recv use time:{}ms",info,now.elapsed().as_millis());});wg.wait().await;println!("test[{}] ---> all use time:{}ms",info,now.elapsed().as_millis());}
}

结果与分析

测试10次,取平均值做表,如下
在这里插入图片描述
如上图,得结论

  • 在1发收者和10发收者的情况下,两种channel效率相差不多。
  • 在发送者和接受者数量不等时,wd_tools::channel的性能明显优于async-channel

思考

分析结论之前先看一下async-channel的实现。虽然async-channel也是异步,但它并不依赖某个异步运行时来进行任务的上线文切换,而是使用concurrent-queueevent-listener进行消息调度,底层依赖于std::thread::park_timeout

相比event-listener的调度方式,直接管理tokio的Context则更适用于异步环境。尤其是存在大量等待的场景。如上面测试,接受者和发送者数量不等,需要长时间等待的情况。实际开发中,接受者或者发送者可能长时间处于饥饿的情况下,wd_tools::channel不会产生多余的资源开销,毕竟上下文被挂起了,也就不会被cpu执行。

当然实际是复杂的,因情而异,使用的CPU数量(线程数),缓存长度,异步任务数同样会影响消息队列的性能,尤其是不需要等待的场景下async-channel性能更优。

wd_tools::channel则更适合tokio异步环境。并且不会引起线程park,而产生其他影响。

尾语

wd_tools::channel 目前只是一个初级版本,还有很多地方待优化,比如过多的状态判断,对缓存区直接轮训加锁,而没有采用优化算法, 唤醒器完全可以通过一定优化策略替换带。
但这个思路是没错的,欢迎有想法的同志加入进来。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/3885.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

macOS编译AirMap开源全景图源码image-processing

1.克隆源码 git clone --recursive https://github.com/airmap/image-processing.git 2. 使用CLion打开CMakeLists.txt并做为工程打开 2.默认配置名为Default,可修改,下面的所有配置项都可改 3.点击OK后会自动生成

8 系统定时器(Systick)(STM32HAL库)

目录 系统定时器&#xff08;Systick&#xff09; SysTick定时器特性介绍 SysTick定时器的功能 SysTick定时器寄存器介绍 Systick定时器的使用 系统定时器&#xff08;Systick&#xff09; SysTick定时器特性介绍 计数宽度&#xff1a; 24bit来存储数据&#xff0c;2^24…

使用影刀RPA拆分excel数据

首先&#xff0c;要使程序有一定的兼容性&#xff0c;即增加互动性&#xff0c;认为选择要拆分的文件和拆分的依据列&#xff0c;可以利用影刀中的‘打开选择对话框’和‘打开输入对话框’来实现&#xff0c;这样一来便不用考虑待拆分excel的路径问题获取1中选择的依据拆分列&a…

企业构建知识库方案

AI模型理解误区&#xff1a;百万成本微调垂直行业达模型VS低成本建立企业专属知识库或ai助理_哔哩哔哩_bilibili

基于TensorFlow和Keras的狗猫数据集的分类实验

文章目录 前言一、环境配置1、anaconda安装2、修改jupyter notebook工作目录3、配置TensorFlow、Keras 二、数据集分类1、分类源码2、训练流程 三、模型调整1.图像增强2、网络模型添加dropout层 四、使用VGG19优化提高猫狗图像分类1、构建网络模型2、初始化一个VGG19网络实例3、…

课程20:API项目重构

🚀前言 本文是《.Net Core从零学习搭建权限管理系统》教程专栏的课程(点击链接,跳转到专栏主页,欢迎订阅,持续更新…) 专栏介绍:以实战为线索,基于.Net 7 + REST + Vue、前后端分离,不依赖任何第三方框架,从零一步一步讲解权限管理系统搭建。 专栏适用于人群:We…

详解如何使用nvm管理Node.js多版本

目录 NVM进行NodeJS多版本管理 背景 安装步骤 1. 下载nvm安装包 2. 安装nvm 使用步骤 下载nodejs 切换版本nodejs ​编辑 常用命令 NVM进行NodeJS多版本管理 背景 有的时候开发环境需要多个NodeJS的版本&#xff0c;这个时候就可以用NVM进行管理。 安装步骤 1. 下载n…

云原生之深入解析Flink on k8s的运行模式与实战操作

一、概述 Flink 核心是一个流式的数据流执行引擎&#xff0c;并且能够基于同一个 Flink 运行时&#xff0c;提供支持流处理和批处理两种类型应用。其针对数据流的分布式计算提供了数据分布&#xff0c;数据通信及容错机制等功能。Flink 官网不同版本的文档flink on k8s 官方文…

远古 Windows 98 SE 和 putty 0.63 连接 SSH

远古 Windows 98 SE 和 putty 0.63 连接 SSH 不忘初心一、故障表现二、产生原因三、解决办法四、重启 SSHD 服务生交配置参考 作者&#xff1a;高玉涵 时间&#xff1a;2023.7.1 操作系统&#xff1a; Windows 98 第二版 4.10.2222 A Linux version 5.19.0-32-generic (build…

DDOS攻击防御实战(威胁情报)

背景&#xff1a; 不知道大家最近有没有关注到&#xff0c;百度云CDN不支持免费了&#xff0c;网站安全问题越来越严重了…… 常见攻击 DDOS Distributed Denial of Service 分布式拒绝服务攻击可以使很多的计算机在同一时间遭受到攻击&#xff0c;使攻击的目标无法正常使用&…

【论文导读】- Variational Graph Recurrent Neural Networks(VGRNN)

文章目录 文章信息摘要BackgroundGraph convolutional recurrent networks (GCRN)Semi-implicit variational inference (SIVI) Variational graph recurrent neural network (VGRNN)VGRNN modelSemi-implicit VGRNN (SI-VGRNN) 文章信息 Variational Graph Recurrent Neural …

力扣 98. 验证二叉搜索树

题目来源&#xff1a;https://leetcode.cn/problems/validate-binary-search-tree/description/ C题解1&#xff1a;中序遍历&#xff0c;递归法。获取数组&#xff0c;如果是递增则返回true&#xff0c;否则返回false。 class Solution { public:void zhongxu(TreeNode* node…