[toc]
线程池
线程池代码分析
thread_pool.c
#include "thread_pool.h"void handler(void *arg)
{printf("[%u] is ended.\n",(unsigned)pthread_self()); //打印自己的进程号pthread_mutex_unlock((pthread_mutex_t *)arg); //解锁
}//线程要执行的任务
void *routine(void *arg)
{//调试#ifdef DEBUGprintf("[%u] is started.\n",(unsigned)pthread_self());#endif//把需要传递给线程任务的参数进行备份thread_pool *pool = (thread_pool *)arg; struct task *p;while(1){/*** push a cleanup functon handler(), make sure that** the calling thread will release the mutex properly** even if it is cancelled during holding the mutex.**** NOTE:** pthread_cleanup_push() is a macro which includes a** loop in it, so if the specified field of codes that ** paired within pthread_cleanup_push() and pthread_** cleanup_pop() use 'break' may NOT break out of the** truely loop but break out of these two macros.** see line 61 below.*///================================================//pthread_cleanup_push(handler, (void *)&pool->lock); //注册清理线程函数pthread_mutex_lock(&pool->lock); //上锁//================================================//// 1, no task, and is NOT shutting down, then waitwhile(pool->waiting_tasks == 0 && !pool->shutdown){ //判断处于等待状态的线程是否等于0,以及判断是否需要销毁线程池,这里是如果false,则进入循环pthread_cond_wait(&pool->cond, &pool->lock); //会将调用线程放入条件变量的等待队列,并释放互斥锁。线程在被唤醒之前会一直阻塞。当线程被唤醒后,它会重新获取互斥锁,然后继续执行}// 2, no task, and is shutting down, then exitif(pool->waiting_tasks == 0 && pool->shutdown == true) //如果需要销毁,则解锁然后线程退出{pthread_mutex_unlock(&pool->lock);pthread_exit(NULL); // CANNOT use 'break';}// 3, have some task, then consume it //链表的头删,表示当有任务链表中有任务时,处于等待队列的线程就会去,执行任务链表中的表头的任务p = pool->task_list->next; //将链表的首结点地址,给到局部变量的Ppool->task_list->next = p->next; //让链表的头结点,指向P->next ,也就是首结点的下一个结点pool->waiting_tasks--; //让等待被线程执行的任务数量减1//================================================//pthread_mutex_unlock(&pool->lock); //解锁pthread_cleanup_pop(0); //不取消清理函数,也不执行//================================================//pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); (p->do_task)(p->arg); //执行链表中的函数pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); free(p); //执行完任务结点后释放局部变量下保存的堆内存地址}pthread_exit(NULL);
}//初始化线程池
bool init_pool(thread_pool *pool, unsigned int threads_number)
{//初始化互斥锁pthread_mutex_init(&pool->lock, NULL);//初始化条件量pthread_cond_init(&pool->cond, NULL);//销毁标志 pool->shutdown = false; //不销毁//给链表的节点申请堆内存pool->task_list = malloc(sizeof(struct task)); //申请堆内存,用于存储创建出来的线程的IDpool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS); //线程池中最大线程数量= 活跃线程+能够创建的线程数量(既能够存放线程ID的个数) //错误处理,对malloc进行错误处理if(pool->task_list == NULL || pool->tids == NULL){perror("allocate memory error");return false;}//对任务链表中的节点的指针域进行初始化pool->task_list->next = NULL;//设置线程池中线程数量的最大值pool->max_waiting_tasks = MAX_WAITING_TASKS; //方便更改//设置等待线程处理的任务的数量为0,说明现在没有任务pool->waiting_tasks = 0;//设置线程池中活跃的线程的数量pool->active_threads = threads_number;int i;//循环创建活跃线程for(i=0; i<pool->active_threads; i++) {//创建线程 把线程的ID存储在申请的堆内存if(pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0){perror("create threads error");return false;}//用于调试#ifdef DEBUGprintf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);#endif}return true;
}//先线程池的任务链表中添加任务
bool add_task(thread_pool *pool,void *(*do_task)(void *arg), void *arg) //将自定义的任务,添加到
{//给任务链表节点申请内存struct task *new_task = malloc(sizeof(struct task));if(new_task == NULL){perror("allocate memory error");return false;}new_task->do_task = do_task; //设置需要在链表中添加的任务new_task->arg = arg; //任务函数的参数new_task->next = NULL; //指针域设置为NULL//============ LOCK =============//pthread_mutex_lock(&pool->lock); //上锁防止主线程与任务执行线程,资源竞争//===============================////说明要处理的任务的数量大于能处理的任务数量if(pool->waiting_tasks >= MAX_WAITING_TASKS) {pthread_mutex_unlock(&pool->lock); fprintf(stderr, "too many tasks.\n");free(new_task); return false;}struct task *tmp = pool->task_list; //通过局部变量,对任务链表进行访问,可以保留头结点的位置,减少持有锁的时间,随着函数结束自动释放资源//遍历链表,找到单向链表的尾节点while(tmp->next != NULL)tmp = tmp->next;//把新的要处理的任务插入到链表的尾部 尾插tmp->next = new_task;//要处理的任务的数量+1pool->waiting_tasks++;//=========== UNLOCK ============//pthread_mutex_unlock(&pool->lock);//===============================////调试#ifdef DEBUGprintf("[%u][%s] ==> a new task has been added.\n",(unsigned)pthread_self(), __FUNCTION__);#endif//唤醒第一个处于阻塞队列中的线程pthread_cond_signal(&pool->cond);return true;
}//向线程池加入新线程
int add_thread(thread_pool *pool, unsigned additional_threads)
{//判断需要添加的新线程的数量是否为0if(additional_threads == 0)return 0;//计算线程池中总线程的数量unsigned total_threads =pool->active_threads + additional_threads; //活跃线程的数量+需要添加新线程的数量=总的活跃线程的数量int i, actual_increment = 0; for(i = pool->active_threads;i < total_threads && i < MAX_ACTIVE_THREADS;i++) //判断条件为 活跃线程的数量小于线程池中总线程的数量 且 小于创建线程的上限{ //线程总的活跃线程的数量,是用来判断循环多少次的//创建新线程if(pthread_create(&((pool->tids)[i]), //而超过能够创建的线程数量也一样不会在添加新线程了NULL, routine, (void *)pool) != 0) //为什么不这样写for(i=0 , i<additional_threads, && i < MAX_ACTIVE_THREADS;i++) { //因为创建出来的线程ID,会根据创建时的顺序,存放到对应的数组下标的位置,perror("add threads error"); //所以需要定义两个局部变量,用于保存数组最后的一个元素的后一个位置,因为是i++ ,int i, actual_increment = 0;//actual_increment这个变量用于保存实际创建线程的数量// no threads has been created, return failif(actual_increment == 0) return -1;break;}actual_increment++; #ifdef DEBUGprintf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);#endif}//记录此时线程池中活跃线程的总数pool->active_threads += actual_increment;return actual_increment;
}int remove_thread(thread_pool *pool, unsigned int removing_threads) //删除线程池中的线程。 参数是需要删除的数量
{if(removing_threads == 0)return pool->active_threads; int remaining_threads = pool->active_threads - removing_threads; //计算线程池中剩余的线程数量 remaining_threads = remaining_threads > 0 ? remaining_threads : 1; //防呆,如果想要删除的线程数量大于线程池中线程的数量则 //保留初始化时数组下标为0的线程,如果不保留,就是销毁线程池了int i; //在删除线程池中的线程时,保留至少一个线程的原因是为了确保线程池仍然能够处理剩余的任务,避免完全停止服务。for(i=pool->active_threads-1; i>remaining_threads-1; i--) //pool->active_threads-1即数组中最后一个元素下标{errno = pthread_cancel(pool->tids[i]); // 取消线程 同时 执行线程清理函数,主线程调用,终止其他线程if(errno != 0)break;#ifdef DEBUGprintf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);#endif}if(i == pool->active_threads-1) return -1;else{pool->active_threads = i+1;return i+1;}
}bool destroy_pool(thread_pool *pool) //销毁线程池
{// 1, activate all threadspool->shutdown = true;pthread_cond_broadcast(&pool->cond); //唤醒线程池中所有线程,然后终止线程// 2, wait for their exitingint i;for(i=0; i<pool->active_threads; i++) {errno = pthread_join(pool->tids[i], NULL); //主线程调用,循环等待回收线程资源if(errno != 0){printf("join tids[%d] error: %s\n",i, strerror(errno));}elseprintf("[%u] is joined\n", (unsigned)pool->tids[i]);}// 3, free memoriesfree(pool->task_list); //释放线程池中任务链表的堆内存free(pool->tids); //释放线程句柄数组的堆内存free(pool); //释放管理线程池的堆内存return true;
}
thread_pool.h
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>#include <errno.h>
#include <pthread.h>#define MAX_WAITING_TASKS 1000 //处于等待状态的线程数量最大为1000
#define MAX_ACTIVE_THREADS 20 //活跃的线程数量 //任务结点 单向链表的节点,类型
struct task
{void *(*do_task)(void *arg); //任务函数指针 指向线程要执行的任务 格式是固定的void *arg; //需要传递给任务的参数,如果不需要,则NULLstruct task *next; //指向下一个任务结点的指针
};//线程池的管理结构体
typedef struct thread_pool
{pthread_mutex_t lock; // 互斥锁pthread_cond_t cond; // 条件量bool shutdown; //是否需要销毁线程池struct task *task_list; //用于存储任务的链表pthread_t *tids; //用于记录线程池中线程的ID,其实是一个数组,数组中元素的类型是pthread_tunsigned max_waiting_tasks; //线程池中线程的数量最大值unsigned waiting_tasks; //设置等待线程处理的任务的数量unsigned active_threads; //正在活跃的线程数量
}thread_pool;//初始化线程池
bool init_pool(thread_pool *pool, unsigned int threads_number);//向线程池中添加任务
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);//先线程池中添加线程
int add_thread(thread_pool *pool, unsigned int additional_threads_number);//从线程池中删除线程
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);//销毁线程池
bool destroy_pool(thread_pool *pool);//任务函数
void *routine(void *arg);#endif
main.c
#include "thread_pool.h"// 任务函数:模拟一个需要耗时的任务
void *mytask(void *arg)
{int n = (int)arg;printf("[%u][%s] ==> job will be done in %d sec...\n",(unsigned)pthread_self(), __FUNCTION__, n);sleep(n); // 模拟任务耗时printf("[%u][%s] ==> job done!\n",(unsigned)pthread_self(), __FUNCTION__);return NULL;
}// 计时函数:每秒打印一次经过的秒数
void *count_time(void *arg)
{int i = 0;while(1){sleep(1);printf("sec: %d\n", ++i);}
}int main(void)
{pthread_t a;// 创建计时线程pthread_create(&a, NULL, count_time, NULL);// 1. 初始化线程池thread_pool *pool = malloc(sizeof(thread_pool));init_pool(pool, 2); // 初始化线程池,初始线程数为2// 2. 添加任务到线程池printf("throwing 3 tasks...\n");add_task(pool, mytask, (void *)(rand()%10));add_task(pool, mytask, (void *)(rand()%10));add_task(pool, mytask, (void *)(rand()%10));// 3. 检查当前活动线程数printf("current thread number: %d\n",remove_thread(pool, 0)); // 传入0表示不移除线程,只返回当前线程数sleep(9); // 等待9秒// 4. 添加更多任务到线程池printf("throwing another 2 tasks...\n");add_task(pool, mytask, (void *)(rand()%10));add_task(pool, mytask, (void *)(rand()%10));// 5. 添加更多线程到线程池add_thread(pool, 2); // 增加2个线程sleep(5); // 等待5秒// 6. 从线程池中移除线程printf("remove 3 threads from the pool, ""current thread number: %d\n",remove_thread(pool, 3)); // 移除3个线程,并打印当前线程数// 7. 销毁线程池destroy_pool(pool); // 销毁线程池return 0; // 主函数返回0,程序结束
}
线程取消的基本概念
取消点(Cancellation Point):线程在执行过程中,会在一些特定的函数调用时检查是否有取消请求,这些函数称为取消点。例如,
pthread_testcancel
、pthread_join
、pthread_cond_wait
等函数都是取消点。取消类型(Cancellation Type):决定线程在取消点如何响应取消请求。主要有两种类型:
- 异步取消(Asynchronous Cancellation):线程可以在任何时刻被取消。
- 延迟取消(Deferred Cancellation):线程只有在到达取消点时才会被取消。POSIX 线程库默认使用这种类型。
取消状态(Cancellation State):决定线程是否响应取消请求。可以是以下两种状态:
- 启用(Enable):线程会响应取消请求。
- 禁用(Disable):线程不会响应取消请求。
pthread_cleanup_push
是 POSIX 线程库(pthread)中的一个函数,用于在线程取消时执行清理操作。它与pthread_cleanup_pop
配对使用,确保在线程退出或被取消时执行特定的清理代码,例如释放资源或解锁互斥锁。pthread_cleanup_push(cleanup, "Resource A")
:注册清理函数cleanup
,当线程被取消或退出时,会执行cleanup("Resource A")
。pthread_cleanup_pop(1)
:取消清理函数并执行它(参数1
表示执行清理函数)。pthread_testcancel()
:这是一个取消点,线程在这里检查是否有取消请求。
pthread_cleanup_push
和pthread_cleanup_pop
用于确保线程在被取消或正常退出时执行特定的清理操作。这对于管理资源(如内存、文件描述符、互斥锁等)非常重要,确保不会因为线程的意外终止而导致资源泄漏。
线程取消(Thread Cancellation)是指在多线程编程中,允许一个线程请求终止另一个线程的执行。POSIX 线程库提供了这种机制,使得线程可以被其他线程取消。这在某些情况下非常有用,例如当一个线程因为某种原因需要提前终止另一个线程的执行时。
相关函数
pthread_cancel(pthread_t thread)
: 请求取消指定的线程。pthread_setcancelstate(int state, int *oldstate)
: 设置线程的取消状态。pthread_setcanceltype(int type, int *oldtype)
: 设置线程的取消类型。pthread_testcancel()
: 创建一个取消点,线程在执行到这里时会检查是否有取消请求。
通过线程池去管理线程:重点在于用条件变量,以及线性表(数组)保存线程的ID(句柄)
通过链表去管理任务: 尾插,头删的方式
阅读代码的时候需要结合上下文
布尔类型一般常用来判断是和否(即二进制可作为标志)
函数指针:用来指向函数的指针 void *(*do_task)(void *arg);
释放堆内存时,包括结构体里的指针的堆内存分配
整体的架构应该是:
堵塞队列:线程1 , 线程2 ,线程3 .......
任务链表: 任务1 , 任务2 ,任务3 .........
pthread_线程1{
上锁
while{
条件量
}
if{判断标志位
销毁线程池
}
pool->taks_list->next = p->next(头删)
(p->task_任务1)(p->arg)
解锁
free(p)
}