线程池(Linux +C)

参考  手写线程池 - C语言版 | 爱编程的大丙 (subingwen.cn)

目录

1.为什么需要线程池?

1)线程问题:

2)如何解决线程问题(线程池的优势):

2.线程池是什么?

1)线程的基本组成:

(1)任务队列(负责保存要处理的任务,并将任务交给工作线程去处理)

(2)工作者线程(任务队列的消费者、执行人员,动态创建N个)

(3)管理者线程(管理整个线程池,1个)

3.线程池怎么实现?(linux/C语言版本)

1)单个任务元素结构体

2)线程池结构体(任务队列+管理线程+工作线程)

3)API声明

4)创建线程池

(1)主要操作:

(2)说明:

5)工作线程执行函数worker

(1)主要操作:

(2)说明:

6)管理者任务

(1)主要操作:

(2)说明:

7)单线程退出函数

(1)主要操作:

(2)说明

8)线程池添加函数

(1)主要操作:

(2)说明:

9)获取线程池中工作的线程个数、存活的线程个数

10)线程销毁函数

(1)主要操作:

(2)说明:

4.线程池怎么用?


1.为什么需要线程池?

1)线程问题:

(1)如果只使用线程创建函数,在不断有新的任务进来的时候,需要不断的创建任务;任务在结束之后,为了避免占用资源,需要销毁线程任务。导致频繁操作,占用程序运行时间。

(2)在线程创建之后,如果任务已经运行完毕,线程不去销毁,则会白白浪费资源。

2)如何解决线程问题(线程池的优势):

(1)使用线程池之后,将任务和线程分离,建立一定数量的线程,使线程重复利用(不销毁),不断将任务添加到线程中。解决线程频繁创建、销毁问题。提供系统执行效率。

(2)根据任务数量增减,自动添加或者减少线程,使得线程维持在最优数量,节约系统资源。

2.线程池是什么?

1)线程的基本组成:

(1)任务队列(负责保存要处理的任务,并将任务交给工作线程去处理)

(a)通过线程池提供的api函数,将待处理的任务添加到任务队列,或者从任务队列删除;

(b)已经处理的任务会被从任务队列删除;

(c)线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程

(2)工作者线程(任务队列的消费者、执行人员,动态创建N个)

(a)线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理;

(b)工作的线程相当于是任务队列的消费者角色;

(c)如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞);

(d)如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作;

(3)管理者线程(管理整个线程池,1个)

(a)周期性地 对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测;

(b)根据任务数量的多少,增减线程数量;

3.线程池怎么实现?(linux/C语言版本)

1)单个任务元素结构体

// 任务结构体
typedef struct Task
{void (*function)(void* arg);void* arg;
}Task;

2)线程池结构体(任务队列+管理线程+工作线程)

// 线程池结构体struct ThreadPool
{Task* taskQ;        // 任务队列  后面利用堆新建数组int queueCapacity;  // 容量int queueSize;      // 当前任务个数int queueFront;     // 队头 -> 取数据int queueRear;      // 队尾 -> 放数据pthread_t managerID;    // 管理者线程IDpthread_t *threadIDs;   // 工作的线程ID   后面利用堆新建数组int minNum;             // 最小线程数量int maxNum;             // 最大线程数量int busyNum;            // 忙的线程的个数int liveNum;            // 存活的线程的个数int exitNum;            // 要销毁的线程个数pthread_mutex_t mutexPool;  // 锁整个的线程池pthread_mutex_t mutexBusy;  // 锁busyNum变量pthread_cond_t notFull;     // 任务队列是不是满了pthread_cond_t notEmpty;    // 任务队列是不是空了int shutdown;           // 是不是要销毁线程池, 销毁为1, 不销毁为0
};

3)API声明

typedef struct ThreadPool ThreadPool;// 创建线程池并初始化
ThreadPool *threadPoolCreate(int min, int max, int queueSize);// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);//
// 工作的线程(消费者线程)任务函数
void* worker(void* arg);
// 管理者线程任务函数
void* manager(void* arg);
// 单个线程退出
void threadExit(ThreadPool* pool);

4)创建线程池

(1)主要操作:

(a)创建线程池对象:ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));

(b)创建工作者线程队列(pthread_t):只是为了便于管理线程,并没有创建线程。

        pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);

(c)创建任务队列(task类型):pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);

(d)创建管理者线程:pthread_create(&pool->managerID, NULL, manager, pool);

(e)创建工作者线程:按照最小数量minNum进行for循环;
           pthread_create(&pool->threadIDs[i], NULL, worker, pool);

(f)初始化互斥量/条件变量:mutexPool、mutexBusy、notEmpty、notFull

(g)其余工作:初始化一些控制变量

(h)鲁棒性工作:检查指针,及时释放内存

(2)说明:

(a)为什么使用do while ?可以使用break,最后统一销毁资源;

(b)memset,后续根据指针是不是0 判断是不是有线程闲置

ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{//实例化线程池对象ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));do {//判断pool有没有指向有效内存if (pool == NULL){printf("malloc threadpool fail...\n");break;}//创建工作者线程队列,按照最多线程数量 创建pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);//判断threadIDs有没有指向有效内存if (pool->threadIDs == NULL){printf("malloc threadIDs fail...\n");break;}memset(pool->threadIDs, 0, sizeof(pthread_t) * max);//初始化相关参数pool->minNum = min;pool->maxNum = max;pool->busyNum = 0;pool->liveNum = min;    // 和最小个数相等pool->exitNum = 0;//创建互斥量、条件变量if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||pthread_cond_init(&pool->notEmpty, NULL) != 0 ||pthread_cond_init(&pool->notFull, NULL) != 0){printf("mutex or condition init fail...\n");break;}创建任务队列pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);pool->queueCapacity = queueSize;pool->queueSize = 0;pool->queueFront = 0;pool->queueRear = 0;pool->shutdown = 0;//创建管理者线程pthread_create(&pool->managerID, NULL, manager, pool);//创建工作者线程for (int i = 0; i < min; ++i){pthread_create(&pool->threadIDs[i], NULL, worker, pool);}return pool;} while (0);// 释放资源if (pool && pool->threadIDs) free(pool->threadIDs);if (pool && pool->taskQ) free(pool->taskQ);if (pool) free(pool);return NULL;
}

5)工作线程执行函数worker

(1)主要操作:

(a)判断任务队列是否为空。假如为空,阻塞线程。并进一步判断,是否要销毁线程池。

(b)如果任务队列不为空,取出任务,执行任务回调函数。

(2)说明:

(a)线程池属于公共资源,在访问线程池的时候要加锁;访问结束之后解锁。

(b)while (pool->queueSize == 0 && !pool->shutdown)  。在pthread_cond_wait这个地方,为什么采用while而不是if。关于该点,进行解释。

        首先,我们需要了解pthread_cond_wait的执行流程:

        -》pthread_cond_wait()函数会将当前线程挂起,使它处于休眠状态。
        -》在当前线程被挂起之前,函数会自动调用pthread_mutex_lock()函数将关联的互斥锁上锁,保证条件变量的独占访问。
        -》函数会将当前线程加入条件变量的等待队列中,并释放关联的互斥锁。
        -》当前线程进入休眠状态,等待其他线程在该条件变量上发出信号或广播。
        -》当另一个线程在该条件变量上发出信号或广播时,pthread_cond_wait()函数会唤醒一个等待在该条件变量上的线程。
        -》唤醒的线程重新获得关联的互斥锁的所有权,并继续执行。
        -》pthread_cond_wait()函数返回。

        其次,我们需要了解pthread_cond_signal的作用:至少唤醒一个线程。

        综合上面两点,设想以下场景。

初始临界变量x = 0;线程1:如果x< 1,阻塞。否则,继续执行,并设置x = 0;线程2:如果x< 1,阻塞。否则,继续执行,并设置x = 0;线程3:设置x = 1,调用pthread_cond_signal唤醒线程。

        情况1,采用if语句:线程1收到了线程唤醒,线程2收到了线程唤醒,线程1抢到了互斥锁,他正常进行,并把x设置为0;然后线程2因为收到了线程唤醒,他也开始执行。问题出现了,线程1不应该执行因为他的判断条件是 x<1的情况下,应该进行阻塞。可是他却开始执行了。

        情况2,采用while语句:线程1收到了线程唤醒,线程2收到了线程唤醒,线程1抢到了互斥锁,因为之前被阻塞了,while语句不满足,他又检查了一次while条件,发现不用阻塞了,他正常进行,并把x设置为0。然后线程2因为收到了线程唤醒,他也开始执行。因为之前也被阻塞了,while语句不满足,他有检查一次条件,发现x被修改了,x=0,需要被阻塞了。所以他没法往下执行了。所以接着被阻塞了。这种情况,就可以保证,条件变量只能唤醒一个线程。防止“伪唤醒”。

        回到本例子中,情况一样。线程池会唤醒多个工作线程,让他们去执行任务。但是有可能任务数<线程数。如果使用while循环,就能保证每个任务对应一个线程。如果使用if判断,有可能唤醒多余的线程,并引发程序崩溃。

void* worker(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (1){pthread_mutex_lock(&pool->mutexPool);// 当前任务队列是否为空while (pool->queueSize == 0 && !pool->shutdown){// 阻塞工作线程pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);// 判断是不是要销毁线程if (pool->exitNum > 0){pool->exitNum--;if (pool->liveNum > pool->minNum){pool->liveNum--;pthread_mutex_unlock(&pool->mutexPool);//线程退出函数。threadExit(pool);}}}// 判断线程池是否被关闭了if (pool->shutdown){pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}// 从任务队列中取出一个任务Task task;task.function = pool->taskQ[pool->queueFront].function;task.arg = pool->taskQ[pool->queueFront].arg;// 移动头结点 数组变成了循环队列pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;pool->queueSize--;//告诉生产者,可以添加任务了pthread_cond_signal(&pool->notFull);//解锁pthread_mutex_unlock(&pool->mutexPool);//工作线程数量+1printf("thread %ld start working...\n", pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum++;pthread_mutex_unlock(&pool->mutexBusy);//函数任务调用task.function(task.arg);//释放函数堆内存free(task.arg);task.arg = NULL;//工作线程数量-1printf("thread %ld end working...\n", pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum--;pthread_mutex_unlock(&pool->mutexBusy);}return NULL;
}

6)管理者任务

(1)主要操作:

(a)添加任务线程;

(b)销毁任务线程;

(2)说明:

销毁任务线程的思路:发出条件变量信号,唤醒所有的工作线程;并将shutdown参数,告诉工作线程,现在要关闭。让所有的线程自杀。

void* manager(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (!pool->shutdown){// 每隔3s检测一次sleep(3);// 取出线程池中任务的数量和当前线程的数量pthread_mutex_lock(&pool->mutexPool);int queueSize = pool->queueSize;int liveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);// 取出忙的线程的数量pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);// 添加线程// 任务的个数>存活的线程个数 && 存活的线程数<最大线程数if (queueSize > liveNum && liveNum < pool->maxNum){pthread_mutex_lock(&pool->mutexPool);int counter = 0;for (int i = 0; i < pool->maxNum && counter < NUMBER&& pool->liveNum < pool->maxNum; ++i){if (pool->threadIDs[i] == 0){pthread_create(&pool->threadIDs[i], NULL, worker, pool);counter++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexPool);}// 销毁线程// 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数if (busyNum * 2 < liveNum && liveNum > pool->minNum){pthread_mutex_lock(&pool->mutexPool);pool->exitNum = NUMBER;pthread_mutex_unlock(&pool->mutexPool);// 让工作的线程自杀for (int i = 0; i < NUMBER; ++i){pthread_cond_signal(&pool->notEmpty);}}}return NULL;
}

7)单线程退出函数

(1)主要操作:

前面判断线程是不是在工作,主要是判断线程ID是不是空。如果线程ID是空,表明为空线程;如果线程不为空,则为忙线程。因此,在退出删除线程的时候,需要修改线程队列中对应的线程ID,将其改为ID=0;便于后面的继续使用。

(2)说明

void threadExit(ThreadPool* pool)
{pthread_t tid = pthread_self();for (int i = 0; i < pool->maxNum; ++i){if (pool->threadIDs[i] == tid){pool->threadIDs[i] = 0;printf("threadExit() called, %ld exiting...\n", tid);break;}}pthread_exit(NULL);
}

8)线程池添加函数

(1)主要操作:

(a)如果线程池中的工作线程全部被占用,阻塞添加任务,即阻塞生产者(该线程就是调用线程池的线程,一般是主线程);
(b)如果工作线程释放了notFull条件变量,则说明工作线程有空余,唤醒线程添加函数。

(2)说明:

void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{pthread_mutex_lock(&pool->mutexPool);while (pool->queueSize == pool->queueCapacity && !pool->shutdown){// 阻塞生产者线程pthread_cond_wait(&pool->notFull, &pool->mutexPool);}if (pool->shutdown){pthread_mutex_unlock(&pool->mutexPool);return;}// 添加任务pool->taskQ[pool->queueRear].function = func;pool->taskQ[pool->queueRear].arg = arg;pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;pool->queueSize++;pthread_cond_signal(&pool->notEmpty);pthread_mutex_unlock(&pool->mutexPool);
}

9)获取线程池中工作的线程个数、存活的线程个数

int threadPoolBusyNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);return busyNum;
}int threadPoolAliveNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexPool);int aliveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);return aliveNum;
}

10)线程销毁函数

(1)主要操作:

        赋值操作pool->shutdown = 1;此时管理者线程就会检测到,退出while循环;

        阻塞回收管理者线程。pthread_join(pool->managerID, NULL);

        唤醒所有的消费者线程。让消费者线程自杀。

        释放线程池中的堆内存。

        销毁互斥锁和条件变量。

(2)说明:

        (a)关于pthread_join:当A线程调用线程B并 pthread_join() 时,A线程会处于阻塞状态,直到B线程结束后,A线程才会继续执行下去。当 pthread_join() 函数返回后,被调用线程才算真正意义上的结束,它的内存空间也会被释放(如果被调用线程是非分离的)。

        (b)管理者线程采用pthread_join;消费者线程采用 pthread_exit。之所以消费者线程自动退出的目的是,不让管理者线程承担过多的工作,管理者线程应该只是负责统一的管理,

int threadPoolDestroy(ThreadPool* pool)
{if (pool == NULL){return -1;}// 关闭线程池pool->shutdown = 1;// 阻塞回收管理者线程pthread_join(pool->managerID, NULL);// 唤醒阻塞的消费者线程for (int i = 0; i < pool->liveNum; ++i){pthread_cond_signal(&pool->notEmpty);}// 释放堆内存if (pool->taskQ){free(pool->taskQ);}if (pool->threadIDs){free(pool->threadIDs);}pthread_mutex_destroy(&pool->mutexPool);pthread_mutex_destroy(&pool->mutexBusy);pthread_cond_destroy(&pool->notEmpty);pthread_cond_destroy(&pool->notFull);free(pool);pool = NULL;return 0;
}

4.线程池怎么用?

#include "threadpool.h"
#include "stdio.h"
#include "pthread.h"
#include "unistd.h"
#include "malloc.h"void taskFunc(void* arg)
{int num = *(int*)arg;printf("thread %ld is working, number = %d\n",pthread_self(), num);sleep(1);
}int main()
{// 创建线程池ThreadPool* pool = threadPoolCreate(3, 10, 100);for (int i = 0; i < 100; ++i){int* num = (int*)malloc(sizeof(int));*num = i + 100;threadPoolAdd(pool, taskFunc, num);}sleep(30);threadPoolDestroy(pool);return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/255365.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【运维面试100问】(八)如何手动释放内存

本站以分享各种运维经验和运维所需要的技能为主 《python零基础入门》&#xff1a;python零基础入门学习 《python运维脚本》&#xff1a; python运维脚本实践 《shell》&#xff1a;shell学习 《terraform》持续更新中&#xff1a;terraform_Aws学习零基础入门到最佳实战 《k8…

看了无数文章,今天终于理解这些性能指标了!

如果要问性能测试里那种技术最难&#xff0c;相信很多人都会说出“性能分析”这四个字。确实是&#xff0c;性能测试的执行是比较简单的&#xff0c;难的是执行完成后&#xff0c;如何分析这些数据。如何从一大堆数据里分析哪些数据是优秀的&#xff0c;哪些数据是有问题的。这…

浅谈https

1.网络传输的安全性 http 协议&#xff1a;不安全&#xff0c;未加密https 协议&#xff1a;安全&#xff0c;对请求报文和响应报文做加密 2.对称加密与非对称加密 2.1 对称加密 特点&#xff1a; 加解密使用 相同 秘钥 高效&#xff0c;适用于大量数据的加密场景 算法公开&a…

拼多多选品大作战:利用类目榜单找到潜力爆品

想要在激烈的电商竞争中脱颖而出&#xff0c;选品是至关重要的一环。 而拼多多提供的类目榜单数据&#xff0c;为商家们提供了一个寻找热门产品和趋势的利器。本文将详细介绍如何利用拼多多类目榜单进行选品&#xff0c;并帮助您找到畅销产品。 拼多多新手选品核心两要素&…

如何让软文更具画面感,媒介盒子分享

写软文这种带有销售性质的文案时&#xff0c;总说要有画面感&#xff0c;要有想象空间。只有针对目标用户的感受的设计&#xff0c;要了解用户想的是什么&#xff0c;要用可视化的描述来影响用户的感受&#xff0c;今天媒介盒子就和大家分享&#xff1a;如何让软文更具画面感。…

【Python】Python读Excel文件生成xml文件

目录 ​前言 正文 1.Python基础学习 2.Python读取Excel表格 2.1安装xlrd模块 2.2使用介绍 2.2.1常用单元格中的数据类型 2.2.2 导入模块 2.2.3打开Excel文件读取数据 2.2.4常用函数 2.2.5代码测试 2.2.6 Python操作Excel官方网址 3.Python创建xml文件 3.1 xml语法…

HarmonyOS4.0系列——03、声明式UI、链式编程、事件方法、以及自定义组件简单案例

HarmonyOS4.0系列——03、声明式UI、链式编程、事件方法、以及自定义组件简单案例 声明式 UI ArkTS以声明方式组合和扩展组件来描述应用程序的UI&#xff0c;同时还提供了基本的属性、事件和子组件配置方法&#xff0c;帮助开发者实现应用交互逻辑。 如果组件的接口定义没有包…

科技云报道:AI+PaaS,中国云计算市场迎来新“变量”?

科技云报道原创。 没有小的市场&#xff0c;只有还没有被发现的大生意。 随着企业数字化转型的逐级深入&#xff0c;市场需求进一步向PaaS和SaaS层进发&#xff0c;使之成为公有云服务市场增长的主要动力。 根据IDC最新发布的报告显示&#xff0c;2022-2027五年间中国公有云…

HarmonyOS 开发 Java 与 ArkTS 如何抉择?

本文详细分析 Java 与 ArkTS 在 HarmonyOS 开发过程的区别&#xff0c;力求解答学员的一些困惑。 何为 HarmonyOS&#xff1f; 在讨论语言的差异时&#xff0c;我们先了解下什么是 HarmonyOS。华为官方是这么解释 HarmonyOS 的&#xff1a; “鸿蒙操作系统”特指华为智能终端…

【Maven】未找到有效的 Maven 安装。在配置对话框中设置主目录,或者在系统上设置 M2_HOME 环境变量。

错误显示 今天导入工程&#xff0c;进行clean的时候报错&#xff1a; 解决方法 重新设置一下maven的目录即可

Leo赠书活动-12期 【Java程序员,你掌握了多线程吗?】文末送书

Leo赠书活动-12期 【Java程序员&#xff0c;你掌握了多线程吗&#xff1f;】文末送书 ✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1…

FreeRTOS系统延时函数分析

一、概述 FreeRTOS提供了两个系统延时函数&#xff0c;相对延时函数vTaskDelay()和绝对延时函数vTaskDelayUntil()。相对延时是指每次延时都是从任务执行函数vTaskDelay()开始&#xff0c;延时指定的时间结束&#xff0c;绝对延时是指每隔指定的时间&#xff0c;执行一次调用vT…