CyberRT-共享内存实现

CyberRT共享内存类图

共享内存消息发布

在这里插入图片描述
数据用共享内存发布时,首先会创建ShmTransmitter对象,包含两个主要成员segment和notifier,Segment用于创建共享内存(上面绿色部分),Notifer 最终构建ReadableInfo通知给其他进程。
使用哪个ConditionNotifier-> notify或MulticastNotifier->notify,是在创建时根据配置文件决定的。
ConditionNotifier 在构建时会创建Indicator对象保存到共享内存中。
调ConditionNotifier-> notify,实际时将ReadableInfo保存到Indicator对象。

ConditionNotifier 共享内存数据接收

在这里插入图片描述
在接收数据时,也会创建同样的共享内存。如果共享内存存在,则直接打开。
在接收端也有同样的共享内存操作ConditionNotifier 。
ShmDispatcher会持有多个通道segment,用std::unordered_map<channelid, segment>表示。
同时启动一个后台线程ThreadFunc 线程轮询处理消息回调。

void ShmDispatcher::ThreadFunc() {ReadableInfo readable_info;// 轮询处理while (!is_shutdown_.load()) {// 100ms, Listen会转换100000 ms,对比seq,如果不等处理消息。每次轮询会等待递减50ms。if (!notifier_->Listen(100, &readable_info)) {ADEBUG << "listen failed.";continue;}if (readable_info.host_id() != host_id_) {ADEBUG << "shm readable info from other host.";continue;}//从共享内存Indicator中读出的数据uint64_t channel_id = readable_info.channel_id();uint32_t block_index = readable_info.block_index();{ReadLockGuard<AtomicRWLock> lock(segments_lock_);if (segments_.count(channel_id) == 0) {continue;}// check block index// std::unordered_map<uint64_t, uint32_t> previous_indexes_; // 保存key: channelID, value: block_indexif (previous_indexes_.count(channel_id) == 0) {previous_indexes_[channel_id] = UINT32_MAX;}uint32_t& previous_index = previous_indexes_[channel_id];if (block_index != 0 && previous_index != UINT32_MAX) {if (block_index == previous_index) {ADEBUG << "Receive SAME index " << block_index << " of channel "<< channel_id;} else if (block_index < previous_index) {ADEBUG << "Receive PREVIOUS message. last: " << previous_index<< ", now: " << block_index;} else if (block_index - previous_index > 1) {ADEBUG << "Receive JUMP message. last: " << previous_index<< ", now: " << block_index;}}previous_index = block_index;ReadMessage(channel_id, block_index);}}
}

MulticastNotifier共享内存数据接收

MulticastNotifier时采用多播socket实现的,默认

std::string mcast_ip("239.255.0.100");
uint16_t mcast_port = 8888;

创建两个socket notify_fd_ 用于发生消息,listen_addr用于接收消息。
在这里插入图片描述
在发送端调用Notify时,时调的MulticastNotifier::Nofify(const ReadableInfo& info)

bool MulticastNotifier::Notify(const ReadableInfo& info) {if (is_shutdown_.load()) {return false;}std::string info_str;info.SerializeTo(&info_str);ssize_t nbytes =sendto(notify_fd_, info_str.c_str(), info_str.size(), 0,(struct sockaddr*)&notify_addr_, sizeof(notify_addr_));return nbytes > 0;
}

接收端用同样的方式轮询

bool MulticastNotifier::Listen(int timeout_ms, ReadableInfo* info) {if (is_shutdown_.load()) {return false;}if (info == nullptr) {AERROR << "info nullptr.";return false;}struct pollfd fds;fds.fd = listen_fd_;fds.events = POLLIN;int ready_num = poll(&fds, 1, timeout_ms);if (ready_num > 0) {char buf[32] = {0};  // larger than ReadableInfo::kSizessize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);if (nbytes == -1) {AERROR << "fail to recvfrom, " << strerror(errno);return false;}return info->DeserializeFrom(buf, nbytes);} else if (ready_num == 0) {ADEBUG << "timeout, no readableinfo.";} else {if (errno == EINTR) {AINFO << "poll was interrupted.";} else {AERROR << "fail to poll, " << strerror(errno);}}return false;
}
bool Block::TryLockForWrite() {int32_t rw_lock_free = kRWLockFree;//lock_num_ == rw_lock_free, kWriteExclusive赋值给lock_num_,返回true//lock_num_ != rw_lock_free, lock_num_赋值给rw_lock_free,返回falseif (!lock_num_.compare_exchange_weak(rw_lock_free, kWriteExclusive,std::memory_order_acq_rel,std::memory_order_relaxed)) {ADEBUG << "lock num: " << lock_num_.load();return false;}return true;
}

总结
1、CyberRT的共享内存读写都时需要加锁的。
2、每次写数据可以是不连续的block
3、每次当Block.lock_num_= 0:空闲,>0:有读操作, -1 : 写操作。
效率不是高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/209797.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习之六(自编码器--Autoencoder)

概念 自编码器(Autoencoder)是一种神经网络架构,用于无监督学习和数据的降维表示。它由两部分组成:编码器(Encoder)和解码器(Decoder)。 结构: 编码器(Encoder): 接收输入数据并将其压缩为潜在表示(latent representation),通常比输入数据的维度要低。编码器的…

mysql中数据是如何被用B+树查询到的

innoDB是按照页为单位读写的 那页中有很多行数据&#xff0c;是怎么执行查询的呢&#xff0c;首先我们肯定&#xff0c;是以单向列表形式存储的&#xff0c;提高了增删的效率&#xff0c;但是查询效率低。所以实际上对页中的行数据进行了优化&#xff0c;能以二分的方式进行查…

2023 IoTDB Summit 应用实例议题详解 | 报名到场即送卫衣!

12 月 3 日&#xff0c;于北京丽都皇冠假日酒店二层举办的 2023 IoTDB 用户大会离我们越来越近啦~ 为回馈大家对我们的支持&#xff0c;欧欧为大家争取了 2023 IoTDB 用户大会三重福利&#xff1a;到场有礼、邀请有礼、转发有礼&#xff01; 到场有礼&#xff1a;凡是 12 月 3 …

如何在AppLink配置金蝶云星空预算使用单流程

上一篇有提到金蝶云星空如何通过AppLink平台配置销售订单操作&#xff0c;这次来演示下如何“保存预算使用单”、“调拨单定时自动审核”以及“预算使用单反审核后删除”操作。 根据请求数据保存预算使用单 当webhook接收到数据时触发流程 步骤1&#xff1a;根据webhook的请…

数据结构与算法编程题13

设计算法将一个带头结点的单链表A分解为两个具有相同结构的链表B、C&#xff0c;其中B表的结点为A表中值小于零的结点&#xff0c;而C表的结点为A表中值大于零的结点&#xff08;链表A中的元素为非零整数&#xff0c;要求B、C表利用A表的结点&#xff09; for example: A -1 2 …

redis-cluster集群

1.redis-cluster集群 redis3.0引入的分布式存储方案 集群由多个node节点组成&#xff0c;redis数据分布在这些节点之中。 在集群之中分为主节点和从节点 集群模式当中&#xff0c;主从一一对应&#xff0c;数据写入和读取与主从模式一样&#xff0c;主负责写&#xff0c;从…

ArgoWorkflow教程(一)---DevOps 另一选择?云原生 CICD: ArgoWorkflow 初体验

来自&#xff1a;探索云原生 https://www.lixueduan.com 原文&#xff1a;https://www.lixueduan.com/posts/devops/argo-workflow/01-deploy-argo-workflows/ 本文主要记录了如何在 k8s 上快速部署云原生的工作流引擎 ArgoWorkflow。 ArgoWorkflow 是什么 Argo Workflows 是…

BUUCTF [HBNIS2018]来题中等的吧 1

BUUCTF:https://buuoj.cn/challenges 题目描述&#xff1a; 得到的 flag 请包上 flag{} 提交。来源&#xff1a;https://github.com/hebtuerror404/CTF_competition_warehouse_2018 密文&#xff1a; 下载附件&#xff0c;解压得到一个.png图片。 解题思路&#xff1a; 我以…

Talk | UCSB博士生宋珍巧:基于人工智能的功能性蛋白质设计

本期为TechBeat人工智能社区第549期线上Talk。 北京时间11月22日(周三)20:00&#xff0c;UC Santa Barbara博士生—宋珍巧的Talk已准时在TechBeat人工智能社区开播&#xff01; 她与大家分享的主题是: “基于人工智能的功能性蛋白质设计”&#xff0c;介绍了如何利用机器学习算…

码云 -- 本地代码上传到码云

1. 在码云上创建远程仓库 复制远程仓库地址 2. 在本地代码上创建 git 仓库 在本地代码文件夹上&#xff0c;打开git 命令窗口 输入初始化命令&#xff0c;创建 git 仓库 git init3. 给 git 仓库添加远程仓库 继续输入 git 命令 git remote add origin 远程仓库地址4. 按 git 的…

SAP LU04记账更改通知单创建转储单报错:L3094 记帐修改没有份存在

解决办法&#xff1a; 使用事务码LU02&#xff0c;修改过账更改状态&#xff0c;将过账更改状态改为U&#xff0c;强制关闭 1. LU04 查找记账更改通知单号 2. 事务码LU02修改状态 这个时候再用LU04去查看的时候&#xff0c;就不会再显示了

Faster R-CNN源码解析(一)

目录 前言训练脚本(train_mobilenetv2.py)自定义数据集(my_dataset.py) 前言 Faster R-CNN 是经典的two-stage目标检测模型&#xff0c; 原理上并不是很复杂&#xff0c;也就是RPNFast R-CNN&#xff0c;但是在代码的实现上确实有很多细节&#xff0c;并且源码也非常的多&…