muduo网络库核心设计为one loop one thread,即一个线程一个事件循环。其中,主Reactor负责监听新连接,并将新连接平均分配到从Reactor中,而从Reactor则对分配到其上的用户进行io交互,接收并处理用户发来的数据,包括消息的回复(实际上,在使用中需要手动设置Reactor线程的数量,如果不设置,muduo默认为单Reactor,即所有的操作都在一个Reactor中)。
多线程Reactor的管理:EventLoopThread和EventLoopThreadPool。
EventLoopThread封装了Thread和EventLoop,对外提供接口在成员函数thread中进行loop循环并由EventLoopThreadPool预先创建统一管理。
Thread
Thread类是muduo自己封装pthread库实现的线程类,可手动控制线程开始的时间和自动管理线生命周期。
class Thread : noncopyable
{
public:typedef std::function<void ()> ThreadFunc;explicit Thread(ThreadFunc, const string& name = string());// FIXME: make it movable in C++11~Thread();void start();int join(); // return pthread_join()bool started() const { return started_; }// pthread_t pthreadId() const { return pthreadId_; }pid_t tid() const { return tid_; }const string& name() const { return name_; }static int numCreated() { return numCreated_.get(); }private:void setDefaultName();bool started_;bool joined_;pthread_t pthreadId_; pid_t tid_;ThreadFunc func_; //要在新线程执行的函数string name_;//CountDownLatch是一种同步工具,通过条件变量和控制其成员变量latch的大小,在多线程编程中实现线程间的同步CountDownLatch latch_; static AtomicInt32 numCreated_;
};//构造函数
Thread::Thread(ThreadFunc func, const string& n): started_(false),joined_(false),pthreadId_(0),tid_(0),func_(std::move(func)),name_(n),latch_(1)
{setDefaultName();
}//析构函数
Thread::~Thread()
{//如果线程开始执行,并且没有同步,则将线程分离if (started_ && !joined_){pthread_detach(pthreadId_);}
}
主要函数
start函数可以开启线程并执行传入的函数,在返回之前确保线程函数已经开始执行
void Thread::start()
{assert(!started_);started_ = true; //将开始标记置为真// FIXME: move(func_)detail::ThreadData* data = new detail::ThreadData(std::move(func_), name_, &tid_, &latch_); //ThreadData封装了线程变量,并提供了同步服务if (pthread_create(&pthreadId_, NULL, &detail::startThread, data)){started_ = false;delete data; // or no delete?LOG_SYSFATAL << "Failed in pthread_create";}else{latch_.wait(); //等待线程函数开始执行assert(tid_ > 0);}
}
线程的同步控制
同步工具CountDownLatch
class CountDownLatch : noncopyable
{
public:explicit CountDownLatch(int count);void wait();void countDown();private:mutable MutexLock mutex_; //守卫锁,同std::lock_guardCondition condition_ GUARDED_BY(mutex_); //条件变量,同std::condition_variableint count_ GUARDED_BY(mutex_); //计数器
};//wait
//只有当计数器小于等于零时,wait才会返回
void CountDownLatch::wait()
{MutexLockGuard lock(mutex_);while (count_ > 0){condition_.wait();}
}//countDown
//计数器减一,如果计数器减一后等于零,则唤醒所有等待的线程
void CountDownLatch::countDown()
{MutexLockGuard lock(mutex_);--count_;if (count_ == 0){condition_.notifyAll();}
}
线程数据结构ThreadData
struct ThreadData
{typedef muduo::Thread::ThreadFunc ThreadFunc;ThreadFunc func_;string name_;pid_t* tid_;CountDownLatch* latch_;ThreadData(ThreadFunc func,const string& name,pid_t* tid,CountDownLatch* latch): func_(std::move(func)),name_(name),tid_(tid),latch_(latch){ }void runInThread(){*tid_ = muduo::CurrentThread::tid();tid_ = NULL;latch_->countDown(); //计数器减一,如果计数器为零,唤醒等待线程,确保线程已经开,并且开始执行latch_ = NULL;//执行函数...}
};void* startThread(void* obj)
{ThreadData* data = static_cast<ThreadData*>(obj);data->runInThread();delete data;return NULL;
}
EventLoopThread
EventLoopThread的设计与Thread类似,都提供了手动开启接口和同步服务
class EventLoopThread : noncopyable
{
public:typedef std::function<void(EventLoop*)> ThreadInitCallback;EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),const string& name = string());~EventLoopThread();EventLoop* startLoop(); //开启loop循环private:void threadFunc();EventLoop* loop_ GUARDED_BY(mutex_); //该线程对应的唯一loopbool exiting_;Thread thread_;MutexLock mutex_;Condition cond_ GUARDED_BY(mutex_);ThreadInitCallback callback_;
};//实现文件
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb,const string& name): loop_(NULL),exiting_(false),thread_(std::bind(&EventLoopThread::threadFunc, this), name),mutex_(),cond_(mutex_),callback_(cb)
{
}EventLoopThread::~EventLoopThread()
{exiting_ = true;if (loop_ != NULL) // not 100% race-free, eg. threadFunc could be running callback_.{// still a tiny chance to call destructed object, if threadFunc exits just now.// but when EventLoopThread destructs, usually programming is exiting anyway.loop_->quit();thread_.join();}
}//开启loop循环并返回对应的EventLoop
//通过条件变量,确保loop已经被创建
EventLoop* EventLoopThread::startLoop()
{assert(!thread_.started());thread_.start(); //开启线程,执行threadFunc函数EventLoop* loop = NULL;{MutexLockGuard lock(mutex_);while (loop_ == NULL){cond_.wait();}loop = loop_;}return loop;
}//将loop创建在栈空间上,loop循环确保loop在使用中不会回收,循环结束后自动回收
void EventLoopThread::threadFunc()
{EventLoop loop;if (callback_) {callback_(&loop);}{MutexLockGuard lock(mutex_);loop_ = &loop;cond_.notify();}loop.loop();//assert(exiting_);MutexLockGuard lock(mutex_);loop_ = NULL;
}
EventLoopThreadPool
class EventLoopThreadPool : noncopyable
{
public:typedef std::function<void(EventLoop*)> ThreadInitCallback;EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);~EventLoopThreadPool();//设置线程数void setThreadNum(int numThreads) { numThreads_ = numThreads; }void start(const ThreadInitCallback& cb = ThreadInitCallback());// valid after calling start()/// round-robinEventLoop* getNextLoop();private:EventLoop* baseLoop_;string name_;bool started_;int numThreads_;int next_;std::vector<std::unique_ptr<EventLoopThread>> threads_;std::vector<EventLoop*> loops_;
};//实现
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg): baseLoop_(baseLoop),name_(nameArg),started_(false),numThreads_(0), //线程数默认为零next_(0)
{
}EventLoopThreadPool::~EventLoopThreadPool()
{
// Don't delete loop, it's stack variable
}//开启所有线程即其loop循环
//根据设置的线程数来创建子loop线程(从Reactor,主Reactor需在使用之前手动创建)
//如果没有设置线程数,则只存在一个loop循环,即主Reactor
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{assert(!started_);baseLoop_->assertInLoopThread();started_ = true;for (int i = 0; i < numThreads_; ++i){char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);EventLoopThread* t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));loops_.push_back(t->startLoop());}if (numThreads_ == 0 && cb){cb(baseLoop_);}
}//获取子loop,如果未创建子loop,则返回主loop
EventLoop* EventLoopThreadPool::getNextLoop()
{baseLoop_->assertInLoopThread();assert(started_);EventLoop* loop = baseLoop_;if (!loops_.empty()){// round-robinloop = loops_[next_];++next_;if (implicit_cast<size_t>(next_) >= loops_.size()){next_ = 0;}}return loop;
}