线程池详解并使用Go语言实现 Pool

写在前面

在线程池中存在几个概念:核心线程数最大线程数任务队列

  • 核心线程数指的是线程池的基本大小;也就是指worker的数量
  • 最大线程数指的是,同一时刻线程池中线程的数量最大不能超过该值;实际上就是指task任务的数量。
  • 任务队列是当任务较多时,线程池中线程的数量已经达到了核心线程数,这时候就是用任务队列来存储我们提交的任务。相当于缓冲作用。

与其他池化技术不同的是,线程池是基于生产者-消费者模式来实现的,任务的提交方是生产者,线程池是消费者 。当我们需要执行某个任务时,只需要把任务扔到线程池中即可。

池化技术:这里的池化和卷积的池化不一样,这里的池化技术简单点来说,就是提前保存大量的资源,以备不时之需

线程池中执行任务的流程如下图如下。
在这里插入图片描述

那么使用线程池可以带来一系列好处:

  1. 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  2. 提高响应速度:任务到达时,无需等待线程创建即可立即执行
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  4. 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。

任务调度

首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。

  1. 如果 taskCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。

在这里插入图片描述

  1. 如果 taskCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。

在这里插入图片描述

  1. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据 拒绝策略 来处理该任务, 默认的处理方式是直接抛异常。

在这里插入图片描述

常见的拒绝策略有以下几种

  • AbortPolicy 中止策略:丢弃任务并抛出异常
  • DiscardPolicy 丢弃策略:丢弃任务,但是不抛出异常。如果线程队列已满,则后续提交的任务都会被丢弃,且是静默丢弃。
  • DiscardOldestPolicy 弃老策略:丢弃队列最前面的任务,然后重新提交被拒绝的任务。

简单实现

定义任务Task 并 定义NewTask来新建Task对象

type Task struct {f func() error
}func NewTask(f func() error) *Task {return &Task{f: f}
}

定义 WorkPool 线程池

type WorkPool struct {TaskQueue chan *Task // Task队列workNum   int        // 协程池中最大的worker数量shop      chan struct{} // 停止工作标识
}

创建 WorkPool 的函数

func NewWorkPool(cap int) *WorkPool {if cap <= 0 {cap = 10}return &WorkPool{TaskQueue: make(chan *Task),workNum:   cap,shop:      make(chan struct{}),}
}

具体的协程池中的工作节点

func (p *WorkPool) worker(workId int) {for task := range p.TaskQueue {err := task.Execute()if err != nil {fmt.Println(err)continue}fmt.Printf(" work id %d finished \n", workId) // 打印出具体是哪个节点进行工作}
}

协程池启动函数

func (p *WorkPool) run() {// 根据work num 去创建worker工作for i := 0; i < p.workNum; i++ {go p.worker(i)}<-p.shop
}

协程池关闭函数

func (p *WorkPool) close() {p.shop <- struct{}{}
}

测试一下,使用定时器,每2秒进行一次投放,并且投放超过5个之后开始停止。

func TestWorkPool(t *testing.T) {task := NewTask(func() error {fmt.Print(time.Now())return nil})taskCount := 0ticker := time.NewTicker(2 * time.Second)p := NewWorkPool(3)go func(c *time.Ticker) {for {p.TaskQueue <- task<-c.CtaskCount++if taskCount == 5 {p.close()break}}return}(ticker)p.run()
}

结果:

可以看到结果是每两秒进行一次打印,并且worker对象都不一样。

完整代码

package gorountine_poolimport ("fmt""testing""time"
)func TestWorkPool(t *testing.T) {task := NewTask(func() error {fmt.Print(time.Now())return nil})taskCount := 0ticker := time.NewTicker(2 * time.Second)p := NewWorkPool(3)go func(c *time.Ticker) {for {p.TaskQueue <- task<-c.CtaskCount++if taskCount == 5 {p.close()break}}return}(ticker)p.run()
}type Task struct {f func() error
}func NewTask(f func() error) *Task {return &Task{f: f}
}// Execute 执行业务方法
func (t *Task) Execute() error {return t.f()
}type WorkPool struct {TaskQueue chan *Task // task队列workNum   int        // 携程池中最大的worker数量shop      chan struct{} // 停止标识
}// 创建Pool的函数
func NewWorkPool(cap int) *WorkPool {if cap <= 0 {cap = 10}return &WorkPool{TaskQueue: make(chan *Task),workNum:   cap,shop:      make(chan struct{}),}
}func (p *WorkPool) worker(workId int) {// 具体的工作for task := range p.TaskQueue {err := task.Execute()if err != nil {fmt.Println(err)continue}fmt.Printf(" work id %d finished \n", workId)}
}// 携程池开始工作
func (p *WorkPool) run() {// 根据work num 去创建worker工作for i := 0; i < p.workNum; i++ {go p.worker(i)}<-p.shop
}func (p *WorkPool) close() {p.shop <- struct{}{}
}

参考链接

[1] https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
[2] https://blog.csdn.net/weixin_44688301/article/details/123292211
[3] https://www.bilibili.com/video/BV1Nf4y137na

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/596723.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java集合——Map、Set和List总结

文章目录 一、Collection二、Map、Set、List的不同三、List1、ArrayList2、LinkedList 四、Map1、HashMap2、LinkedHashMap3、TreeMap 五、Set 一、Collection Collection 的常用方法 public boolean add(E e)&#xff1a;把给定的对象添加到当前集合中 。public void clear(…

Golang | Leetcode Golang题解之第11题盛最多水的容器

题目&#xff1a; 题解&#xff1a; func maxArea(height []int) int {res : 0L : 0R : len(height) - 1for L < R {tmp : math.Min(float64(height[L]), float64(height[R]))res int(math.Max(float64(res), tmp * float64((R - L))))if height[L] < height[R] {L} el…

微信小程序的页面交互2

一、自定义属性 &#xff08;1&#xff09;定义&#xff1a; 微信小程序中的自定义属性实际上是由data-前缀加上一个自定义属性名组成。 &#xff08;2&#xff09;如何获取自定义属性的值&#xff1f; 用到target或currentTarget对象的dataset属性可以获取数据 &#xff…

【51单片机入门记录】RTC(实时时钟)-DS1302概述

目录 一、基于三线通信的RTC-DS1302 &#xff08;1&#xff09;简介 &#xff08;2&#xff09;特性 &#xff08;3&#xff09;引脚介绍 &#xff08;4&#xff09;控制字的格式 &#xff08;5.0&#xff09;日历时钟寄存器介绍 &#xff08;5.1&#xff09;日历时钟寄存…

特别记录:chm报错

妙网生成CHM&#xff0c;一直都很好。 今天突然出现 查遍网上&#xff0c;都是提示&#xff1a; * 需要保证该文件存放的路径里边不能有“#”&#xff0c;如果有“#”就会出现这种问题 之类的一堆相同的解决方案&#xff0c;都未来解决 结果&#xff0c;经过排查&#xff0…

Spring Cloud Hoxton.SR7 Supported Boot Version: 2.3.2.RELEASE

1、地址 Spring Cloudhttps://docs.spring.io/spring-cloud/docs/Hoxton.SR7/reference/html/ 2、 截图

ubuntu20.04.6安装sshd服务,并连接到远程服务器

文章目录 sshd 是 OpenSSH 服务器的守护进程OpenSSH下载在 Ubuntu 上&#xff0c;可以按照以下步骤来管理 sshd 服务 防火墙开启22端口使用Mobaxterm链接服务器 sshd 是 OpenSSH 服务器的守护进程 它负责提供远程登录和安全的 shell 服务。通过启动 sshd 服务&#xff0c;可以…

NzN的数据结构--栈的实现

在前面我们已经学习了哪些线性数据结构呢&#xff1f;大家一起来回顾一下&#xff1a;C语言学过的数组&#xff0c;数据结构中的线性表和顺序表和链表。那我们今天再来介绍数据结构里的两个线性结构--栈和队列。 目录 一、栈的概念及结构 二、用数组实现栈 1. 栈的初始化和…

计算机组成与体系结构

数据的表示 进制的转换 二进制转十进制 10100.01 1 ∗ 2 4 1 ∗ 2 2 1 ∗ 2 − 2 10100.011 *2^41*2^21*2^{-2} 10100.011∗241∗221∗2−2 八进制转十进制 753 7 ∗ 8 2 5 ∗ 8 1 3 ∗ 8 1 491 753 7 * 8^25*8^13*8^1 491 7537∗825∗813∗81491 $$ $$ 十六进制…

4.1.k8s的pod-创建,数据持久化,网络暴露,env环境变量

目录 一、Pod介绍 二、指令创建和管理Pod 三、资源清单创建pod 1.挂载hostPath存储卷 2.NFS存储卷 所有节点安装nfs k8s3编辑NFS配置文件 k8s1&#xff0c;k8s2节点开机挂载 编辑pod资源清单&#xff0c;挂载nfs 四、pod网络暴露 1.hostNetwork使用宿主机的网络 2.…

【OneAPI】贴纸生成API

OneAPI新接口发布&#xff1a;贴纸生成 生成一个10241024像素的贴纸。 API地址&#xff1a;POST https://oneapi.coderbox.cn/openapi/api/stickers 请求参数&#xff08;body&#xff09; 参数名类型必填含义说明prompt提示词是提示词示例&#xff1a;一只可爱的小狗 响应…

ZYNQ学习之Petalinux 设计流程实战

基本都是摘抄正点原子的文章&#xff1a;<领航者 ZYNQ 之嵌入式Linux 开发指南 V3.2.pdf&#xff0c;因初次学习&#xff0c;仅作学习摘录之用&#xff0c;有不懂之处后续会继续更新~ PetaLinux工具提供了在 Xilinx 处理系统上自定义、构建和部署嵌入式 Linux 解决方案所需的…