目录
线程池的概念
线程池的优点
线程池的应用场景
线程池示例
代码实现
线程池的概念
线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。
线程池的优点
- 提高响应速度:避免了在处理短时间任务时创建与销毁线程的代价。当任务到达时,线程池中的线程已经创建并准备就绪,可以不用等待线程创建而直接执行任务,加快任务的执行速度与时间。
- 降低资源消耗:线程池不仅能够保证内核的充分利用,还能防止过分调度。通过重复利用已创建的线程,可以降低线程创建和销毁造成的消耗。
- 提高线程的可管理性:线程池可以进行统一的分配、调优和监控,有助于避免系统资源的无限制消耗,提高系统的稳定性。
注意:可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景
- 需要大量的线程来完成任务,且完成任务的时间比较短。WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。
线程池示例
- 创建固定数量线程池,循环从任务队列中获取任务对象。
- 获取到任务对象后,执行任务对象中的任务接口。
- 线程池对外提供一个Push接口,用于让外部线程能够将任务Push到任务队列当中。
线程池通过一个线程安全的阻塞任务队列加上一个或一个以上的线程实现,线程池中的线程可以从阻塞队列中获取任务进行任务处理,当线程都处于繁忙状态时可以将任务加入阻塞队列中,等到其它的线程空闲后进行处理。
代码实现
ThreadPool.hpp
#pragma once#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <pthread.h>
#include <unistd.h>using namespace std;struct ThreadInfo
{pthread_t tid;string name;
};static const int defaultnum = 5;//线程池
template<class T>
class ThreadPool
{
public:void Lock(){pthread_mutex_lock(&mutex_);}void UnLock(){pthread_mutex_unlock(&mutex_);}void Wakeup(){pthread_cond_signal(&cond_);}void ThreadSleep(){pthread_cond_wait(&cond_,&mutex_);}bool IsQueueEmpty(){return task_.empty();}string GetThreadName(pthread_t tid){int num = threads_.size();for(int i = 0;i < num;++i){if(threads_[i].tid == tid)return threads_[i].name;}return "none";}
public:ThreadPool(int num = defaultnum):threads_(num){pthread_mutex_init(&mutex_,nullptr);pthread_cond_init(&cond_,nullptr); }//线程池中线程的执行例程static void* HandlerTask(void* args){ThreadPool<T>* tp = static_cast<ThreadPool<T> *>(args);//?string name = tp->GetThreadName(pthread_self());//不断从任务队列获取任务进行处理while (true){//消费任务tp->Lock();while (tp->IsQueueEmpty())//任务队列为空{tp->ThreadSleep();}T t = tp->Pop();tp->UnLock();t();//处理任务cout << name << "run," << "result:" << t.GetResult() <<endl;}}void Start(){int num = threads_.size();for(int i = 0;i < num;++i){threads_[i].name = "thread-" + to_string(i);//pthread_create要求HandlerTask函数是void*返回值,参数void*类型,如果HandlerTask函数直接定义在类里面,那么参数还会有一个隐藏的this指针,类型就不匹配了,编译会出现错误,所以我们将HandlerTask定义为static函数,然后再将this指针通过pthread_create参数传入进去,就可以避免出现这样的问题pthread_create(&(threads_[i].tid),nullptr,HandlerTask,this);}}//往任务队列塞任务(主线程调用)void Push(const T&t){Lock();task_.push(t);Wakeup();UnLock();}//从任务队列获取任务(线程池中的线程调用)T Pop(){T t = task_.front();task_.pop();return t;}~ThreadPool(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);}
private:vector<ThreadInfo> threads_;//存放线程queue<T> task_;//创建任务队列pthread_mutex_t mutex_;pthread_cond_t cond_;
};
注意:
- 当某线程被唤醒时,其可能是被异常或是伪唤醒,或者是一些广播类的唤醒线程操作而导致所有线程被唤醒,使得在被唤醒的若干线程中,只有个别线程能拿到任务。此时应该让被唤醒的线程再次判断是否满足被唤醒条件,所以在判断任务队列是否为空时,应该使用while进行判断,而不是if。
- pthread_cond_broadcast函数的作用是唤醒条件变量下的所有线程,而外部可能只Push了一个任务,我们却把全部在等待的线程都唤醒了,此时这些线程就都会去任务队列获取任务,但最终只有一个线程能得到任务。一瞬间唤醒大量的线程可能会导致系统震荡,这叫做惊群效应。因此在唤醒线程时最好使用pthread_cond_signal函数唤醒正在等待的一个线程即可。
- 当线程从任务队列中拿到任务后,该任务就已经属于当前线程了,与其他线程已经没有关系了,因此应该在解锁之后再进行处理任务,而不是在解锁之前进行。因为处理任务的过程可能会耗费一定的时间,所以我们不要将其放到临界区当中。
- 如果将处理任务的过程放到临界区当中,那么当某一线程从任务队列拿到任务后,其他线程还需要等待该线程将任务处理完后,才有机会进入临界区。此时虽然是线程池,但最终我们可能并没有让多线程并行的执行起来。
为什么线程池中的线程执行例程HandlerTask函数需要设置为静态方法?
- 使用pthread_create函数创建线程时,需要为创建的线程传入一个Routine(执行例程),pthread_create要求执行例程函数是void*返回值,参数void*类型。
- 而此时HandlerTask作为类的成员函数,该函数的第一个参数是隐藏的this指针,因此这里的HandlerTask函数,虽然看起来只有一个参数,而实际上它有两个参数,此时直接将该HandlerTask函数作为创建线程时的执行例程会因为参数类型不匹配而无法通过编译。
静态成员函数属于类,而不属于某个对象,也就是说静态成员函数是没有隐藏的this指针的,因此我们需要将HandlerTask设置为静态方法,此时HandlerTask函数才真正只有一个参数类型为void*的参数。
但是在静态成员函数内部无法调用非静态成员函数,而我们需要在HandlerTask函数当中调用该类的某些非静态成员函数,比如Pop。因此我们需要在创建线程时,向HandlerTask函数传入的当前对象的this指针,此时我们就能够通过该this指针在HandlerTask函数内部调用非静态成员函数了。
为什么线程池中需要有互斥锁和条件变量?
- 线程池中的任务队列是会被多个执行流同时访问的临界资源,因此我们需要引入互斥锁对任务队列进行保护。
- 线程池当中的线程要从任务队列里拿任务,前提条件是任务队列中必须要有任务,因此线程池当中的线程在拿任务之前,需要先判断任务队列当中是否有任务,若此时任务队列为空,那么该线程应该进行等待,直到任务队列中有任务时再将其唤醒,因此我们需要引入条件变量。
- 当外部线程向任务队列中Push一个任务后,此时可能有线程正处于等待状态,因此在新增任务后需要唤醒在条件变量下等待的线程。
任务类型设计Task.hpp
我们将线程池进行了模板化,因此线程池当中存储的任务类型可以是任意的,但无论该任务是什么类型的,在该任务类当中都必须包含一个Run方法,当我们处理该类型的任务时只需调用该Run方法即可。这样,如果需要处理不同类型的任务,我们只需创建对应的新任务类,并在其中定义相应的方法即可。线程池会自动识别并调用适当的方法来处理任务。
下面我们实现一个计算任务类:
#pragma once
#include <iostream>
#include <string>using namespace std;string opers = "+-*/%";enum{DivZero = 1,ModZero,Unknown
};class Task
{
public:Task(){}Task(int data1,int data2,char oper):data1_(data1),data2_(data2),oper_(oper),result_(0),exitcode_(0){}void run(){switch (oper_){case '+':result_ = data1_ + data2_;break;case '-':result_ = data1_ - data2_;break;case '*':result_ = data1_ * data2_;break;case '/':{if(data2_ == 0) exitcode_ = DivZero;else result_ = data1_ / data2_; }break;case '%':{if(data2_ == 0) exitcode_ = ModZero ;else result_ = data1_ % data2_; }break;default:exitcode_ = Unknown;break;}}void operator()(){run();}string GetResult(){string r = to_string(data1_);r += oper_;r += to_string(data2_);r += "=";r += to_string(result_);r += "[code: ";r += to_string(exitcode_);r += "]";return r;}string GetTask(){string r = to_string(data1_);r += oper_;r += to_string(data2_);r += "=?";return r;}~Task(){}
private:int data1_;int data2_;char oper_;int result_;int exitcode_;
};
此时线程池内的线程不断从任务队列拿出任务进行处理,而它们并不需要关心这些任务是哪来的,它们只需要拿到任务后执行对应的Run方法即可。
主线程逻辑:main.cpp
主线程就负责不断的构建任务,然后向任务队列当中Push任务就行了,此后线程池当中的线程会从任务队列当中获取到这些任务并进行处理。
#include <iostream>
#include <ctime>
#include "ThreadPool.hpp"
#include "Task.hpp"using namespace std;int main()
{ThreadPool<Task> *tp = new ThreadPool<Task>(5);tp->Start();srand(time(nullptr)^getpid());while (true){//1.构建任务int x = rand()%10+1;usleep(10);int y = rand()%5;char oper = opers[rand()%opers.size()];Task t(x,y,oper);tp->Push(t);//2.交给线程池处理任务cout << "main thread make task: " << t.GetTask() << endl;sleep(1);}return 0;
}
运行结果:
我们通过监控脚本查看线程的运行状态:
while :; do ps -aL | head -1 && ps -aL | grep ThreadPoolTest; sleep 1; done
- 从监控脚本可以看到程序运行后有六个线程,其中一个为主线程。
- 我们会发现这五个线程在处理时会呈现出一定的顺序性,因为主线程是每秒Push一个任务,这五个线程只会有一个线程获取到该任务,其他线程都会在等待队列中进行等待,当该线程处理完任务后就会因为任务队列为空而排到等待队列的最后,当主线程再次Push一个任务后会通过pthread_cond_signal唤醒等待队列首部的一个线程,这个线程处理完任务后又会排到等待队列的最后,因此这五个线程在处理任务时会呈现出一定的顺序性。
线程安全的单例模式
什么是单例模式
单例模式是一种 "经典的, 常用的, 常考的" 设计模式。
什么是设计模式
IT行业这么火, 涌入的人很多. 俗话说林子大了啥鸟都有. 大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是设计模式。
单例模式的特点
某些类, 只应该具有一个对象(实例), 就称之为单例。例如一个男人只能有一个媳妇。
在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据。
饿汉实现方式和懒汉实现方式
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.
吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.
懒汉方式最核心的思想是 "延时加载"。从而能够优化服务器的启动速度.
饿汉方式实现单例模式
template <typename T>
class Singleton {static T data;
public:static T* GetInstance() {return &data;}
};
只要通过 Singleton 这个包装类来使用 T 对象,则一个进程中只有一个 T 对象的实例。
懒汉方式实现单例模式
template <typename T>
class Singleton {static T* inst;
public:static T* GetInstance() {if (inst == NULL) {inst = new T();}return inst;}
};
存在一个严重的问题, 线程不安全。
第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例。
但是后续再次调用, 就没有问题了。
懒汉方式实现单例模式(线程安全版本)
// 懒汉模式, 线程安全
template <typename T>
class Singleton {volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.static std::mutex lock;
public:static T* GetInstance() {if (inst == NULL) { // 双重判定空指针, 降低锁冲突的概率, 提高性能.lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.if (inst == NULL) {inst = new T();}lock.unlock();}return inst;}
};
注意事项:
- 加锁解锁的位置
- 双重 if 判定, 避免不必要的锁竞争
- volatile关键字防止过度优化
单例模式的线程池
ThreadPool.hpp
#pragma once#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <pthread.h>
#include <unistd.h>using namespace std;struct ThreadInfo
{pthread_t tid;string name;
};static const int defaultnum = 5;//线程池
template<class T>
class ThreadPool
{
public:void Lock(){pthread_mutex_lock(&mutex_);}void UnLock(){pthread_mutex_unlock(&mutex_);}void Wakeup(){pthread_cond_signal(&cond_);}void ThreadSleep(){pthread_cond_wait(&cond_,&mutex_);}bool IsQueueEmpty(){return task_.empty();}string GetThreadName(pthread_t tid){int num = threads_.size();for(int i = 0;i < num;++i){if(threads_[i].tid == tid)return threads_[i].name;}return "none";}
public://线程池中线程的执行例程static void* HandlerTask(void* args){ThreadPool<T>* tp = static_cast<ThreadPool<T> *>(args);//?string name = tp->GetThreadName(pthread_self());//不断从任务队列获取任务进行处理while (true){//消费任务tp->Lock();while (tp->IsQueueEmpty())//任务队列为空{tp->ThreadSleep();}T t = tp->Pop();tp->UnLock();t();//处理任务cout << name << "run," << "result:" << t.GetResult() <<endl;}}void Start(){int num = threads_.size();for(int i = 0;i < num;++i){threads_[i].name = "thread-" + to_string(i);//pthread_create要求HandlerTask函数是void*返回值,参数void*类型,如果HandlerTask函数直接定义在类里面,那么参数还会有一个隐藏的this//指针,类型就不匹配了,编译会出现错误,所以我们将HandlerTask定义为static函数,然后再将this指针通过pthread_create参数传入进去,就可以避免出现这样的问题pthread_create(&(threads_[i].tid),nullptr,HandlerTask,this);}}//往任务队列塞任务(主线程调用)void Push(const T&t){Lock();task_.push(t);Wakeup();UnLock();}//从任务队列获取任务(线程池中的线程调用)T Pop(){T t = task_.front();task_.pop();return t;}static ThreadPool<T>* GetInstance(){if(tp_ == nullptr)//tp_被创建了,就直接返回tp_,不需要让其他线程再去申请锁{pthread_mutex_lock(&lock_);//防止多个线程获取单例,加上锁防止被new多次if(tp_ == nullptr){cout << "log: singleton create done first!" << endl;tp_ = new ThreadPool<T>();}pthread_mutex_unlock(&lock_);}return tp_;}private:ThreadPool(int num = defaultnum):threads_(num){pthread_mutex_init(&mutex_,nullptr);pthread_cond_init(&cond_,nullptr); }~ThreadPool(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);}ThreadPool(const ThreadPool<T> &) = delete;const ThreadPool<T> &operator = (const ThreadPool<T> &) = delete;
private:vector<ThreadInfo> threads_;//存放线程queue<T> task_;//创建任务队列pthread_mutex_t mutex_;pthread_cond_t cond_;static ThreadPool<T> *tp_;static pthread_mutex_t lock_;
};template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;template<class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
代码说明:
成员变量
- static ThreadPool<T> *tp_:静态指针,用于单例模式。
- static pthread_mutex_t lock_:静态互斥锁,用于保证单例模式的线程安全。
构造函数和析构函数
- ThreadPool(int num = defaultnum):构造函数,接收一个整数参数(线程数量),初始化线程和互斥锁、条件变量。放在private,防止类外构造。
- ~ThreadPool():析构函数,销毁互斥锁和条件变量。
私有成员函数
- ThreadPool(const ThreadPool<T> &) = delete; 和 const ThreadPool<T> &operator = (const ThreadPool<T> &) = delete;:删除拷贝构造函数和拷贝赋值运算符,防止类的拷贝。
GetInstance函数
- static ThreadPool<T>* GetInstance():获取线程池实例的静态函数。使用单例模式,确保只有一个线程池实例存在。
- 如果tp_为nullptr(表示线程池还未创建),则加锁并检查tp_是否仍为nullptr。如果仍然为nullptr,则创建一个新的ThreadPool实例并赋值给tp_。这一步防止了多个线程同时创建多个线程池实例的情况。
- 返回线程池实例。
代码总结
这个ThreadPool类是一个线程池的实现,使用了单例模式和互斥锁、条件变量来实现线程安全和线程池的管理。通过GetInstance函数可以获取线程池的实例,这个函数保证了线程池的唯一性,并且通过互斥锁来防止多个线程同时创建多个线程池实例的情况。
main.cpp
#include <iostream>
#include <ctime>
#include "ThreadPool.hpp"
#include "Task.hpp"using namespace std;int main()
{// 如果获取单例对象的时候,也是多线程获取的呢?std::cout << "process runn..." << std::endl;sleep(3);ThreadPool<Task>::GetInstance()->Start();srand(time(nullptr)^getpid());while (true){//1.构建任务int x = rand()%10+1;usleep(10);int y = rand()%5;char oper = opers[rand()%opers.size()];Task t(x,y,oper);ThreadPool<Task>::GetInstance()->Push(t);//2.交给线程池处理任务cout << "main thread make task: " << t.GetTask() << endl;sleep(1);}return 0;
}
运行结果:
我们让主线程先运行,3秒后再启动线程池,主线程往线程池的任务队列push数据。通过监控脚本可以看到一开始只有主线程,3秒后线程池启动,线程就被创建出来。并且按照一定的顺序往任务队列里面拿任务并进行处理任务。