【池式组件】线程池的原理与实现

线程池的原理与实现

  • 线程池简介
    • 1.线程池
    • 1.线程池
    • 2.数量固定的原因
    • 3.线程数量如何确定
    • 4.为什么需要线程池
    • 5.线程池结构
  • 线程池的实现
    • 数据结构设计
      • 1.任务结构
      • 2.任务队列结构
      • 3.线程池结构
    • 接口设计
  • 线程池的应用
    • reactor
    • redis 中线程池
    • skynet 中线程池

线程池简介

1.线程池

1.线程池

一种管理维持固定线程数量的池式结构。

2.数量固定的原因

  • 避免频繁的创建和销毁线程,造成资源浪费
  • 随着线程的数量不断增多,并不能实现性能的提升,相反可能造成系统负担

3.线程数量如何确定

  • CPU密集型:proc(CPU核心数)
  • IO密集型(网络IO,磁盘IO):2 * proc(CPU核心数)

4.为什么需要线程池

某类任务执行特别耗时,造成阻塞,严重影响该线程执行其他任务
作用:

  • 复用线程资源;
  • 减少线程创建和销毁的开销;
  • 可异步处理生产者线程的任务;
  • 减少了多个任务(不是一个任务)的执行时间;

5.线程池结构

线程池属于一种生产消费模型。
线程池运行环境构成:
生产者线程:发布任务
队列:存储任务,调度线程池
消费者线程:取出任务,执行任务

线程池的实现

数据结构设计

1.任务结构

typedef struct task_s {void *next;handler_pt func;void *arg;
} task_t;

2.任务队列结构

typedef struct task_queue_s {void *head;void **tail; int block;spinlock_t lock;pthread_mutex_t mutex;pthread_cond_t cond;
} task_queue_t;

3.线程池结构

struct thrdpool_s {task_queue_t *task_queue;atomic_int quit;int thrd_count;pthread_t *threads;
};

接口设计


// 创建对象的时候,回滚式的
static task_queue_t *
__taskqueue_create() {task_queue_t *queue = (task_queue_t *)malloc(sizeof(*queue));if (!queue) return NULL;int ret;ret = pthread_mutex_init(&queue->mutex, NULL);if (ret == 0) {ret = pthread_cond_init(&queue->cond, NULL);if (ret == 0) {spinlock_init(&queue->lock);queue->head = NULL;queue->tail = &queue->head;queue->block = 1;return queue;}pthread_cond_destroy(&queue->cond);}pthread_mutex_destroy(&queue->mutex);return NULL;
}static void
__nonblock(task_queue_t *queue) {pthread_mutex_lock(&queue->mutex);queue->block = 0;pthread_mutex_unlock(&queue->mutex);pthread_cond_broadcast(&queue->cond);
}static inline void 
__add_task(task_queue_t *queue, void *task) {void **link = (void **)task; // malloc *link = NULL; // task->next = NULL;spinlock_lock(&queue->lock);*queue->tail = link;queue->tail = link;spinlock_unlock(&queue->lock);pthread_cond_signal(&queue->cond);
}static inline void * 
__pop_task(task_queue_t *queue) {spinlock_lock(&queue->lock);if (queue->head == NULL) {spinlock_unlock(&queue->lock);return NULL;}task_t *task;task = queue->head;queue->head = task->next;if (queue->head == NULL) {queue->tail = &queue->head;}spinlock_unlock(&queue->lock);return task;
}static inline void * 
__get_task(task_queue_t *queue) {task_t *task;// 虚假唤醒while ((task = __pop_task(queue)) == NULL) {pthread_mutex_lock(&queue->mutex);if (queue->block == 0) {// break;pthread_mutex_unlock(&queue->mutex);return NULL;}// 1. 先 unlock(&mtx);// 2. 在 cond 休眠// ----- 生产者产生任务  signal// 3. 在 cond 唤醒// 4. 加上  lock(&mtx);pthread_cond_wait(&queue->cond, &queue->mutex);pthread_mutex_unlock(&queue->mutex);}return task;
}static void
__taskqueue_destroy(task_queue_t *queue) {task_t *task;while ((task = __pop_task(queue))) { // 任务的生命周期由 thrdpool 管理free(task);}spinlock_destroy(&queue->lock);pthread_cond_destroy(&queue->cond);pthread_mutex_destroy(&queue->mutex);free(queue);
}static void *
__thrdpool_worker(void *arg) {thrdpool_t *pool = (thrdpool_t*) arg;task_t *task;void *ctx;while (atomic_load(&pool->quit) == 0) {task = (task_t*)__get_task(pool->task_queue);if (!task) break;handler_pt func = task->func;ctx = task->arg;free(task);func(ctx);}return NULL;
}static void 
__threads_terminate(thrdpool_t * pool) {atomic_store(&pool->quit, 1);__nonblock(pool->task_queue);int i;for (i=0; i<pool->thrd_count; i++) {pthread_join(pool->threads[i], NULL);}
}static int 
__threads_create(thrdpool_t *pool, size_t thrd_count) {pthread_attr_t attr;int ret;ret = pthread_attr_init(&attr);if (ret == 0) {pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thrd_count);if (pool->threads) {int i = 0;for (; i < thrd_count; i++) {if (pthread_create(&pool->threads[i], &attr, __thrdpool_worker, pool) != 0) {break;}}pool->thrd_count = i;pthread_attr_destroy(&attr);if (i == thrd_count)return 0;__threads_terminate(pool);free(pool->threads);}ret = -1;}return ret;
}void
thrdpool_terminate(thrdpool_t * pool) {atomic_store(&pool->quit, 1);__nonblock(pool->task_queue);
}thrdpool_t *
thrdpool_create(int thrd_count) {thrdpool_t *pool;pool = (thrdpool_t*) malloc(sizeof(*pool));if (!pool) return NULL;task_queue_t *queue = __taskqueue_create();if (queue) {pool->task_queue = queue;atomic_init(&pool->quit, 0);if (__threads_create(pool, thrd_count) == 0) {return pool;}__taskqueue_destroy(pool->task_queue);}free(pool);return NULL;
}int
thrdpool_post(thrdpool_t *pool, handler_pt func, void *arg) {if (atomic_load(&pool->quit) == 1) {return -1;}task_t *task = (task_t *)malloc(sizeof(task_t));if (!task) return -1;task->func = func;task->arg = arg;__add_task(pool->task_queue, task);return 0;
}void
thrdpool_waitdone(thrdpool_t *pool) {int i;for (i=0; i<pool->thrd_count; i++) {pthread_join(pool->threads[i], NULL);}__taskqueue_destroy(pool->task_queue);free(pool->threads);free(pool);
}

线程池的应用

reactor

在这里插入图片描述
在一个事件循环中,可以处理多个就绪事件,这些就绪事件在reactor 模型中时串行执行的,一个事件处理若耗时较长,会延迟其他同时触发的事件的处理(对于客户端而言,响应会变得较慢)。

redis 中线程池

在这里插入图片描述
作用:读写 io 处理以及数据包解析、压缩;

skynet 中线程池

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/259010.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

功能测试,接口测试,自动化测试,压力测试,性能测试,渗透测试,安全测试,具体是干嘛的?

软件测试是一个广义的概念&#xff0c;他包括了多领域的测试内容&#xff0c;比如&#xff0c;很多新手可能都听说&#xff1a;功能测试&#xff0c;接口测试&#xff0c;自动化测试&#xff0c;压力测试&#xff0c;性能测试&#xff0c;渗透测试&#xff0c;安全测试等&#…

3.添加与删除字段

添加字段与删除字段 1.添加字段 因为甲方的业务需求是不停变化的&#xff0c;所以在数据库操作中&#xff0c;添加字段可是常有的事。一个完整的字段包括&#xff1a;字段名、数据类型和完整性约束。 语法规则为&#xff1a; ALTER TABLE 表名 ADD 新字段名 数据类型 [约束条…

【SpringCache】快速入门 通俗易懂

1. 介绍 Spring Cache 是一个框架&#xff0c;实现了基于注解的缓存功能&#xff0c;只需要简单地加一个注解&#xff0c;就能实现缓存功能。 Spring Cache 提供了一层抽象&#xff0c;底层可以切换不同的缓存实现&#xff0c;例如&#xff1a; EHCache Caffeine Redis(常用…

Swagger页面报错Resolver error at definitions

问题描述 打开swagger页面报错Resolver error at definitions 原因分析&#xff1a; 从错误提示可以看出&#xff0c;是由map引起的原因&#xff0c;具体是因为swagger配置没有默认添加map的复杂结构引起的&#xff0c;需要手动添加。 解决方案&#xff1a; 找到swagger配置类…

智慧小区园区如何布局网络对讲系统

智慧小区园区如何布局网络对讲系统 随着小区住宅的不断更新发展&#xff0c;小区的管理人员也对小区内部的通讯也有了新的要求&#xff0c;要求在工作区域无盲区、语音通讯清晰&#xff0c;小区的安保后勤都能够随时在小区的地下室和室外工作区域、任何时间进行通信。提高小区…

SpringBoot系列之启动成功后执行业务的方法归纳

SpringBoot系列之启动成功后执行业务逻辑。在Springboot项目中经常会遇到需要在项目启动成功后&#xff0c;加一些业务逻辑的&#xff0c;比如缓存的预处理&#xff0c;配置参数的加载等等场景&#xff0c;下面给出一些常有的方法 实验环境 JDK 1.8SpringBoot 2.2.1Maven 3.2…

基于ssm家庭理财系统源码和论文

基于ssm家庭理财系统源码和论文743 idea 数据库mysql5.7 数据库链接工具&#xff1a;navcat,小海豚等 环境&#xff1a; jdk8 tomcat8.5 开发技术 ssm 摘要 随着Internet的发展&#xff0c;人们的日常生活已经离不开网络。未来人们的生活与工作将变得越来越数字化&#xff…

英文论文查重复率网址

大家好&#xff0c;今天来聊聊英文论文查重复率网址&#xff0c;希望能给大家提供一点参考。 以下是针对论文重复率高的情况&#xff0c;提供一些修改建议和技巧&#xff1a; 英文论文查重复率网址 在撰写英文论文时&#xff0c;查重是确保论文原创性和质量的重要环节快码论文…

解决:During handling of the above exception, another exception occurred

解决&#xff1a;During handling of the above exception, another exception occurred 文章目录 解决&#xff1a;During handling of the above exception, another exception occurred背景报错问题报错翻译报错位置代码报错原因解决方法参考内容&#xff1a;今天的分享就到…

【页面】表格展示

展示 Dom <template><div class"srch-result-container"><!--左侧--><div class"left"><div v-for"(item,index) in muneList" :key"index" :class"(muneIndexitem.mm)?active:"click"pa…

车载测试:如何用CANape进行ADAS实车功能测试?

前言 CANape是一款用于ECU测量、标定、诊断以及ADAS传感器数据采集的工具型软件。 测量——通过CANape不仅能采集记录ECU内部信号&#xff0c;还支持与车辆上的各种传感器的总线进行通信。与ECU不同&#xff0c;ADAS传感器不提供车辆实际运行信号&#xff0c;而是提供车辆运行…

数据结构与算法(六)分支限界法(Java)

目录 一、简介1.1 定义1.2 知识回顾1.3 两种解空间树1.4 三种分支限界法1.5 回溯法与分支线定法对比1.6 使用步骤 二、经典示例&#xff1a;0-1背包问题2.1 题目2.2 分析1&#xff09;暴力枚举2&#xff09;分支限界法 2.3 代码实现1&#xff09;实现广度优先策略遍历2&#xf…