线程判断:isInLoopThread()
的调用点(决定走同步还是异步路径)
在runInLoop()
中调用,如果调用此方法的线程就是当前运行此EventLoop的线程,则直接调用回调函数
如果调用此方法的线程不是此EventLoop的线程,调用QueueInLoop(cb)将对应回调函数放入多PendingFunctors_队列中
锁的应用:pendingFunctors
的mutex
保护范围
当调用queueInLoop的时候,因为不会是本线程调用,是其他线程调用queueInLoop方法,所以要使用mutex
同样,因为PendingFunctors是会由多个其他线程放入,本线程取出,所以是临界区,除了放入的queueInLoop会使用mutex再放入,本线程取出回调函数并执行函数的doPendingFunctors()方法中也需要使用mutex,从而实现EventLoop的线程安全,因为对EventLoop来说多线程访问的就是pendingFactors_
eventfd:wakeup()
如何通过写入1字节解除epoll_wait
阻塞
这主要是利用了统一事件源的思路,在EventLoop创建的时候会使用eventfd()
创建 一个文件描述符,然后将其包装成为一个Channel wekeupChannel, 并且注册EPPOLLIN事件,将其fd和感兴趣的事件加入到epoll中,所以当EventLoop不断地调用epoll_wait
的时候,另一个线程就可以通过获得此EventLoop对象,知道其wakeupfd,然后向其写入一字节的数据,就会使得的epoll_wait
有可读事件发生,就会返回,此线程被唤醒
为什么选择eventfd而不是pipe?
对比项 | eventfd |
pipe |
---|---|---|
作用 | 用来传递“事件通知”(通常是一个计数值) | 用来传递数据流 |
数据类型 | 固定 8 字节(uint64_t )的整数 |
任意字节数据(像 socket 一样) |
本质 | 内核提供的计数器 | 内核提供的缓存区(FIFO) |
线程安全 | 线程安全(系统调用级) | 线程安全 |
支持 epoll | ✅ 支持,非阻塞时配合 epoll 特别爽 | ✅ 支持 epoll 事件通知 |
适合场景 | 事件通知、信号唤醒、任务触发 | 数据传输、日志、命令流 |
性能 | 非常轻量(无数据拷贝) | 稍重(涉及数据拷贝) |
阻塞行为 | 阻塞/非阻塞都支持,可配置 | 一样可配置阻塞/非阻塞 |
带宽 | 很低(只传递 64-bit) | 可传任意数据 |
就是说:
- 如果你只想“唤醒”“通知”某个线程干事(比如
eventLoop
唤醒去干pendingFactor中的回调) → 用eventfd
- 如果你要传输实际的字节数据 → 用
pipe
任务风暴防御:pendingFunctors
的swap优化(避免长时间持锁)
void EventLoop::doPendingFunctors() // 执行回调
{std::vector<Functor> functors;callingPendingFunctors_ = true;{std::unique_lock<std::mutex> lock(mutex_);functors.swap(pendingFunctors_);}for (const Functor &functor : functors){functor(); // 执行当前loop需要执行的回调操作}callingPendingFunctors_ = false;
}
主要就是使用了swap()
, 就是将pendingFunctors_和新创建的空的functors进行内容交换,非常高效O(1), 几乎不复制元素,就交换内部指针,相比于使用赋值,他不复制元素,不会触发析构和构造函数
Swap()
的底层原理:交换内部指针 就是,就以vector
来说
template<typename T>
class vector {
private:T* _start; // 指向数据开始的位置T* _finish; // 指向最后一个元素的后面T* _end_of_storage; // 指向分配内存的末尾
};
本身存储就是这个3个指针,在创建的时候会在堆上分配空间,然后管理这3个指针,所以本身vector就是在管理这3个指针,swap交换就是将这个3个指针进行交换,所以是O(1),因此就很快,并且交换后functor就是指向原本的pendingFunctors_的内容,并且在运行完后会进行对应的销毁,而pendingFunctors_就又是新的部分
而且因为swap很快,对锁也不会长时间占用导致其他线程调用runInloop阻塞
然后第一个我遇见的问题是理解 基于 Reactor 模型的高性能服务器(比如 muduo)是怎么把 IO 和业务解耦开的关键点
主线程 (MainReactor)
│
├─ 负责监听 listenfd,接收新连接
│ 每接收一个连接,就把连接“分发”给某个 SubReactor
│
├──> EventLoop1 (IO线程,SubReactor) ┐
├──> EventLoop2 (IO线程,SubReactor) ┤ --- IO线程池
└──> EventLoopN (IO线程,SubReactor) ┘每个 EventLoop(也叫 SubReactor)负责多个连接的 IO 和事件分发,
而真正“干活”(比如:解析 HTTP 请求,处理 RPC)的逻辑会放到用户自定义的业务线程池里去跑。
这里面有业务线程池,但是muduo本身肯定并不知道,所以IO线程和业务线程如何配合?
他是这样的:
TcpConnection::handleRead(...)
他就是对应可读事件发生后会调用的回调,内部会调用messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
而这个回调就是用户定义的,可能是解析HTTP等等耗时的操作
所以正确的实现是:用户的回调 messageCallback_
只是将任务交给业务线程池执行,业务层肯定知道有什么线程池以及如何给其发布任务
// 用户设置 messageCallback,比如在服务器启动时设置:
conn->setMessageCallback([this](const TcpConnectionPtr& conn, Buffer* buf, Timestamp) {std::string msg = buf->retrieveAllAsString();// 投递一个任务给业务线程池,避免在IO线程中处理threadPool_.run([conn, msg] {std::string response = processMessage(msg); // 假设很耗时conn->send(response); // 回发给客户端});
});
而其中业务线程 conn->send(response)
又不是在 IO 线程里啊,怎么办?
就会发现在 TcpConnection::send
内部会检查是否是在此IO线程中,如果不是就会利用ventLoop::queueInLoop()
以上的流程并不完美,但是主要是关注说这个用户线程和IO线程的交互过程的一个示意
然后我遇见的另一个问题就是我发现在TCPConnection中
handleRead()
方法就是直接调用messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
已建立连接的用户,有可读事件发生了,调用用户传入的回调操作onMessage- 而
handleWrite()
是使用loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()) );
去调用回调了
我就在想不论如和这个handleWrite()
方法是在IO线程中epoll_wait()返回然后调用channel的回调的时候啊,那这个时候应该是IO线程在执行啊,为什么要queueInLoop?
然后我是粗略的理解了给的答案,是说是为了放在嵌套调用,崩栈或者状态错乱,就是因为这个写回调是用户写的,如果用户有用自己的业务线程池还好,但是如果它没有使用,那还是IO线程来做个回调,就可能会说写完了之后又在写,然后又会触发handleWrite()然后就一直陷入到了这个写的过程,而根本没有办法跳出来,更不用说在epoll_wait()监听其他时间了,而这个queueInloop就是把其放入到下一轮中,进行一种隔离的感觉
然后另一个问题就是说发现在queueInLoop中有出现if (!isInLoopThread() || callingPendingFunctors_)
void EventLoop::queueInLoop(Functor cb)
{{std::unique_lock<std::mutex> lock(mutex_);pendingFunctors_.emplace_back(cb);}// 唤醒相应的,需要执行上面回调操作的loop的线程了// || callingPendingFunctors_的意思是:当前loop正在执行回调,但是loop又有了新的回调if (!isInLoopThread() || callingPendingFunctors_) {wakeup(); // 唤醒loop所在线程}
}
就是我以为任何queuInLoop都是其他线程调用的,但却使用了callingPendingFunctors_就比较奇怪,在什么时候会出现在做pandingFactor里面的回调,这个回调也会queueInLoop????