Linux下使用C语言实现线程池---代码及分析

线程池

相关文章
协议
Socket编程
高并发服务器实现
线程池

如果一个客户端建立连接使用创建一个线程用于处理这一个线程, 处理结束的时候把这一个线程删除, 这个时候会导致线程的创建以及销毁会消耗大量的时间

这时候可以一次性创建多个线程, 这几个线程统称线程池, 如果客户端建立一个连接, 线程池分配一个线程处理客户发过来的数据, 不处理的时候这几个线程阻塞

可以使用条件变量进行阻塞

线程的数量可以随着连接的个数, 时间等条件进行变换, 但是要有一个上限, 连接分个数增加的时候加线程, 如果忙的线程数量比较少的时候释放线程, 这一些处理使用一个adjust线程进行处理

img

实际实现

主函数

  • 创建一个线程池
  • 在里面添加任务
  • 等待
  • 关闭线程池

线程池

  • 初始化各种数据

  • 获取基础线程, 让他们阻塞等待任务

  • 获取任务, 唤醒线程处理

  • 处理结束接着阻塞

    后台的控制线程(adjust_thread)根据线程的实际使用情况添加或者释放线程

/*************************************************************************> File Name: thread_pool.c> Author: XvSenfeng> Mail: 1458612070@qq.com > Created Time: Wed 10 Apr 2024 08:23:27 PM CST************************************************************************/#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>

使用的头文件


//多长时间判断一下是否需要增加获取减少线程个数
//这一个值小一点看现象
#define DEFAULT_TIME	3
//最小的执行的线程的个数
#define MIN_WAIT_TASK_NUM	10
//增加减少的时候步长
#define DEFAULT_THREAD_VARY	10
#define true 1
#define false 0

使用的宏定义

//记录一个回调函数, 由于线程执行
typedef struct {void *(*function)(void *);void *arg;
} threadpool_task_t;//线程池的控制结构体
typedef struct{pthread_mutex_t lock;				//这一个结构体使用pthread_mutex_t thread_counter;		//记录忙状态的线程的个数pthread_cond_t queue_not_full;		//队列,没有满的时候添加使用pthread_cond_t queue_not_empty;pthread_t *threads;					//线程数组, 记录现在使用的线程的pidpthread_t adjust_tid;				//管理者的idthreadpool_task_t *task_queue;		//线程执行任务的时候使用的环形队列int min_thr_num;			//最小的时候保留的线程个数int max_thr_num;			//最大可以支持的个数int live_thr_num;			//存活的线程的个数int busy_thr_num;			//执行任务的线程的个数int wait_exit_thr_num;		//需要释放的线程的个数//队列使用int queue_front;int queue_rear;int queue_size;int queue_max_size;int shutdown;			//记录这一个线程池有没有使用}threadpool_t; 

线程池的控制结构体, 以及任务信息记录的结构体

void *adjust_thread(void *threadpool);
void threadpool_free(threadpool_t *pool);
//普通的线程处理函数
void *threadpool_thread(void *threadpool){threadpool_t *pool = (threadpool_t *)threadpool;threadpool_task_t task;while(true){pthread_mutex_lock(&(pool->lock));//判断一下现在没有可以执行的任务, 并且这一个线程组还没有销毁while((pool->queue_size == 0) && (!pool->shutdown)){printf("thread %#x is waiting\n", (unsigned int)pthread_self());//等待任务, 获取清除唤醒pthread_cond_wait(&pool->queue_not_empty, &pool->lock);//需要清除一部分线程if(pool->wait_exit_thr_num > 0){pool->wait_exit_thr_num--;//记录需要清除的任务的个数if(pool->live_thr_num > pool->min_thr_num){printf("thread %#x is exiting\n", (unsigned int)pthread_self());pool->live_thr_num--;pthread_mutex_unlock(&pool->lock);pthread_exit(NULL);}}}//这时候是有待处理的任务, 或者这一个线程池被销毁了if(pool->shutdown){//销毁线程池pthread_mutex_unlock(&pool->lock);printf("thread %#x will exit\n", (unsigned int)pthread_self());pthread_detach(pthread_self());pthread_exit(NULL);}//处理一个待处理的任务//获取一下这一个任务的处理函数task.function = pool->task_queue[pool->queue_front].function;task.arg = pool->task_queue[pool->queue_front].arg;//调整一下环形缓冲区的指针pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;pool->queue_size--;//发送一个任务队列现在有位置的信号, 这一个的阻塞在添加一个待处理任务的位置pthread_cond_broadcast(&pool->queue_not_full);pthread_mutex_unlock(&pool->lock);printf("thread %#x start working\n", (unsigned int)pthread_self());pthread_mutex_lock(&pool->thread_counter);pool->busy_thr_num++;pthread_mutex_unlock(&pool->thread_counter);(*(task.function))(task.arg);//调用则一个任务的处理函数printf("thread %#x end working\n", (unsigned int)pthread_self());pthread_mutex_lock(&pool->thread_counter);pool->busy_thr_num--;pthread_mutex_unlock(&pool->thread_counter);}pthread_exit(NULL);}
//创建一个线程池, 返回这一个线程池的操控结构体
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size){int i;threadpool_t *pool = NULL;do{printf("malloc thread pool\n");if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL){printf("malloc threadpool fail");break;}//记录一下初始信息pool->min_thr_num = min_thr_num;pool->max_thr_num = max_thr_num;pool->busy_thr_num = 0;pool->live_thr_num = min_thr_num;pool->wait_exit_thr_num = 0;pool->queue_size = 0;pool->queue_max_size = queue_max_size;pool->queue_front = 0;pool->queue_rear = 0;pool->shutdown = false;//获取线程数组pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);if(pool->threads == NULL){printf("malloc threads fail");break;}memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);//获取任务队列printf("malloc task queue\n");pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);printf("success\n");if(pool->task_queue == NULL){printf("malloc queue fail");break;}memset(pool->task_queue, 0, sizeof(threadpool_task_t)*queue_max_size);//初始化一下各种锁if(pthread_mutex_init(&(pool->thread_counter), NULL)!= 0	||pthread_mutex_init(&(pool->lock), NULL)	!= 0			||pthread_cond_init(&(pool->queue_not_empty), NULL) != 0	||pthread_cond_init(&(pool->queue_not_full), NULL) != 0){printf("init lock fail");break;}//开启一定数量的线程for(i = 0;i<min_thr_num ;i++){pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);printf("start thread %#x...\n", (unsigned int)pool->threads[i]);}//开启一个控制线程pthread_create(&pool->adjust_tid, NULL, adjust_thread, (void *)pool);return pool;}while(0);threadpool_free(pool);return NULL;
}
//释放这一个线程池的空间
void threadpool_free(threadpool_t *pool){if(pool == NULL){return;}if(pool->task_queue){free(pool->task_queue);}if(pool->threads){free(pool->threads);pthread_mutex_lock(&(pool->lock));pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));free(pool);}pool = NULL;
}
//销毁这一个线程池
void threadpool_destory(threadpool_t *pool){if(pool == NULL){return;}pool->shutdown = true;//等待服务线程的结束pthread_join(pool->adjust_tid, NULL);int i;for(i = 0; i < pool->live_thr_num; i++){pthread_cond_broadcast(&(pool->queue_not_empty));}for(i = 0; i < pool->live_thr_num; i++){pthread_join(pool->threads[i], NULL);}threadpool_free(pool);
}
//判断一个线程是不是还活着
int is_thread_alive(pthread_t tid){int kill_rc = pthread_kill(tid, 0);if(kill_rc == ESRCH){return false;}return true;
}
//控制线程
void *adjust_thread(void *threadpool){int i;threadpool_t *pool = (threadpool_t *)threadpool;//只要这一个线程池还在, 这一个线程就在while(!pool->shutdown){sleep(DEFAULT_TIME);pthread_mutex_lock(&(pool->lock));int queue_size = pool->queue_size;int live_thr_num = pool->live_thr_num;pthread_mutex_unlock(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));int busy_thr_num = pool->busy_thr_num;		//获取忙线程的个数pthread_mutex_unlock(&(pool->thread_counter));//创建线程, 这时候等待队列中的任务数大于等待阈值,且存活的线程数小于最大线程数if(queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num){//需要创建的线程数pthread_mutex_lock(&(pool->lock));int add = 0;//获取可以使用的数组位置for(i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY && pool->live_thr_num < pool->max_thr_num; i++){if(pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])){pthread_create(&pool->threads[i], NULL, threadpool_thread, (void *)pool);add++;pool->live_thr_num++;printf("new thread %#x is created\n", (unsigned int)pool->threads[i]);}}pthread_mutex_unlock(&(pool->lock));}//销毁线程,如果忙线程*2 < 存活线程数且存活线程数大于最小线程数if((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num){pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;//需要销毁的线程数pthread_mutex_unlock(&(pool->lock));//唤醒空闲线程,让其自杀for(i = 0; i < DEFAULT_THREAD_VARY; i++){pthread_cond_signal(&(pool->queue_not_empty));}}}
}
//给线程池里面加一个待处理的任务
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg){pthread_mutex_lock(&(pool->lock));//如果队列满了,调用wait阻塞while((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)){pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));}//线程池已经关闭if(pool->shutdown){pthread_cond_broadcast(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;}//清空工作线程的回调函数的参数if(pool->task_queue[pool->queue_rear].arg != NULL){pool->task_queue[pool->queue_rear].arg = NULL;}//添加任务到任务队列中pool->task_queue[pool->queue_rear].function = function;pool->task_queue[pool->queue_rear].arg = arg;pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;pool->queue_size++;//添加任务后,队列不为空,唤醒线程池中的线程pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return 0;
}
//模拟处理业务
void * process(void *arg){printf("thread %x working on task %d\n", (int)pthread_self(), *(int *)arg);sleep(1);printf("task %d is end\n", *(int *)arg);return NULL;
}

线程池的代码实现, 建议跟随main函数的流程分析

int main(void){threadpool_t *thp = threadpool_create(3, 100, 100);printf("pool inited\n");int num[20];//模拟添加任务for(int i = 0; i < 20 ; i++){num[i] = i;printf("add task %d\n", i);threadpool_add(thp, process, (void *)&num[i]); //添加任务}sleep(10);//等待处理结束threadpool_destory(thp);
}

测试这一个线程池

执行分析

2024-4-7-MPIO ./a.out  
start thread 0x2b192700...			#开启线程池的基础线程
thread 0x2b192700 is waiting
start thread 0x2a991700...
thread 0x2a991700 is waiting
start thread 0x2a190700...
thread 0x2a190700 is waiting
pool inited							#初始化结束
add task 0							#加入任务
add task 1
add task 2
thread 0x2b192700 start working		  #任务开始处理
thread 723068672 working on task 0
thread 0x2a991700 start working
thread 714675968 working on task 1
thread 0x2a190700 start working
add task 3
add task 4
add task 5
add task 6
add task 7
add task 8
add task 9
add task 10
add task 11
add task 12
add task 13
add task 14
add task 15
add task 16
add task 17
add task 18
add task 19
thread 706283264 working on task 2		#任务处理结束
task 0 is end
thread 0x2b192700 end working
thread 0x2b192700 start working
thread 723068672 working on task 3
task 1 is end
thread 0x2a991700 end working
thread 0x2a991700 start working
thread 714675968 working on task 4
task 2 is end
thread 0x2a190700 end working
thread 0x2a190700 start working
thread 706283264 working on task 5
task 3 is end
thread 0x2b192700 end working
thread 0x2b192700 start working
thread 723068672 working on task 6
task 4 is end
thread 0x2a991700 end working
thread 0x2a991700 start working
thread 714675968 working on task 7
task 5 is end
thread 0x2a190700 end working
thread 0x2a190700 start working
thread 706283264 working on task 8
new thread 0x2918e700 is created	#由于线程数量比较少, 开始加入线程
new thread 0x2898d700 is created
new thread 0x23fff700 is created
new thread 0x237fe700 is created
new thread 0x22ffd700 is created
new thread 0x227fc700 is created
new thread 0x21ffb700 is created
new thread 0x217fa700 is created
new thread 0x20ff9700 is created
new thread 0x207f8700 is created
thread 0x207f8700 start working
thread 0x2898d700 start working
thread 681105152 working on task 11
thread 0x23fff700 start working
thread 603977472 working on task 12
thread 545228544 working on task 9
thread 0x237fe700 start working
thread 595584768 working on task 13
thread 0x2918e700 start working
thread 689497856 working on task 10
thread 0x22ffd700 start working
thread 587192064 working on task 14
thread 0x227fc700 start working
thread 578799360 working on task 15
thread 0x21ffb700 start working
thread 570406656 working on task 16
thread 0x217fa700 start working
thread 562013952 working on task 17
thread 0x20ff9700 start working
thread 553621248 working on task 18
task 6 is end
task 8 is end
task 7 is end
thread 0x2a190700 end working
thread 0x2b192700 end working
thread 0x2b192700 is waiting
thread 0x2a991700 end working
thread 0x2a991700 is waiting
thread 0x2a190700 start working
thread 706283264 working on task 19
task 11 is end
thread 0x2898d700 end working
task 10 is end
thread 0x2918e700 end working
task 16 is end
thread 0x21ffb700 end working
task 12 is end
thread 0x23fff700 end working
task 15 is end
thread 0x227fc700 end working
task 18 is end
thread 0x20ff9700 end working
task 17 is end
thread 0x217fa700 end working
task 13 is end
thread 0x237fe700 end working
thread 0x2898d700 is waiting		#线程开始空闲
thread 0x2918e700 is waiting
task 9 is end
thread 0x207f8700 end working
task 19 is end
thread 0x2a190700 end working
task 14 is end
thread 0x22ffd700 end working
thread 0x21ffb700 is waiting
thread 0x23fff700 is waiting
thread 0x227fc700 is waiting
thread 0x20ff9700 is waiting
thread 0x217fa700 is waiting
thread 0x237fe700 is waiting
thread 0x207f8700 is waiting
thread 0x2a190700 is waiting
thread 0x22ffd700 is waiting
thread 0x2918e700 is exiting			#空闲任务不需要了
thread 0x2b192700 is exiting
thread 0x2a991700 is exiting
thread 0x2898d700 is exiting
thread 0x21ffb700 is exiting
thread 0x23fff700 is exiting
thread 0x227fc700 is exiting
thread 0x217fa700 is exiting
thread 0x237fe700 is exiting
thread 0x20ff9700 is exiting
thread 0x207f8700 will exit				#销毁线程池的时候还有三个线程
thread 0x2a190700 will exit
thread 0x22ffd700 will exit

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/619788.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spark实验三-spark进阶编程

1&#xff0e;Spark编程统计各地区租房人数 实验目标&#xff1a; (1) 掌握在IntelliJ IDEA 中操作spark程序开发 (2) 打包程序提交集群运行 实验说明&#xff1a; 现有一份某省份各地区租房信息文件 house.txt&#xff0c;文件中共有8个数据字段&#xff0c;字段说明…

更改ip地址的几种方式有哪些

在数字化时代&#xff0c;IP地址作为网络设备的标识&#xff0c;对于我们在网络世界中的活动至关重要。然而&#xff0c;出于多种原因&#xff0c;如保护隐私、访问特定网站或进行网络测试&#xff0c;我们可能需要更改IP地址。虎观代理将详细介绍IP地址的更改方法与步骤&#…

基于Java停车场管理系统设计与实现(源码+部署文档)

博主介绍&#xff1a; ✌至今服务客户已经1000、专注于Java技术领域、项目定制、技术答疑、开发工具、毕业项目实战 ✌ &#x1f345; 文末获取源码联系 &#x1f345; &#x1f447;&#x1f3fb; 精彩专栏 推荐订阅 &#x1f447;&#x1f3fb; 不然下次找不到 Java项目精品实…

欢乐钓鱼大师秒杀源码

gg修改器设置里面单选a内存然后去试试e类型搜索鱼竿的拉杆速度然后点修改点很多增加1然后游戏返回在进去看鱼竿拉速然后在修改器的里面找到拉速一样的数值其他恢复全移除不恢复移除会闪退然后点开保留下来的拉速数值点转到会有一堆数值你得找里面找到鱼竿的伤害距离等数值就可以…

[2024最新]MySQL-mysql 8.0.11安装教程

网上的教程有很多&#xff0c;基本上大同小异。但是安装软件有时就可能因为一个细节安装失败。我也是综合了很多个教程才安装好的&#xff0c;所以本教程可能也不是普遍适合的。 安装环境&#xff1a;win 10 1、下载zip安装包&#xff1a; MySQL8.0 For Windows zip包下载地…

vscode只修改几行,git却显示整个文件都被修改

原因&#xff1a;不同的操作系统默认的回车换行符是不一样的&#xff0c;有些编辑器会自动修改回车换行&#xff0c;然后就整个文件都变化了。 Unix/Linux/Mac使用的是LF&#xff0c;但Windows一直使用CRLF【回车(CR, ASCII 13, r) 换行(LF, ASCII 10, n)】作为换行符。 解决&a…

考研数学|跟「张宇」还是「武忠祥」?看这一篇就明白了

我想告诉你&#xff0c;选择张宇老师的课程也是一个很好的选择。而且24考研压中了很多类似的题目&#xff01; 推荐张宇老师的原因不仅是课程讲的比较好&#xff0c;更因为30讲这本书是比较系统的一本参考书。 张宇老师的数学课程一直以来都备受考生的推崇&#xff0c;他的讲…

mac基础操作、快捷、软件快捷方式

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 mac基础操作、快捷、软件快捷方式 前言mac快捷操作快捷查找切换页面页面缩略访达和命令端切换创建文件夹创建文件删除文件/文件夹获取文件的路径移动文件或文件夹复制文件命令端常用命令 前言 主要是方…

前端Vue3+uni+Ts

本次记录小兔仙仙的制作过程。 先看下我们的项目截图。主要是手机端&#xff0c;这里用了uniappVScode.三端适配的。可以打包成安卓和苹果。微信小程序。 首先&#xff1a;创建一个uni新的ts项目。 # 通过 git 从 gitee 克隆下载 登录 - Gitee.com git clone -b vite-ts http…

【面试八股总结】排序算法(一)

参考资料 &#xff1a;阿秀 一、冒泡排序 冒泡排序就是把小的元素往前交换或者把大的元素往后交换&#xff0c;比较相邻的两个元素&#xff0c;交换也发生在这两个元素之间。具体步骤&#xff1a; 比较相邻的元素。如果第一个比第二个大&#xff0c;就交换他们两个。对每一对…

每日OJ题_BFS解决最短路①_力扣1926. 迷宫中离入口最近的出口

目录 力扣1926. 迷宫中离入口最近的出口 解析代码 力扣1926. 迷宫中离入口最近的出口 1926. 迷宫中离入口最近的出口 难度 中等 给你一个 m x n 的迷宫矩阵 maze &#xff08;下标从 0 开始&#xff09;&#xff0c;矩阵中有空格子&#xff08;用 . 表示&#xff09;和墙&…

非线性SVM模型

5个条件判断一件事情是否发生&#xff0c;每个条件可能性只有2种&#xff08;发生或者不发生&#xff09;&#xff0c;计算每个条件对这件事情发生的影响力。非线性SVM模型的程序。 例一 为了计算每个条件对一件事情发生的影响力&#xff0c;并使用非线性支持向量机&#xff0…