Thread_Pool 项目解析
简介
ThreadPool 是一个轻量级的 C++ 线程池实现,旨在简化多线程编程。
项目分析
我们首先上github的项目地址:https://github.com/progschj/ThreadPool,然后克隆项目到本地。
点开项目的ThrealPool.h文件,查看源码:
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>class ThreadPool {
public:ThreadPool(size_t);template<class F, class... Args>auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();
private:// need to keep track of threads so we can join themstd::vector< std::thread > workers;// the task queuestd::queue< std::function<void()> > tasks;// synchronizationstd::mutex queue_mutex;std::condition_variable condition;bool stop;
};// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads): stop(false)
{for(size_t i = 0;i<threads;++i)workers.emplace_back([this]{for(;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });if(this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});
}// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif(stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task](){ (*task)(); });}condition.notify_one();return res;
}// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for(std::thread &worker: workers)worker.join();
}#endif
类成员分析
接下来,我们一步一步分析源代码。
在整个文件中只定义一个类ThreadPool,它的类成员有:
std::vector< std::thread > workers;//存储处理任务的线程std::queue< std::function<void()> > tasks;//存储任务的队列std::mutex queue_mutex; // 互斥锁std::condition_variable condition; // 条件变量,和上面的互斥锁保证多线程的同步和互斥bool stop; // 线程池的是否停止的标志
ThreadPool初始化
先上代码:
inline ThreadPool::ThreadPool(size_t threads): stop(false)
{for(size_t i = 0;i<threads;++i)workers.emplace_back([this]{for(;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });if(this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});
}
ThreadPool 的初始化需传入一个参数threads,且将stop赋值为0.
接着往workers里加入threads个线程,每个线程都执行死循环:
for(;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });if(this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}
在循环中,先定义锁,再调用condition.wait()方法,当线程池运行且任务队列为空时,线程堵塞,否则线程继续运行,然后当线程池停止且任务队列为空时,跳出循环,结束线程。否则从取出任务队列的第一个任务,执行任务。
ThreadPool enqueue 加入队列
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif(stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task](){ (*task)(); });}condition.notify_one();return res;
}
enqueue 方法是模板函数,传入可调用对象F和任意数量的的参数args,,返回一个future对象,返回线程异步操作的结果。
using return_type = typename std::result_of<F(Args...)>::type;
首先,定义返回类型return_type,表示传入的可调用对象的返回值的类型。
auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
程序创建智能指针task,其指向了一个使用bind绑定的可调用对象(该对象调用f,并传入参数args),再使用packaged_task包装成可调用对象。创建智能指针的目的是为了其他线程的使用。
std::future<return_type> res = task->get_future();
{std::unique_lock<std::mutex> lock(queue_mutex);if(stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task](){ (*task)(); });
}
使用res保存任务线程的异步结果,并作为返回值。
然后在代码块中使用互斥锁加锁,然后将任务加入任务队列中。
最后通知线程池中的一个线程处理任务并返回res。
ThreadPool 析构函数
看注释就可以了:
inline ThreadPool::~ThreadPool()
{{std::unique_lock<std::mutex> lock(queue_mutex); stop = true;//表示线程池停止。}condition.notify_all(); // 通知所有线程for(std::thread &worker: workers)worker.join(); // 等待所有线程结束
}
总结:
ThreadPool 的运行步骤可以分为以下几步:
- 创建ThreadPool对象,传入线程池工作线程数量。在线程池中填加工作线程,并堵塞等待任务线程的通知。
- 调用enqueue方法,传入可调用对象和参数。在该方法中,enqueue先通过一系列操作调整传入的参数,再将其加入任务队列。
- 以上操作完成后,通知线程池中的一个线程处理任务。在线程池中取出任务队列的当前最先进来的任务处理。
- 处理完任务将结果保存到enqueue里的异步返回结果的future对象中,并通过enqueue返回。
- ThreadPool对象被销毁时,将标志stop设置为true,并会通知所有堵塞线程,等待线程池中的所有线程结束。
ThreadPool 实现简单的线程池,使用简单的先进先出策略调度任务,如果可以使用更加复杂的策略,我们可以自己修改代码。