Linux 下 C语言版本的线程池

目录

1. 线程池引入

2. 线程池介绍

3. 线程池的组成

4. 任务队列

5. 线程池定义

6. 头文件声明

7. 函数实现

8. 测试代码


1. 线程池引入

        我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

        那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?

2. 线程池介绍

        线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件), 则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

3. 线程池的组成

        线程池的组成主要分为 3 个部分,这三部分配合工作就可以得到一个完整的线程池:

任务队列:存储需要处理的任务,由工作的线程来处理这些任务

        通过线程池提供的 API 函数,将一个待处理的任务添加到任务队列或者从任务队列中删除已处理的任务会被从任务队列中删除线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程

工作的线程(任务队列任务的消费者) ,N个:

        线程池中维护了一定数量的工作线程他们的作用是是不停的读任务队列从里边取出任务并处理工作的线程相当于是任务队列的消费者角色, 如果任务队列为,工作的线程将会被阻塞 (使用条件变量 / 信号量阻塞)如果阻塞之后有了新的任务,由生产者将阻塞解除,工作线程开始工作

管理者线程(不处理任务队列中的任务),1个

        它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测当任务过多的时候,可以适当的创建一些新的工作线程当任务过少的时候,可以适当的销毁一些工作的线程

4. 任务队列

// 任务结构体
typedef struct Task
{void(*function)(void* arg);//函数指针void* arg;// 函数参数
}Task;

5. 线程池定义

// 线程池结构体
struct ThreadPool
{//任务队列Task* taskQ; // 任务队列,数组,所以定义指针int queueCapacity;	//容量int queueSize;		//当前任务个数int queueFront;		//队头 -> 取数据int queueRear;		//队尾 -> 放数据pthread_t managerID;	// 管理者线程IDpthread_t *threadIDs;	// 工作的线程IDint minNum;				// 最少线程数int maxNum;				// 最大线程数int busyNum;			// 在忙中的线程数int liveNum;			// 存活线程数int exitNum;			// 需要杀死的线程数pthread_mutex_t mutexPool;	// 锁整个线程池pthread_mutex_t muteBusy;	// 锁busyNum变量pthread_cond_t	notFull;	// 任务队列是不是满了pthread_cond_t	notEmpty;	// 任务队列是不是空了int shutDown;		// 是不是需要销毁线程池,销毁为1,不销毁为0
};

6. 头文件声明

#ifndef __THREADPOOL_H
#define	__THREADPOOL_Htypedef struct ThreadPool ThreadPool; // 声明一下,说明该结构体在其他方定义了// 创建线程池并初始化
ThreadPool* threadPoolCreate(int min, int max, int queueSize);// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*),void* arg);// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);// 获取线程池中活着的线程个数
int threadPoolAliveNum(ThreadPool* pool);/
void* worker(void* arg);
void* manager(void* arg);
void threadExit(ThreadPool* pool);#endif // !__THREADPOOL_H

7. 函数实现

ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));//printf("threadPoolCreate start creating  \n");do{if (pool == NULL){printf("malloc  threadpool fail ...\n");break;}pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t)*max); // 根据最大的线程池设置进行创建空间if (pool->threadIDs == NULL){break;}memset(pool->threadIDs, 0, sizeof(pthread_t)*max);pool->minNum = min;pool->maxNum = max;pool->busyNum = 0;pool->liveNum = min;pool->exitNum = 0;if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||pthread_mutex_init(&pool->muteBusy, NULL) != 0 ||pthread_cond_init(&pool->notEmpty, NULL) != 0 ||pthread_cond_init(&pool->notFull, NULL) != 0){printf("mutex or cond init error\n");break;}// 任务队列pool->taskQ = (Task*)malloc(sizeof(Task)*queueSize);pool->queueCapacity = queueSize;pool->queueSize = 0;pool->queueFront = 0;pool->queueRear = 0;pool->shutDown = 0;// 创建线程pthread_create(&pool->managerID, NULL, manager, pool);// 管理线程for (int i = 0; i < min; ++i){//printf("creating is %d\n", i);pthread_create(&pool->threadIDs[i], NULL, worker, pool); // 工作线程}printf("threadPoolCreate create sucess  \n");return pool;} while (0);//是否资源if (pool&&pool->threadIDs) free(pool->threadIDs);if (pool&&pool->taskQ) free(pool->taskQ);if (pool) free(pool);return NULL;
}void* worker(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;//参数进行类型转换while (1){printf("worker thread %d\n",pthread_self());pthread_mutex_lock(&pool->mutexPool);// 判断当前任务队列是否为空,且线程没有关闭while (pool->queueSize == 0 && !pool->shutDown){printf("worker %d waiting \n", pthread_self());// 需要阻塞线程,锁和条件变量进行绑定,后面通过条件变量对其进行唤醒pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);//判断是不是需要销毁线程if (pool->exitNum>0){pool->exitNum--;if (pool->liveNum > pool->minNum){pool->liveNum--;pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}}}// 如果有任务,则继续向下运行,开始消费任务了// 判断线程池是否关闭了if (pool->shutDown){pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}// 从任务队列取出任务Task task;task.function = pool->taskQ[pool->queueFront].function; // 取出任务task.arg = pool->taskQ[pool->queueFront].arg;// 任务队列应该设计成环形的队列,只需要确定头部和尾部即可,添加任务进行覆盖即可// 移动头结点pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;//实现环形获取pool->queueSize--;// 解锁pthread_cond_signal(&pool->notFull);pthread_mutex_unlock(&pool->mutexPool);// 当前子线程开始工作了,因此忙的状态需要标定printf("thread %ld start working...\n",pthread_self());pthread_mutex_lock(&pool->muteBusy);pool->busyNum++;pthread_mutex_unlock(&pool->muteBusy);// 开始工作task.function(task.arg);free(task.arg);task.arg = NULL;// 当前子线程完成工作,因此把忙的状态取消printf("thread %ld end working...\n", pthread_self());pthread_mutex_lock(&pool->muteBusy);pool->busyNum--;pthread_mutex_unlock(&pool->muteBusy);}return NULL;
}void* manager(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;//参数进行类型转换while (!pool->shutDown){// 每隔3s检测一次sleep(3);//取出线程池中任务的数量和当前线程的数量pthread_mutex_lock(&pool->mutexPool);int queueSize = pool->queueSize;int liveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);// 取出忙的线程的数量pthread_mutex_lock(&pool->muteBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->muteBusy);// 添加线程,需要制定一个添加的规则,可以根据实际情况进行设计即可// 任务的个数>存活的线程个数 && 存活的线程个数< 最大线程数if (queueSize > liveNum && liveNum < pool->maxNum){pthread_mutex_lock(&pool->mutexPool);int counter = 0;for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i){if (pool->threadIDs[i] == 0){pthread_create(&pool->threadIDs[i], NULL, worker, pool);counter++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexPool);}// 销毁线程// 忙的线程*2 < 存活的线程数 && 存活的线程数>最小线程数if (busyNum * 2 < liveNum && liveNum > pool->minNum){pthread_mutex_lock(&pool->mutexPool);pool->exitNum = NUMBER;pthread_mutex_unlock(&pool->mutexPool);// 让空闲的工作线程自杀for (int i = 0; i < NUMBER; i++){pthread_cond_signal(&pool->notEmpty);}}}return NULL;
}void threadExit(ThreadPool* pool)
{pthread_t tid = pthread_self();for (int i = 0; i < pool->maxNum; i++){if (pool->threadIDs[i] == tid){pool->threadIDs[i] = 0;printf("threadExit() called, %ld, exiting ...\n",tid);break;}}pthread_exit(NULL);
}void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{pthread_mutex_lock(&pool->mutexPool);while (pool->queueSize == pool->queueCapacity && !pool->shutDown){// 阻塞生产者线程printf("pthread_cond_wait(&pool->notFull, &pool->mutexPool \n");pthread_cond_wait(&pool->notFull, &pool->mutexPool);}// 判断线程池是否被关闭if (pool->shutDown){pthread_mutex_unlock(&pool->mutexPool);return;}// 添加任务pool->taskQ[pool->queueRear].function = func;pool->taskQ[pool->queueRear].arg = arg;pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;pool->queueSize++;pthread_cond_signal(&pool->notEmpty);pthread_mutex_unlock(&pool->mutexPool);//printf("threadPoolAdd sucess \n");
}int threadPoolBusyNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->muteBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->muteBusy);return busyNum;
}int threadPoolAliveNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexPool);int aliveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);return aliveNum;
}int threadPoolDestroy(ThreadPool* pool)
{if (pool == NULL){return -1;}//关闭线程池pool->shutDown = 1; // 阻塞回收管理者线程pthread_join(pool->managerID, NULL);// 唤醒阻塞的消费者线程for (int i = 0; i < pool->liveNum; i++){pthread_cond_signal(&pool->notEmpty);}// 释放堆内存if (pool->taskQ){free(pool->taskQ);}if (pool->threadIDs){free(pool->threadIDs);}pthread_mutex_destroy(&pool->muteBusy);pthread_mutex_destroy(&pool->mutexPool);pthread_cond_destroy(&pool->notEmpty);pthread_cond_destroy(&pool->notFull);free(pool);pool = NULL;}

8. 测试代码

#include <stdio.h>
#include"ThreadPool.h"
#include<pthread.h>
#include<unistd.h>
#include<stdlib.h>void taskFunc(void* arg)
{int num = *(int*)arg;printf("thread %ld is working, number = %d\n",pthread_self(), num);sleep(1);
}int main()
{printf("hello from ThreadPool!\n");//创建线程池ThreadPool* pool = threadPoolCreate(3, 10, 100);for (int i = 0; i < 100; i++){int* num = (int*)malloc(sizeof(int));*num = i + 100;//printf("*num = %d\n", *num);threadPoolAdd(pool, taskFunc, num);}sleep(30);threadPoolDestroy(pool);return 0;
}

这篇文章来源linux下c语言版线程池_linux c 线程池_zsffuture的博客-CSDN博客,我是根据视频讲解写的代码,测试通过,感觉代码结构设计很好,怕后面找不到,因此搬运过来参考

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/107106.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

循环购模式:一种新型的电商模式,让你的生意更轻松

你是否想过&#xff0c;你的商品能够以一种更高效、更环保、更有趣的方式销售给顾客&#xff1f;你是否想过&#xff0c;你的顾客能够以一种更低成本、更高回报、更多互动的方式享受你的商品&#xff1f;如果你的答案是肯定的&#xff0c;那么你一定要了解一下循环购模式。 循环…

IDEA的快捷键大全

快捷键 说明 IntelliJ IDEA 的便捷操作性&#xff0c;快捷键的功劳占了一大半&#xff0c;对于各个快捷键组合请认真对待。IntelliJ IDEA 本身的设计思维是提倡键盘优先于鼠标的&#xff0c;所以各种快捷键组合层出不穷&#xff0c;对于快捷键设置也有各种支持&#xff0c;对…

floodfill算法(洪水灌溉算法)

一)floodfill算法简介: 二)图像渲染 733. 图像渲染 - 力扣&#xff08;LeetCode&#xff09; class Solution {int[] dx {1, 0, 0, -1};int[] dy {0, 1, -1, 0};//上下搜索的时候要使用向量数组int row0;int col0;int target0;public void dfs(int[][] image,int i,int j,int…

STC15/8单片机特有的PWM寄存器和普通定时器实现PWM输出

STC15/8单片机特有的PWM寄存器和普通定时器实现PWM输出 🌿主要针对STC15W4型号特有的6通道15位专门的高精度PWM。 🌿STC8系列 ✨STC15W4K32S4系列单片机具有6通道15位专门的高精度PWM(带死区控制)和2通道CCP(利用它的高速脉冲输出功能可实现11~16位PWM);(STC15F/L2K60S2系…

Mysql同步数据到Doris的踩坑过程

问题背景 由于项目需要&#xff0c;需要把多个Mysql数据库的数据同步到Doris数据库&#xff0c;然后利用Doris强调的计算和查询能力&#xff0c;来满足业务需求。有关Doris可以查看它的官方文档来了解它。 seatunnel的使用到放弃 缘起 从《第十届GIAC全球互联网架构大会》了…

口袋参谋:淘宝不限类目,透视竞品实时销量!快试试这个插件

​在运营一家店铺之前&#xff0c;可以先了解各类目宝贝的市场行情&#xff0c;及时掌握不同类目宝贝的价格、销售情况&#xff0c;根据需求制定出属于自己的营销策略。 【可跨类目竞店透视】功能&#xff1a; 支持一键获取任意店铺宝贝概况信息 【跨类目竞店透视】功能使用 …

C/C++之链表的建立

个人主页&#xff1a;点我进入主页 专栏分类&#xff1a;C语言初阶 C语言程序设计————KTV C语言小游戏 C语言进阶 C语言刷题 欢迎大家点赞&#xff0c;评论&#xff0c;收藏。 一起努力&#xff0c;一起奔赴大厂。 目录 1.头插 1.1简介 1.2代码实现头插 …

PyCharm克隆github上开源的项目

PyCharm怎么clone github上开源的项目 一、先要确保PyCharm正确的配置了Git 如果你已经在PyCharm中配置好了Git&#xff0c;可以跳过此步骤&#xff0c;直接看下一步。 那么怎么在PyCharm中配置Git呢&#xff1f; 百度搜索Git安装包&#xff0c;安装过程不再多说&#xff0…

数据结构与算法基础-学习-34-基数排序(桶排序)

目录 一、基本思想 二、算法思路 1、个位排序 &#xff08;1&#xff09;分配 &#xff08;2&#xff09;收集 2、十分位排序 &#xff08;1&#xff09;分配 &#xff08;2&#xff09;收集 三、源码分享 1、InitMyBucket 2、DestroyMyBucket 3、ClearMyBucket 4、…

JMeter接口测试

0.前言, 我决定我的代码必须要提升性能了,而不是写不写得出来的问题了, however,怎么知道我的代码是不是很烂和健壮性,需要专业的测试工具来测试出来,如高并发QPS是什么?,高频率是什么? 怎样自动化测试解放双手 1.jmeter是apache使用java的测试工具,模拟巨大的负载,官网地址…

C++ 特性模版

目录 1. 非类型模板参数 2. 模板的特化 2.1 概念 2.2 函数模板特化 2.3 类模板特化 2.3.1 全特化 2.3.2 偏特化 2.3.3 类模板特化应用示例 3 模板分离编译 3.1 什么是分离编译 3.2 模板的分离编译 3.3 解决方法 4. 模板总结 1. 非类型模板参数 模板参数分类类型形…

数据库数据恢复-Oracle数据库truncate的数据恢复案例

Oracle数据库故障&分析&#xff1a; 北京某单位Oracle 11g R2数据库误执行truncate table CM_CHECK_ITEM_HIS&#xff0c;表数据丢失&#xff0c;查询该表时报错。数据库备份无法使用&#xff0c;表数据无法查询。 Oracle数据库Truncate数据的机理&#xff1a;执行Trunca…