Linux系统编程系列之线程池

 Linux系统编程系列(16篇管饱,吃货都投降了!)

        1、Linux系统编程系列之进程基础

        2、Linux系统编程系列之进程间通信(IPC)-信号

        3、Linux系统编程系列之进程间通信(IPC)-管道

        4、Linux系统编程系列之进程间通信-IPC对象

        5、Linux系统编程系列之进程间通信-消息队列

        6、Linux系统编程系列之进程间通信-共享内存

        7、Linux系统编程系列之进程间通信-信号量组

        8、Linux系统编程系列之守护进程

        9、Linux系统编程系列之线程

        10、Linux系统编程系列之线程属性 

        11、Linux系统编程系列之互斥锁和读写锁

        12、Linux系统编程系列之线程的信号处理

        13、Linux系统编程系列之POSIX信号量

        14、Linux系统编程系列之条件变量

        15、Linux系统编程系列之死锁

        16、 Linux系统编程系列之线程池

一、什么是线程池

        线程池就是将许多线程,放置在一个池子中(实际就是一个结构体)只要有任务,就将任务投入池中,这些线程们通过某些机制,及时处理这些任务,一旦处理完后又重新回到池子中重新接收新任务。为了便于管理,线程池还应当提供诸如初始化线程池,增删线程数量,检测未完成任务的数据,检测正在执行任务的线程的数量、销毁线程池等等基本操作。

二、特性

        1、更好的满足并发性需求

        2、更加节省系统资源,降低负载

        3、避免每次需要执行任务时都创建新的线程

        4、可以限制并发线程数量,帮助控制应用程序的性能和稳定性

        核心思想:通过精巧的设计使得池子中的线程数量可以动态地发生变化,让线程既可以应对并发性需求,又不会浪费系统资源

三、使用场景

        用来应对某种场景,要在程序中创建大量线程,并且这些线程的数量和生命周期均不确定,可能方生方死,也可能常驻内存,请看下面举例。

        1、处理大量密集型任务

        线程池可以处理多个任务,从而减少了创建和销毁线程的开销,提高了系统性能

        2、服务端应用程序

        线程池可以帮助服务端应用程序同时处理多个客户端请求

        3、I/O 密集型应用程序

        线程池可以处理 I/O 操作,允许系统使用空闲时间进行其他任务处理

        4、Web 应用程序

        线程池可以处理来自 Web 客户端的请求,从而提高了 Web 应用程序的性能

        总之,任何需要处理大量任务或需要同时处理多个请求的应用程序都可以使用线程池来提高性能和效率。

四、设计思想

        设计某个东西一定要有个目标,这里是要求设计一个线程池,首先要明白线程池的作用,然后根据其作用来展开设计。线程池是通过让线程的数量进行动态的变化来及时处理接受的任务,所以核心就是线程和任务。可以将问题转换为如何组织线程和组织任务?借鉴别人的一幅图

         1、如何组织线程

                从上图可以看出,线程被创建出来之后,都处于睡眠态,它们实际上是进入了条件变量的等待队列中,而任务都被放入一个链表,被互斥锁保护起来

                线程的生命周期如下:(这里省略程序执行的流程图)

                (1)、线程被创建

                (2)、预防死锁,准备好退出处理函数,防止在持有一把锁的状态中死去

                (3)、尝试持有互斥锁(等待任务)

                (4)、判断是否有任务,若无则进入条件变量等待队列睡眠,若有则进入第(5)步

                (5)、从任务链表中取得一个任务

                (6)、释放互斥锁

                (7)、弹出退出处理函数,避免占用内存

                (8)、执行任务

                (9)、执行任务完成后重新回到第(2)步

                线程的生命周期其实就是线程要做的事情,把上面的第(2)步到第(8)步写成一个线程的例程函数。

         2、如何组织任务

                所谓的任务就是一个函数,回想一下创建一条线程时,是不是要指定一个线程例程函数。这里的线程池做法是将函数(准确的说是函数指针)及其参数存入一个任务节点,并将节点链接成一个链表。所以现在的问题变成如何设计一个任务链表?

                一个链表的操作有:创建节点,增加节点,删除节点,查询节点这几步。由于这里是任务节点,就不需要查询了,当然也可以查询。所以现在的问题转换为任务节点如何设计?

                任务节点的数据域主要有函数指针和函数参数,指针域就是一个简单任务结点指针,也可以是双向循环链表。

                设计分析到这里,基本上有了框架了,前面买了这么多关子,下面就来个重头戏。其实前面主要是想描述一种设计思想,遇到新的东西如何设计它?首先要明白设计的目的和作用,然后找出核心点,最后就是把核心点进行不断的分析推理,一步步转换为最基本的问题。学过项目管理的同学应该听过在范围管理里有个叫工作分解结构的东西,这里跟那个差不多。

五、案例

        实现一个线程池,并完成线程池的基本操作演示

        thread_pool.h(线程池头文件)

#ifndef _THREAD_POOL_H
#define _THREAD_POOL_H#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>#define MAX_WAITING_TASKS	1000    // 最大的等待任务数量
#define MAX_ACTIVE_THREADS	20      // 最多的活跃线程数量// 任务节点
typedef struct task 
{void *(*do_task) (void *arg);   // 函数指针void *arg;      // 函数参数struct task *next;  // 指向下一个任务节点
}task;// 线程池的管理节点
typedef struct thread_pool
{pthread_mutex_t task_list_lock;   // 任务列表互斥锁,保证同一个时间只有一条线程进行访问pthread_cond_t  task_list_cond;   // 任务列表条件变量,保证没有任务时,线程进入睡眠task *task_list;    // 任务列表pthread_t *tids;     // 存放线程号的数组int shutdown;  // 线程池销毁的开关unsigned int max_waiting_tasks;  // 最大等待的任务数量unsigned int waiting_tasks;      // 正在等待被处理的任务数量unsigned int active_threads;     // 正在活跃的线程数}thread_pool;// 初始化线程池
int init_pool(thread_pool *pool, unsigned int threads_number);// 往线程池中添加任务
int add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg);// 往线程池中添加线程
int add_thread(thread_pool *pool, unsigned int additional_threads);// 从线程池中删除线程
int remove_thread(thread_pool *pool, unsigned int removing_threads);// 查询线程池中线程数量
int total_thread(thread_pool *pool);// 销毁线程池
int destroy_pool(thread_pool *pool);// 线程的例程函数
void *routine(void *arg);#endif // _THREAD_POOL_H

        thread_pool.c(线程池实现文件) 

#include "thread_pool.h"// 初始化线程池
int init_pool(thread_pool *pool, unsigned int threads_number)
{if(threads_number < 1){printf("warning: threads_number must bigger than 0\n");return -1;}if(threads_number > MAX_ACTIVE_THREADS){printf("warning: threads number is bigger than MAX_ACTIVE_THREADS(%u)\n", MAX_ACTIVE_THREADS);return -1;}pthread_mutex_init(&pool->task_list_lock, NULL); // 初始化互斥锁pthread_cond_init(&pool->task_list_cond, NULL);  // 初始化条件变量pool->task_list = calloc(1, sizeof(task));  // 申请一个任务头节点if(!pool->task_list){printf("error: task_list calloc fail\n");return -1;}pool->task_list->next = NULL;   // 让任务头节点指向NULLpool->tids = calloc(MAX_ACTIVE_THREADS, sizeof(pthread_t)); // 申请堆数组用来存放线程号if(!pool->tids){printf("error: tids calloc fail\n");return -1;}pool->shutdown = 0; // 关闭线程池销毁开发pool->waiting_tasks = 0;    // 当前等待任务为0pool->max_waiting_tasks = MAX_WAITING_TASKS;    // 初始化最大等待任务数量pool->active_threads = threads_number;  // 初始化活跃的线程数// 根据活跃的线程数,创建对应数量的线程for(int i = 0; i < pool->active_threads; i++){// 需要判断线程是否创建失败// 线程号用线程号数组,线程属性设置为默认的,例程函数是routine,函数参数是线程池errno = pthread_create(&pool->tids[i], NULL, routine,(void*)pool);if(errno != 0){perror("error: pthread_create fail");return -1;}}return 1;
}// 往线程池中添加任务
int add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg)
{if(!pool){printf("warning: thread_pool is null\n");return -1;}// 如果当前等待执行的任务数超过最大等待的任务数量,则退出if(pool->waiting_tasks >= pool->max_waiting_tasks){printf("warning: task_list is full, too many list\n");return -1;}// 尝试给新的任务节点申请空间task *new_task = (task*)malloc(sizeof(task));if(!new_task){printf("error: task malloc fail\n");return -1;}// 初始化任务节点new_task->do_task = do_task;new_task->arg = arg;new_task->next = NULL;// 把任务节点添加到任务列表最后面// 1、先找到任务列表末尾task *tmp = NULL;for(tmp = pool->task_list; tmp->next; tmp = tmp->next);// 2、上锁pthread_mutex_lock(&pool->task_list_lock);// 3、添加新任务到任务列表末尾,同时当前等待任务数+1tmp->next = new_task;pool->waiting_tasks++;// 4、解锁pthread_mutex_unlock(&pool->task_list_lock);// 唤醒一个正在条件变量中睡眠等待的线程,取执行任务pthread_cond_signal(&pool->task_list_cond);return 1;
}// 往线程池中添加线程,返回实际添加的线程数量
int add_thread(thread_pool *pool, unsigned int additional_threads)
{if(!pool){printf("warning: thread_pool is null\n");return -1;}if(additional_threads == 0){return 0;}// 期望活跃的线程数 = 目前活跃的线程数 + 期望添加的线程数unsigned int total_threads = pool->active_threads + additional_threads;// 如果超过最大的活跃线程数就打印警告if(total_threads > MAX_ACTIVE_THREADS){printf("warning: add too many threads\n");}int actual_add_threads = 0;for(int i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++){// 需要判断线程是否创建失败// 线程号用线程号数组,线程属性设置为默认的,例程函数是routine,函数参数是线程池errno = pthread_create(&pool->tids[i], NULL, routine,(void*)pool);if(errno != 0){perror("error: pthread_create fail");// 如果一个都没有创建成功,就直接返回if(actual_add_threads == 0){return -1;}// 如果成功创建了多个线程,但是本次创建失败,就跳出循环break;}else{actual_add_threads++;   // 线程创建成功就+1}}// 更新线程池中活跃的线程数pool->active_threads += actual_add_threads;return actual_add_threads;  // 返回实际添加的线程
}// 从线程池中删除线程,返回实际删除线程的数量
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{if(!pool){printf("warning: thread_pool is null\n");return -1;}if(removing_threads == 0){return pool->active_threads;}// 如果要删除的线程数大于线程池中活跃的线程数,则打印警告if(removing_threads >= pool->active_threads){printf("warning: remove too many threads\n");}// 剩余数量 = 活跃数量 - 删除的目标数  int remaining_threads = pool->active_threads - removing_threads;//  目的是为了让线程池中最少保留有一个线程可以用于执行任务remaining_threads = remaining_threads > 0 ? remaining_threads : 1;int actual_remove_threads = 0;;// 循环取消线程直到等于期望线程数for(int i = pool->active_threads-1; i > remaining_threads-1; i--){errno = pthread_cancel(pool->tids[i]);if(errno != 0){printf("[%ld] cancel error: %s\n", pool->tids[i], strerror(errno));break;}else{actual_remove_threads++;}}// 更新线程池中活跃的线程数量pool->active_threads -= actual_remove_threads;return actual_remove_threads;   // 返回实际删除的线程
}// 查询线程池中线程数量
int total_thread(thread_pool *pool)
{return pool->active_threads;
}// 销毁线程池
int destroy_pool(thread_pool *pool)
{if(!pool){printf("warning: thread_pool is null\n");return -1;}pool->shutdown = 1; // 启动线程池销毁开关pthread_cond_broadcast(&pool->task_list_cond);    // 唤醒所有在线程池中的线程// 循环等待所有线程退出for(int i = 0; i < pool->active_threads; i++){errno = pthread_join(pool->tids[i], NULL);if(errno != 0){printf("join tids[%d] error: %s\n", i, strerror(errno));}else{printf("[%ld] is joined\n", pool->tids[i]);}}// 头删法,从头一个个的删除任务节点for(task *p = pool->task_list->next; p; p = pool->task_list){pool->task_list->next = p->next;    // 修改首元节点free(p);}free(pool->task_list);free(pool->tids);free(pool);return 1;
}// 线程的取消函数
void pthread_cancel_handler(void *arg)
{printf("[%ld] is cancel\n", pthread_self());pthread_mutex_unlock((pthread_mutex_t*)arg);    // 解锁
}// 线程的例程函数
void *routine(void *arg)
{task *task_p;   // 任务指针,用来执行将要执行的任务thread_pool *pool = (thread_pool*)arg;while(1){pthread_cleanup_push(pthread_cancel_handler, (void*)&pool->task_list_lock);// 尝试持有互斥锁pthread_mutex_lock(&pool->task_list_lock);// 判断是否有任务,没有则进入睡眠// 1、没有任务,线程池销毁开关断开,则进入条件变量等待队列while(!pool->waiting_tasks && !pool->shutdown){// 当条件不满足时,会先自动解锁pool->lock,然后等待到条件满足后,会自动上锁pool->lockpthread_cond_wait(&pool->task_list_cond, &pool->task_list_lock);}// 2、没有任务,线程池销毁开发闭合,则先解锁,然后退出if(!pool->waiting_tasks && pool->shutdown){pthread_mutex_unlock(&pool->task_list_lock);  // 需要解锁pthread_exit(NULL);}// 3、有任务,线程池销毁开关断开,则取一个任务task_p = pool->task_list->next;pool->task_list->next = task_p->next; // 弹出第一任务节点 pool->waiting_tasks--;  // 当前等待的任务数量-1,pthread_mutex_unlock(&pool->task_list_lock);  // 解锁// 弹出压入的线程取消函数,运行到这里不执行,但是当线程在前面被意外取消或中断会执行pthread_cleanup_pop(0); // 为了防止死锁,执行任务期间不接受线程取消请求pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);(task_p->do_task)(task_p->arg); // 通过函数指针的方式执行任务// 执行完任务后,接受线程取消请求pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);free(task_p);   // 删除任务节点}pthread_exit(NULL);
}

        main.c(线程池操作演示文件) 

// 线程池的测试案例#include <stdio.h>
#include <unistd.h>
#include "thread_pool.h"void *mytask(void *arg)
{	int n = rand()%10;printf("[%ld][%s] ==> job will be done in %d sec...\n", pthread_self(), __FUNCTION__, n);sleep(n);printf("[%ld][%s] ==> job done!\n", pthread_self(), __FUNCTION__);
}void *func(void *arg )
{printf("this is a test by [%s]\n", (char*)arg);sleep(1);printf("test finish...\n");
}void *count_time(void *arg)
{int i = 0;while(1){sleep(1);printf("sec: %d\n", ++i);}
}int main(void)
{pthread_t a;pthread_create(&a, NULL, count_time, NULL);// 1、初始化线程池thread_pool *pool = malloc(sizeof(thread_pool));init_pool(pool, 2);// 2、投放任务printf("throwing 3 tasks...\n");add_task(pool, mytask, NULL);add_task(pool, mytask, NULL);add_task(pool, mytask, NULL);// 3、查看当前线程池中的线程数量printf("current thread number: %d\n", total_thread(pool));sleep(9);// 4、再次投放任务printf("throwing another 2 tasks...\n");add_task(pool, mytask, NULL);add_task(pool, mytask, NULL);add_task(pool, func, (void *)"Great Macro");// 5、添加2条线程printf("try add 2 threads, actual add %d\n", add_thread(pool, 2));printf("current thread number: %d\n", total_thread(pool));sleep(5);// 6、删除3条线程printf("try remove 3 threads, actual remove: %d\n", remove_thread(pool, 3));printf("current thread number: %d\n", total_thread(pool));sleep(5);// 7、 销毁线程池printf("destroy thread pool\n");destroy_pool(pool);return 0;
}

        注:编译时,把线程池文件和测试文件放在同一个工作目录下。

        实际使用时,只需要把thread_pool.h和thread_pool.c拷贝到自己的工程目录下,然后根据规则操作线程池。

六、总结

        线程池是许多线程的集合,它不是线程组。线程池适用于任何需要处理大量任务或需要同时处理多个请求的应用程序的场景,可以提供性能和效率。

        至此,Linux系统编程系列,16篇完结撒花,历时5天,这年中秋国庆没有假放!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/125342.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

redis-设置从节点

节点结构 节点配置文件 主节点 不变 6380节点 port 6380 slaveof 127.0.0.1 63796381节点 port 6381 slaveof 127.0.0.1 6380启动 指定配置文件的方式启动 D:\jiqun\redis\Redis-6380>redis-server.exe redis.windows.conf启动时&#xff0c;会触发同步数据命令 主节点…

论文阅读——Pyramid Grafting Network for One-Stage High Resolution Saliency Detection

目录 基本信息标题目前存在的问题改进网络结构CMGM模块解答为什么要用这两个编码器进行编码 另一个写的好的参考 基本信息 期刊CVPR年份2022论文地址https://arxiv.org/pdf/2204.05041.pdf代码地址https://github.com/iCVTEAM/PGNet 标题 金字塔嫁接网络的一级高分辨率显著性…

【LeetCode热题100】--74.搜索二维矩阵

74.搜索二维矩阵 按行搜索&#xff0c;使用二分查找 class Solution {public boolean searchMatrix(int[][] matrix, int target) {for(int[] row : matrix){int index search(row,target);if(index > 0){return true;}}return false;}public int search(int[] nums,int t…

Linux CentOS7 vim临时文件

在vim中&#xff0c;由于断网、停电、故意退出、不小心关闭终端等多种原因&#xff0c;正在编辑的文件没有保存&#xff0c;系统将会为文件保存一个交换文件&#xff0c;或称临时文件&#xff0c;或备份文件。 如果因某种原因产生了交换文件&#xff0c;每次打开文件时&#x…

【Java】接口 interface

目录 概述 示例代码&#xff1a; 接口成员访问特点 示例代码&#xff1a; 概述 什么是接口 接口就是一种公共的规范标准&#xff0c;只要符合规范标准&#xff0c;大家都可以调用。 Java 中的接口更多的体现在对行为的抽象&#xff01; 1. 接口 用关键字 interface 修饰 pub…

基于SpringBoot的ElasticSearch操作(超详细教程)

一、ElasticSearch 简介 1、简介 ElasticSearch 是一个基于 Lucene 的搜索服务器。它提供了一个分布式多员工能力的全文搜索引擎&#xff0c;基于 RESTful web 接口。Elasticsearch 是用 Java 语言开发的&#xff0c;并作为 Apache 许可条款下的开放源码发布&#xff0c;是一种…

ElasticSearch第四讲:ES详解:ElasticSearch和Kibana安装

ElasticSearch第四讲&#xff1a;ES详解&#xff1a;ElasticSearch和Kibana安装 本文是ElasticSearch第四讲&#xff1a;ElasticSearch和Kibana安装&#xff0c;主要介绍ElasticSearch和Kibana的安装。了解完ElasticSearch基础和Elastic Stack生态后&#xff0c;我们便可以开始…

Pikachu靶场——远程命令执行漏洞(RCE)

文章目录 1. RCE1.1 exec "ping"1.1.1 源代码分析1.1.2 漏洞防御 1.2 exec "eval"1.2.1 源代码分析1.2.2 漏洞防御 1.3 RCE 漏洞防御 1. RCE RCE(remote command/code execute)概述&#xff1a; RCE漏洞&#xff0c;可以让攻击者直接向后台服务器远程注入…

flex布局与几个实例(含源码)

本文简单的说明下flex布局 有源码实例&#xff0c;后续会持续添加 flex默认主轴是横轴 容器主要有6个属性 flex-direction 决定主轴的方向 flex-direction: row | row-reverse | column | column-reverse; flex-wrap 决定是否换行 flex-wrap: nowrap | wrap | wrap-revers…

华为云云耀云服务器L实例评测|Elasticsearch的springboot整合 Kibana进行全查询和模糊查询

前言 最近华为云云耀云服务器L实例上新&#xff0c;也搞了一台来玩&#xff0c;期间遇到各种问题&#xff0c;在解决问题的过程中学到不少和运维相关的知识。 在前几期的博客中&#xff0c;介绍了Elasticsearch的Docker版本的安装&#xff0c;Elasticsearch的可视化Kibana工具…

手搭手Mybatis-Plus多数据源异构数据迁移案例

环境介绍 技术栈 springbootmybatis-plusdruidbaomidoumysqloracledm 软件 版本 mysql 8 IDEA IntelliJ IDEA 2022.2.1 JDK 1.8 Spring Boot 2.7.13 mybatis 2.3.1 pom.xml所需依赖 <dependencies><dependency><groupId>org.springframework.…

6轮面试阿里Android开发offer,薪资却从21k降到17k,在逗我?

一小伙工作快3年了&#xff0c;拿到了阿里云Android开发岗位P6的offer&#xff0c;算HR面一起&#xff0c;加起来有6轮面试了&#xff0c;将近3个月的时间&#xff0c;1轮同级 1轮Android用人部门leader 1轮Android 组leader 1轮项目CTO 1轮HR 1轮HRBP。 一路上各种事件分…