01. muduo跨线程调度

news/2025/4/1 3:21:26/文章来源:https://www.cnblogs.com/lang77/p/18799698
sequenceDiagramparticipant MainLoop as 主线程EventLoopparticipant SubLoop as 子线程EventLoopparticipant Eventfd as eventfd唤醒机制MainLoop->>SubLoop: 调用runInLoop(cb)alt isInLoopThread()=true 同线程MainLoop->>MainLoop: 直接执行cb()else isInLoopThread()=false 跨线程MainLoop->>SubLoop: queueInLoop(cb)MainLoop->>Eventfd: 写入1字节(wakeup)Eventfd->>SubLoop: 触发wakeupChannel可读事件SubLoop->>SubLoop: 执行wakeupChannel channelhandler读取一字节SubLoop->>SubLoop: doPendingFunctors() 处理pendingFunctors队列end

线程判断isInLoopThread()的调用点(决定走同步还是异步路径)

runInLoop()中调用,如果调用此方法的线程就是当前运行此EventLoop的线程,则直接调用回调函数
如果调用此方法的线程不是此EventLoop的线程,调用QueueInLoop(cb)将对应回调函数放入多PendingFunctors_队列中

锁的应用pendingFunctorsmutex保护范围

当调用queueInLoop的时候,因为不会是本线程调用,是其他线程调用queueInLoop方法,所以要使用mutex
同样,因为PendingFunctors是会由多个其他线程放入,本线程取出,所以是临界区,除了放入的queueInLoop会使用mutex再放入,本线程取出回调函数并执行函数的doPendingFunctors()方法中也需要使用mutex,从而实现EventLoop的线程安全,因为对EventLoop来说多线程访问的就是pendingFactors_

eventfdwakeup()如何通过写入1字节解除epoll_wait阻塞

这主要是利用了统一事件源的思路,在EventLoop创建的时候会使用eventfd()创建 一个文件描述符,然后将其包装成为一个Channel wekeupChannel, 并且注册EPPOLLIN事件,将其fd和感兴趣的事件加入到epoll中,所以当EventLoop不断地调用epoll_wait的时候,另一个线程就可以通过获得此EventLoop对象,知道其wakeupfd,然后向其写入一字节的数据,就会使得的epoll_wait有可读事件发生,就会返回,此线程被唤醒

为什么选择eventfd而不是pipe?

对比项 eventfd pipe
作用 用来传递“事件通知”(通常是一个计数值) 用来传递数据流
数据类型 固定 8 字节(uint64_t)的整数 任意字节数据(像 socket 一样)
本质 内核提供的计数器 内核提供的缓存区(FIFO)
线程安全 线程安全(系统调用级) 线程安全
支持 epoll ✅ 支持,非阻塞时配合 epoll 特别爽 ✅ 支持 epoll 事件通知
适合场景 事件通知、信号唤醒、任务触发 数据传输、日志、命令流
性能 非常轻量(无数据拷贝) 稍重(涉及数据拷贝)
阻塞行为 阻塞/非阻塞都支持,可配置 一样可配置阻塞/非阻塞
带宽 很低(只传递 64-bit) 可传任意数据

就是说:

  • 如果你只想“唤醒”“通知”某个线程干事(比如 eventLoop 唤醒去干pendingFactor中的回调) → 用 eventfd
  • 如果你要传输实际的字节数据 → 用 pipe

任务风暴防御pendingFunctors的swap优化(避免长时间持锁)

void EventLoop::doPendingFunctors() // 执行回调
{std::vector<Functor> functors;callingPendingFunctors_ = true;{std::unique_lock<std::mutex> lock(mutex_);functors.swap(pendingFunctors_);}for (const Functor &functor : functors){functor(); // 执行当前loop需要执行的回调操作}callingPendingFunctors_ = false;
}

主要就是使用了swap(), 就是将pendingFunctors_和新创建的空的functors进行内容交换,非常高效O(1), 几乎不复制元素,就交换内部指针,相比于使用赋值,他不复制元素,不会触发析构和构造函数

Swap()的底层原理:交换内部指针 就是,就以vector来说

template<typename T>
class vector {
private:T*      _start;      // 指向数据开始的位置T*      _finish;     // 指向最后一个元素的后面T*      _end_of_storage; // 指向分配内存的末尾
};

本身存储就是这个3个指针,在创建的时候会在堆上分配空间,然后管理这3个指针,所以本身vector就是在管理这3个指针,swap交换就是将这个3个指针进行交换,所以是O(1),因此就很快,并且交换后functor就是指向原本的pendingFunctors_的内容,并且在运行完后会进行对应的销毁,而pendingFunctors_就又是新的部分

而且因为swap很快,对锁也不会长时间占用导致其他线程调用runInloop阻塞


然后第一个我遇见的问题是理解 基于 Reactor 模型的高性能服务器(比如 muduo)是怎么把 IO 和业务解耦开的关键点

主线程 (MainReactor)
│
├─ 负责监听 listenfd,接收新连接
│   每接收一个连接,就把连接“分发”给某个 SubReactor
│
├──> EventLoop1 (IO线程,SubReactor) ┐
├──> EventLoop2 (IO线程,SubReactor) ┤   --- IO线程池
└──> EventLoopN (IO线程,SubReactor) ┘每个 EventLoop(也叫 SubReactor)负责多个连接的 IO 和事件分发,
而真正“干活”(比如:解析 HTTP 请求,处理 RPC)的逻辑会放到用户自定义的业务线程池里去跑。

这里面有业务线程池,但是muduo本身肯定并不知道,所以IO线程和业务线程如何配合?

他是这样的:

TcpConnection::handleRead(...) 他就是对应可读事件发生后会调用的回调,内部会调用messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);而这个回调就是用户定义的,可能是解析HTTP等等耗时的操作

所以正确的实现是:用户的回调 messageCallback_ 只是将任务交给业务线程池执行,业务层肯定知道有什么线程池以及如何给其发布任务

// 用户设置 messageCallback,比如在服务器启动时设置:
conn->setMessageCallback([this](const TcpConnectionPtr& conn, Buffer* buf, Timestamp) {std::string msg = buf->retrieveAllAsString();// 投递一个任务给业务线程池,避免在IO线程中处理threadPool_.run([conn, msg] {std::string response = processMessage(msg); // 假设很耗时conn->send(response); // 回发给客户端});
});

而其中业务线程 conn->send(response) 又不是在 IO 线程里啊,怎么办?

就会发现在 TcpConnection::send内部会检查是否是在此IO线程中,如果不是就会利用ventLoop::queueInLoop()

sequenceDiagramparticipant Clientparticipant MainLoop as MainReactorparticipant IO as SubReactor(IO线程)participant App as 用户代码(messageCallback)participant Worker as 业务线程participant IO2 as SubReactor(IO线程 again)Client->>MainLoop: 建立连接MainLoop->>IO: 分发 TcpConnection 给 SubReactorClient->>IO: 发送数据(触发 EPOLLIN)IO->>IO: epoll_wait 返回可读事件IO->>IO: handleRead()IO->>App: 调用 messageCallback(conn, buffer)App->>Worker: 提交任务到线程池执行(解析请求)Worker-->>App: 处理完业务,调用 conn->send(),内部会调用 sendInLoop()App->>IO2: conn->send() -> runInLoop(sendInLoop)IO2->>IO2: queueInLoop() 添加任务IO2->>IO2: 唤醒 eventfd -> epoll 返回IO2->>IO2: doPendingFunctors() -> 执行 sendInLoopIO2->>IO2: handleWrite() -> 实际写入 socketIO2->>App: queueInLoop 放入 writeCompleteCallback(仍在 IO线程)到 pendingFactors

以上的流程并不完美,但是主要是关注说这个用户线程和IO线程的交互过程的一个示意


然后我遇见的另一个问题就是我发现在TCPConnection中

  • handleRead()方法就是直接调用messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); 已建立连接的用户,有可读事件发生了,调用用户传入的回调操作onMessage
  • handleWrite()是使用loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()) );去调用回调了

我就在想不论如和这个handleWrite()方法是在IO线程中epoll_wait()返回然后调用channel的回调的时候啊,那这个时候应该是IO线程在执行啊,为什么要queueInLoop?

然后我是粗略的理解了给的答案,是说是为了放在嵌套调用,崩栈或者状态错乱,就是因为这个写回调是用户写的,如果用户有用自己的业务线程池还好,但是如果它没有使用,那还是IO线程来做个回调,就可能会说写完了之后又在写,然后又会触发handleWrite()然后就一直陷入到了这个写的过程,而根本没有办法跳出来,更不用说在epoll_wait()监听其他时间了,而这个queueInloop就是把其放入到下一轮中,进行一种隔离的感觉


然后另一个问题就是说发现在queueInLoop中有出现if (!isInLoopThread() || callingPendingFunctors_)

void EventLoop::queueInLoop(Functor cb)
{{std::unique_lock<std::mutex> lock(mutex_);pendingFunctors_.emplace_back(cb);}// 唤醒相应的,需要执行上面回调操作的loop的线程了// || callingPendingFunctors_的意思是:当前loop正在执行回调,但是loop又有了新的回调if (!isInLoopThread() || callingPendingFunctors_) {wakeup(); // 唤醒loop所在线程}
}

就是我以为任何queuInLoop都是其他线程调用的,但却使用了callingPendingFunctors_就比较奇怪,在什么时候会出现在做pandingFactor里面的回调,这个回调也会queueInLoop????

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/907736.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UML用例图-UML Use Case Diagram

用例图是UML中最简单,使用最高频的图之一,它通常用于诠释“这个软件做了什么”。用例图的的表达非常简单并且通俗易懂,不论研发、产品、测试,还是完全不懂软件的外行,用例图基本都能被看懂。.wj_nav { display: inline-block; width: 100%; margin-top: 0; margin-bottom:…

喜提好人卡?混元模型模块独立维护与开源发布

接着我们上次向Spring AI提交的混元模型模块,我已经完成了所有关于混元的聊天对接,并提交了相应的PR描述。提交后,荣获了一张“好人卡”,如图所示:今天,我们决定将之前提交给Spring AI官方的PR重新拿出来,并准备将其独立维护在一个开源仓库中。做出这一决策的原因是Spri…

VMware Aria Automation 8.18.1 新增功能简介

VMware Aria Automation 8.18.1 新增功能简介VMware Aria Automation 8.18.1 - 多云基础架构自动化平台 Multi-Cloud Infrastructure Automation Platform 请访问原文链接:https://sysin.org/blog/vmware-aria-automation/ 查看最新版。原创作品,转载请保留出处。 作者主页:…

最小二乘及最小二乘法系统辨识

目录一般最小二乘法加权最小二乘递推最小二乘 一般最小二乘法 无需每次对整体数据进行最小二乘估计,利用上次计算的值与当前数据融合计算,获取计算结果。本质上是对老的计算结果修正的方式,节省计算步骤,降低运算量,提升计算的效率。 若辨识模型: \[z_{(k)}={h_{(k)}}^T\…

氩气保护焊接气体流量控制

该节气装置通过以下方法,为您减少30%-50%的焊接气体的用量。在整个焊接过程中,气流与焊接电流自动同步调节。 “较大电流 — 较多气体/较小电流 — 较少气体”极高速气阀即使在焊接停止时间很短的情况下,也可开闭。通过持续的气体保护以及预先“编程”,脉冲气体可进一步减小…

2025.3.29日 清华大学-郝泽旭 的模拟赛

2025.3.29日 清华大学-郝泽旭 的模拟赛原题可以转化为给定了 \(k\) 个起点,求 \(k\) 个终点,使得路径两两无公共点的所有路径之和的最大值。 注意到先上再下的路径可以拆成两部分,即从关键点开始先向上到达一个点后再向下可以看作一条从关键点向上的路径与一条从非关键点向上…

DDL转换,一键导出表结构,生成DDL语句【转载】

源代码:https://gitee.com/wsitm/RuoYi-RDBMS 一、驱动管理驱动管理功能,用户能够上传并配置各类数据库驱动包。无论是常见的MySQL、Oracle、PostgreSQL、SQL Server,还是更多小众的数据库类型,只需上传相应的驱动包,即可迅速完成配置。 这一功能不仅简化了驱动安装与更新…

库卡机器人维修KSS26045硬件错误维修

库卡机器人在使用过程中,可能会遇到常见的KSS26045硬件故障,这些机器人故障大致可以归结为以下几种类型: 先是电源故障。一旦电源系统出现问题,库卡机器人可能会面临无法启动或无法维持正常运行的困境。为了诊断电源故障,可以利用万用表等专业工具来精确测量电源的输出电压…

PHP历理 精准处理数学表达式中的小数末尾零

<?php /*** 精准处理数学表达式中的小数末尾零* @param string $expression 原始数学表达式* @return string 处理后的规范表达式*/ function formatMathExpression(string $expression): string {// 正则匹配所有小数(包含整数部分和小数部分)return preg_replace_callb…

判断 Python 代码是不是 AI写的几个简单方法

作者:Laurel W来源:Adobe作为一名数据科学和数学老师,我其实不介意我的学生使用像 ChatGPT 这样的 LLM,只要它是用来辅助他们学习,而不是取代学习过程。加州理工学院的申请文书指南启发了我为编程和机器学习课制定 AI 使用政策: 哪些是加州理工申请文书中不道德的 AI 使用…

Golang学习Ⅱ

iota,函数多返回值,init函数,import导包 常量定义方式:const a int = 10; const{ a=10 b=20 }1 const{ 2 BeiJing = iota*10 //iota为0 3 ShangHai 4 NanJing 5 } //使用const定义枚举,BeiJing为0,ShangHai为10,NanJing为20View Code

SpringBoot整合RabbitMQ--Direct和Topic模式

一.Direct模式 这几个模式使用SpringBoot的整合和前面使用源生Java整合其实是差不多的,故而步骤就不再详细赘述了,直接先导入依赖:<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId&g…