muduo网络库核心代码阅读(Thread,EventLoopThread,EventLoopThreadPool)(4)

news/2025/2/15 16:02:15/文章来源:https://www.cnblogs.com/xiaodao0036/p/18717082

muduo网络库核心设计为one loop one thread,即一个线程一个事件循环。其中,主Reactor负责监听新连接,并将新连接平均分配到从Reactor中,而从Reactor则对分配到其上的用户进行io交互,接收并处理用户发来的数据,包括消息的回复(实际上,在使用中需要手动设置Reactor线程的数量,如果不设置,muduo默认为单Reactor,即所有的操作都在一个Reactor中)。

多Reactor设计

多线程Reactor的管理:EventLoopThread和EventLoopThreadPool。
EventLoopThread封装了Thread和EventLoop,对外提供接口在成员函数thread中进行loop循环并由EventLoopThreadPool预先创建统一管理。

Thread

Thread类是muduo自己封装pthread库实现的线程类,可手动控制线程开始的时间和自动管理线生命周期。

class Thread : noncopyable
{
public:typedef std::function<void ()> ThreadFunc;explicit Thread(ThreadFunc, const string& name = string());// FIXME: make it movable in C++11~Thread();void start();int join(); // return pthread_join()bool started() const { return started_; }// pthread_t pthreadId() const { return pthreadId_; }pid_t tid() const { return tid_; }const string& name() const { return name_; }static int numCreated() { return numCreated_.get(); }private:void setDefaultName();bool       started_;bool       joined_;pthread_t  pthreadId_;      pid_t      tid_;ThreadFunc func_;       //要在新线程执行的函数string     name_;//CountDownLatch是一种同步工具,通过条件变量和控制其成员变量latch的大小,在多线程编程中实现线程间的同步CountDownLatch latch_;     static AtomicInt32 numCreated_;
};//构造函数
Thread::Thread(ThreadFunc func, const string& n):   started_(false),joined_(false),pthreadId_(0),tid_(0),func_(std::move(func)),name_(n),latch_(1)
{setDefaultName();
}//析构函数
Thread::~Thread()
{//如果线程开始执行,并且没有同步,则将线程分离if (started_ && !joined_){pthread_detach(pthreadId_);}
}

主要函数

start函数可以开启线程并执行传入的函数,在返回之前确保线程函数已经开始执行

void Thread::start()
{assert(!started_);started_ = true;    //将开始标记置为真// FIXME: move(func_)detail::ThreadData* data = new detail::ThreadData(std::move(func_), name_, &tid_, &latch_);     //ThreadData封装了线程变量,并提供了同步服务if (pthread_create(&pthreadId_, NULL, &detail::startThread, data)){started_ = false;delete data; // or no delete?LOG_SYSFATAL << "Failed in pthread_create";}else{latch_.wait();      //等待线程函数开始执行assert(tid_ > 0);}
}

线程的同步控制

同步工具CountDownLatch

class CountDownLatch : noncopyable
{
public:explicit CountDownLatch(int count);void wait();void countDown();private:mutable MutexLock mutex_;   //守卫锁,同std::lock_guardCondition condition_ GUARDED_BY(mutex_);    //条件变量,同std::condition_variableint count_ GUARDED_BY(mutex_);      //计数器
};//wait
//只有当计数器小于等于零时,wait才会返回
void CountDownLatch::wait()
{MutexLockGuard lock(mutex_);while (count_ > 0){condition_.wait();}
}//countDown
//计数器减一,如果计数器减一后等于零,则唤醒所有等待的线程
void CountDownLatch::countDown()
{MutexLockGuard lock(mutex_);--count_;if (count_ == 0){condition_.notifyAll();}
}

线程数据结构ThreadData

struct ThreadData
{typedef muduo::Thread::ThreadFunc ThreadFunc;ThreadFunc func_;string name_;pid_t* tid_;CountDownLatch* latch_;ThreadData(ThreadFunc func,const string& name,pid_t* tid,CountDownLatch* latch): func_(std::move(func)),name_(name),tid_(tid),latch_(latch){ }void runInThread(){*tid_ = muduo::CurrentThread::tid();tid_ = NULL;latch_->countDown();    //计数器减一,如果计数器为零,唤醒等待线程,确保线程已经开,并且开始执行latch_ = NULL;//执行函数...}
};void* startThread(void* obj)
{ThreadData* data = static_cast<ThreadData*>(obj);data->runInThread();delete data;return NULL;
}

EventLoopThread

EventLoopThread的设计与Thread类似,都提供了手动开启接口和同步服务

class EventLoopThread : noncopyable
{
public:typedef std::function<void(EventLoop*)> ThreadInitCallback;EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),const string& name = string());~EventLoopThread();EventLoop* startLoop();     //开启loop循环private:void threadFunc();EventLoop* loop_ GUARDED_BY(mutex_);    //该线程对应的唯一loopbool exiting_;Thread thread_;MutexLock mutex_;Condition cond_ GUARDED_BY(mutex_);ThreadInitCallback callback_;
};//实现文件
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb,const string& name):   loop_(NULL),exiting_(false),thread_(std::bind(&EventLoopThread::threadFunc, this), name),mutex_(),cond_(mutex_),callback_(cb)
{
}EventLoopThread::~EventLoopThread()
{exiting_ = true;if (loop_ != NULL) // not 100% race-free, eg. threadFunc could be running callback_.{// still a tiny chance to call destructed object, if threadFunc exits just now.// but when EventLoopThread destructs, usually programming is exiting anyway.loop_->quit();thread_.join();}
}//开启loop循环并返回对应的EventLoop
//通过条件变量,确保loop已经被创建
EventLoop* EventLoopThread::startLoop()
{assert(!thread_.started());thread_.start();        //开启线程,执行threadFunc函数EventLoop* loop = NULL;{MutexLockGuard lock(mutex_);while (loop_ == NULL){cond_.wait();}loop = loop_;}return loop;
}//将loop创建在栈空间上,loop循环确保loop在使用中不会回收,循环结束后自动回收
void EventLoopThread::threadFunc()
{EventLoop loop;if (callback_)      {callback_(&loop);}{MutexLockGuard lock(mutex_);loop_ = &loop;cond_.notify();}loop.loop();//assert(exiting_);MutexLockGuard lock(mutex_);loop_ = NULL;
}

EventLoopThreadPool

class EventLoopThreadPool : noncopyable
{
public:typedef std::function<void(EventLoop*)> ThreadInitCallback;EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);~EventLoopThreadPool();//设置线程数void setThreadNum(int numThreads) { numThreads_ = numThreads; }void start(const ThreadInitCallback& cb = ThreadInitCallback());// valid after calling start()/// round-robinEventLoop* getNextLoop();private:EventLoop* baseLoop_;string name_;bool started_;int numThreads_;int next_;std::vector<std::unique_ptr<EventLoopThread>> threads_;std::vector<EventLoop*> loops_;
};//实现
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg):   baseLoop_(baseLoop),name_(nameArg),started_(false),numThreads_(0), //线程数默认为零next_(0)
{
}EventLoopThreadPool::~EventLoopThreadPool()
{
// Don't delete loop, it's stack variable
}//开启所有线程即其loop循环
//根据设置的线程数来创建子loop线程(从Reactor,主Reactor需在使用之前手动创建)
//如果没有设置线程数,则只存在一个loop循环,即主Reactor
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{assert(!started_);baseLoop_->assertInLoopThread();started_ = true;for (int i = 0; i < numThreads_; ++i){char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);EventLoopThread* t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));loops_.push_back(t->startLoop());}if (numThreads_ == 0 && cb){cb(baseLoop_);}
}//获取子loop,如果未创建子loop,则返回主loop
EventLoop* EventLoopThreadPool::getNextLoop()
{baseLoop_->assertInLoopThread();assert(started_);EventLoop* loop = baseLoop_;if (!loops_.empty()){// round-robinloop = loops_[next_];++next_;if (implicit_cast<size_t>(next_) >= loops_.size()){next_ = 0;}}return loop;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/884289.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C#生成多尺寸bmp格式ICO图标代码

代码取自deepseek,且已经过本地执行测试 //.cs 文件类型,便于外部编辑时使用 // 引用必要的命名空间 using System.Collections.Generic; using System.Drawing; using System.Drawing.Imaging; using System.IO; using System.Linq; using System.Runtime.InteropServices;/…

CTA:回测执行引擎BacktestingEngine

回测执行引擎BacktestingEngine 回顾前面的文章CTA:回测综述,那里提到,真正执行回测的逻辑,写在BacktestingEngine中。 代码解读 BacktestingEngine定义在vnpy_ctastrategy -> backtesting.py中。 package from collections import defaultdict from datetime import da…

CTA:回测快速示例

设置工作目录 VNPY程序启动后,会产生一个工作目录,程序运行产生的数据、系统配置都会放在指定的.vntrader目录当中。 这一设置在vnpy -> utility.py -> _get_trader_dir函数中可以找到,工作目录由TRADER_DIR, TEMP_DIR确定。 def _get_trader_dir(temp_name: str) -&g…

VNPY体系结构

整体架构每个层次的模块只调用下层功能,并对上层提供接口。接口层负责对接行情和交易API,将行情数据推送到系统,发送交易指令。 引擎层向下对接交易接口、数据库,向上服务于各种上层应用。 应用层主要是面向用户的可视化界面,这一部分的代码主要为引擎功能与界面的连接。功…

【ABP】项目示例(2)——聚合根和实体

聚合根和实体 在上一章节中,已经完成了项目搭建的前置准备,在这一章节中,实现领域层的聚合根和实体 创建名称为General.Backend.Domain的标准类库,分别新建名称为Entities、Services、IRepositories和Specifications的文件夹,用于存放实体和聚合根、领域服务、仓储接口和规…

P1896 [SCOI2005] 互不侵犯(状态压缩)

位运算符好麻烦,没打括号被卡了半天 #include<iostream> #define int long long using namespace std; int f[12][100][1<<11]; int s[1<<11]; int num[1<<11]; signed main(){int n,k;cin>>n>>k;int cnt=0;for(int i=0;i<(1<<…

200N03-ASEMI豆浆机专用MOS管200N03

200N03-ASEMI豆浆机专用MOS管200N03编辑:ll 200N03-ASEMI豆浆机专用MOS管200N03 型号:200N03 品牌:ASEMI 封装:TO-252 批号:最新 最大漏源电流:200A 漏源击穿电压:30V RDS(ON)Max:1.8mΩ 引脚数量:3 芯片个数: 沟道类型:N沟道MOS管、中低压MOS管 漏电流:ua 特性:…

第七章-收益归因:Brinson模型

例子 现有一个投资组合,其基准组合为:70%中证800,20%债券,10%现金,如下:基金经理主动投资,对资产权重进行调整。假设基准组合收益率为\(r\)。若是看好股票,那就多配置一些股票,调整中证800权重为\(w_1^{\prime}\)。则收益调整为(\(w_1^{\prime}-w)(r_1-r)\)。会有两种情…

我用GPT干什么

有几天没有更新文章了,一方面因为感觉GPT出来了,写什么都不香了,非得写点关于GPT的才有资格出来和大家见面;另一方面,确实最近一段时间也在全面拥抱GPT:学习,总结,思考。所以今天就想随心所欲地说说GPT使用感受。其实GPT出来没多长时间,就注册账号,并体验了网页版,更…

金融期权

金融期权品种一览上海证券交易所品种 行权方式 上市日期华夏上证50ETF期权 欧式 2015-02-09华泰沪深300ETF期权 欧式 2019-12-23南方中证500ETF期权 欧式 2022-09-19华夏上证科创板50ETF期权 欧式 2023-06-05易方达上证科创板50ETF期权 欧式 2023-06-05深圳证券交易所品种 行权…