线程池-手写线程池Linux C简单版本(生产者-消费者模型)

目录

  • 简介
  • 手写线程池
    • 线程池结构体分析
      • task_t
      • task_queue_t
      • thread_pool_t
    • 线程池函数分析
      • thread_pool_create
      • thread_pool_post
      • thread_worker
      • thread_pool_destroy
      • wait_all_done
      • thread_pool_free
    • 主函数调用
  • 运行结果

简介

本线程池采用C语言实现

线程池的场景:

当某些任务特别耗时(例如大量的IO读写操作),严重影响线程其他的任务的执行,可以使用线程池

线程池的一般特点:

线程池通常是一个生产者-消费者模型
生产者线程用于发布任务,任务通常保存在任务队列中
线程池作为消费者,用于取出任务,执行任务

线程池中线程数量的选择:

有一个经验公式: 线程数量 =(io等待时间+cpu运算时间)*核心数/cpu运算时间

因此可以根据经验公式得出下面两种场景的线程数量:

  • cpu密集任务:线程数量=核心数(即上面的公式假设cpu运算时间>>io等待时间)
  • io密集任务:线程数量=2*n+2

手写线程池

线程池代码结构:

  • thread_pool_create:创建线程池所需要的资源,包含不限于任务队列,子线程的创建。
  • thread_pool_post:用于任务的发布,将执行任务存在任务队列中。
  • thread_pool_destroy:用于线程池的退出,以及资源的销毁。
  • wait_all_done:join线程池所有子线程,等待回收子线程。
  • thread_worker:用于任务执行。

主要的核心点集中在thread_pool_post和thread_worker两个函数中,这两个函数也构成了生产者-消费者模型。本文采用队列+互斥锁+条件变量实现。

线程池结构体分析

由于C语言不像C++可以用类封装函数,因此线程池会使用结构体来封装一些变量或者函数指针。

task_t

封装任务的入口指针以及参数。

typedef struct task_t {handler_pt func;void * arg;
} task_t;

task_queue_t

封装任务队列,为了不频繁移动队列中数据,此处采用头尾索引来标记任务。

typedef struct task_queue_t {uint32_t head;uint32_t tail;uint32_t count;task_t *queue;
} task_queue_t;

thread_pool_t

包含互斥锁,条件变量,任务队列等信息

struct thread_pool_t {pthread_mutex_t mutex;pthread_cond_t condition; //条件变量pthread_t *threads; //线程task_queue_t task_queue; //任务队列int closed; //是否关闭线程池执行的标志,为1表示关闭int started; // 当前正在运行的线程数int thrd_count; //线程数int queue_size; //任务队列大小
};

其中closed:表示是否关闭线程池执行的标志,为1表示关闭。在线程的运行函数中,用来判断是否继续循环等待执行任务队列中的任务。
started:表示当前正在运行的线程数。在thread_pool_destroy函数中销毁线程池时,需要等待所有线程停止才行,即started == 0

线程池函数分析

thread_pool_create

创建线程池,初始化一些线程池属性
通过循环pthread_create函数创建子线程。

thread_pool_t *thread_pool_create(int thrd_count, int queue_size) {thread_pool_t *pool;if (thrd_count <= 0 || queue_size <= 0) {return NULL;}pool = (thread_pool_t*) malloc(sizeof(*pool));if (pool == NULL) {return NULL;}pool->thrd_count = 0;pool->queue_size = queue_size;pool->task_queue.head = 0;pool->task_queue.tail = 0;pool->task_queue.count = 0;pool->started = pool->closed = 0;pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size);if (pool->task_queue.queue == NULL) {// TODO: free poolreturn NULL;}pool->threads = (pthread_t*) malloc(sizeof(pthread_t) * thrd_count);if (pool->threads == NULL) {// TODO: free poolreturn NULL;}int i = 0;for (; i < thrd_count; i++) {if (pthread_create(&(pool ->threads[i]), NULL, thread_worker, (void*)pool) != 0) {// TODO: free poolreturn NULL;}pool->thrd_count++;pool->started++;}return pool;
}

thread_pool_post

作为生产者,往任务队列里面添加任务
通过pthread_cond_signal通知子唤醒子线程的pthread_cond_wait

int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg) {if (pool == NULL || func == NULL) {return -1;}task_queue_t *task_queue = &(pool->task_queue);
//此处用自旋锁会更节省消耗,因为锁里面的逻辑比较简单if (pthread_mutex_lock(&(pool->mutex)) != 0) {return -2;}if (pool->closed) {pthread_mutex_unlock(&(pool->mutex));return -3;}if (task_queue->count == pool->queue_size) {pthread_mutex_unlock(&(pool->mutex));return -4;}
//避免queue数据的变化,采用头尾索引来标识task_queue->queue[task_queue->tail].func = func;task_queue->queue[task_queue->tail].arg = arg;task_queue->tail = (task_queue->tail + 1) % pool->queue_size;task_queue->count++;
//唤醒一个休眠的线程if (pthread_cond_signal(&(pool->condition)) != 0) {pthread_mutex_unlock(&(pool->mutex));return -5;}pthread_mutex_unlock(&(pool->mutex));return 0;
}

thread_worker

pthread_cond_wait等待任务的唤醒
作为消费者, (*(task.func))(task.arg);执行任务

static void *thread_worker(void *thrd_pool) {thread_pool_t *pool = (thread_pool_t*)thrd_pool;task_queue_t *que;task_t task;for (;;) {pthread_mutex_lock(&(pool->mutex));que = &pool->task_queue;while (que->count == 0 && pool->closed == 0) {// 阻塞在 condition,等待任务队列添加任务pthread_cond_wait(&(pool->condition), &(pool->mutex));}if (pool->closed == 1 && que->count == 0) break;//没有任务,并且关闭标志打开,即跳出循环task = que->queue[que->head];que->head = (que->head + 1) % pool->queue_size;que->count--;pthread_mutex_unlock(&(pool->mutex));(*(task.func))(task.arg);//执行对应任务函数}pool->started--;//跳出循环之后,运行线程数需要减1pthread_mutex_unlock(&(pool->mutex));pthread_exit(NULL);return NULL;
}

thread_pool_destroy

销毁释放线程池,置 pool->closed = 1;
通过pthread_cond_broadcast唤醒线程池所有线程,这个和thread_pool_post里的pthread_cond_signal一样,并且broadcast会通知到所有的线程

int thread_pool_destroy(thread_pool_t *pool) {if (pool == NULL) {return -1;}if (pthread_mutex_lock(&(pool->mutex)) != 0) {return -2;}if (pool->closed) {thread_pool_free(pool);return -3;}pool->closed = 1;
//广播形式,通知所有阻塞在condition的线程接触阻塞if (pthread_cond_broadcast(&(pool->condition)) != 0 || pthread_mutex_unlock(&(pool->mutex)) != 0) {thread_pool_free(pool);return -4;}wait_all_done(pool);thread_pool_free(pool);return 0;
}

wait_all_done

将所有线程通过pthread_join回收,所有子线程任务执行完毕,回收线程

int wait_all_done(thread_pool_t *pool) {printf("wait_all_done start!pool->thrd_count:%d\n", pool->thrd_count);int i, ret=0;for (i=0; i < pool->thrd_count; i++) {printf("wait_all_done doing! i:%d\n", i);if (pthread_join(pool->threads[i], NULL) != 0) {ret=1;}}printf("wait_all_done end!\n");return ret;
}

thread_pool_free

释放线程池空间

static void thread_pool_free(thread_pool_t *pool) {if (pool == NULL || pool->started > 0) {return;}if (pool->threads) {free(pool->threads);pool->threads = NULL;pthread_mutex_lock(&(pool->mutex));pthread_mutex_destroy(&pool->mutex);pthread_cond_destroy(&pool->condition);}if (pool->task_queue.queue) {free(pool->task_queue.queue);pool->task_queue.queue = NULL;}free(pool);
}

主函数调用

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>#include "thrd_pool.h"int nums = 0;
int done = 0;
int task_num = 100;pthread_mutex_t lock;void do_task(void *arg) {usleep(10000);pthread_mutex_lock(&lock);done++;printf("doing %d task\n", done);pthread_mutex_unlock(&lock);
}int main(int argc, char **argv) {int threads = 8;int queue_size = 256;if (argc == 2) {threads = atoi(argv[1]);if (threads <= 0) {printf("threads number error: %d\n", threads);return 1;}} else if (argc > 2) {threads = atoi(argv[1]);queue_size = atoi(argv[1]);if (threads <= 0 || queue_size <= 0) {printf("threads number or queue size error: %d,%d\n", threads, queue_size);return 1;}}thread_pool_t *pool = thread_pool_create(threads, queue_size);if (pool == NULL) {printf("thread pool create error!\n");return 1;}while (thread_pool_post(pool, &do_task, NULL) == 0) {pthread_mutex_lock(&lock);nums++;pthread_mutex_unlock(&lock);if (nums > task_num) break;}printf("add %d tasks\n", nums);usleep(1000000);//延时等待所有的作业完成printf("did %d tasks\n", done);thread_pool_destroy(pool);return 0;
}

运行结果

使用指令编译文件:

gcc main.c thrd_pool.c -o main -lpthread

运行执行文件得到运行结果
在这里插入图片描述在这里插入图片描述

完整代码下载线程池Linux C语言简单版本

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/86560.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

kafka尚硅谷视频&#xff1a; 10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili ​ 1. producer初始化&#xff1a;加载默认配置&#xff0c;以及配置的参数&#xff0c;开启网络线程 2. 拦截器拦截 3. 序列化器进行消息key, value序列化 4. 进行分区 5. kafka broker集群 获取…

CSS中如何实现多列布局?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 多列布局&#xff08;Multi-column Layout&#xff09;⭐ column-count⭐ column-width⭐ column-gap⭐ column-rule⭐ column-span⭐ 示例⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧…

NoSQL数据库介绍+Redis部署

目录 一、NoSQL概述 1、数据的高并发读写 2、海量数据的高效率存储和访问 3、数据库的高扩展和高可用 二、NoSQL的类别 1、键值存储数据库 2、列存储数据库 3、文档型数据库 4、图形化数据库 三、分布式数据库中的CAP原理 1、传统的ACID 1&#xff09;、A--原子性 …

构建与应用大数据环境:从搭建到开发与组件使用的全面指南

文章目录 环境搭建开发与组件使用性能优化与监控安全与隐私总结 &#x1f388;个人主页&#xff1a;程序员 小侯 &#x1f390;CSDN新晋作者 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 ✨收录专栏&#xff1a;大数据系列 ✨文章内容&#xff1a; &#x1f91d;希望作者…

MeterSphere常用操作/脚本记录

设置变量 vars.put(“key”,“value”); //存为场景变量 设置环境变量 vars.put(${__metersphere_env_id}“key”,“value”); //存为环境变量 随机生成手机号 String phone “178123${__RandomString(5,0123456789)}”; //178123开头&#xff0c;后面5位随机 获取当前请求…

大数据(二)大数据行业相关统计数据

大数据&#xff08;二&#xff09;大数据行业相关统计数据 目录 一、大数据相关的各种资讯 二、转载自网络的大数据统计数据 2.1、国家大数据政策 2.2、产业结构分析 2.3、应用结构分析 2.4、数据中心 2.5、云计算 一、大数据相关的各种资讯 1. 据IDC预测&#xff0…

实战项目 在线学院springcloud调用篇3(nacos,feging,hystrix,gateway)

一 springcloud与springboot的关系 1.1 关系 1.2 版本关系 1.3 list转json串 public class Test {public static void main(String[] args) {List<String> dataListnew ArrayList<String>();dataList.add("12");dataList.add("45");dataLi…

Docker学习笔记

Docker学习笔记 docker的作用docker的基本组成安装docker阿里云镜像加速run的流程和docker原理 docker的思想来自于集装箱。 核心思想&#xff1a; 隔离 docker可以通过隔离机制将服务器利用到极致。 虚拟机&#xff1a;在windows中装一个Vmware&#xff0c;通过这个软件可以虚…

K8S最新版本集群部署(v1.28) + 容器引擎Docker部署(上)

温故知新 &#x1f4da;第一章 前言&#x1f4d7;背景&#x1f4d7;目的&#x1f4d7;总体方向 &#x1f4da;第二章 基本环境信息&#x1f4d7;机器信息&#x1f4d7;软件信息&#x1f4d7;部署用户kubernetes &#x1f4da;第三章 Kubernetes各组件部署&#x1f4d7;安装kube…

PHP 安装Composer,vue前端依赖包

电脑安装Composer 官网下载&#xff1a;https://getcomposer.org/Composer-Setup.exe 后端安装&#xff1a; 检查是否安装依赖&#xff1a; 安装Composer install 或 Composer i 前端安装&#xff1a; yarn install 安装依赖

AliOS-Things引入

目录 一、简介 1.1 硬件抽象层 1.2 AliOS-Things内核 rhino ​编辑 1.3 AliOS-Things组件 二、如何进行AliOS-Things开发 三、安装环境 安装python pip git 修改pip镜像源 安装aos-cube 一、简介 AliOS-Things是阿里巴巴公司推出的致力于搭建云端一体化LoT软件。AliOS-…

[uniapp] scroll-view 简单实现 u-tabbar效果

文章目录 方案踩坑1.scroll-view 横向失败2.点击item不滚动?3. scrollLeft从哪里来? 效果图 方案 官方scroll-view 进行封装 配合属性 scroll-left Number/String 设置横向滚动条位置 即可 scroll-into-view 属性尝试过,方案较难实现 踩坑 1.scroll-view 横向失败 安装…