Go协程池gopool源码解析

1、gopool简介

Repository:https://github.com/bytedance/gopkg/tree/develop/util/gopool

gopool is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines.

It is an alternative to the go keyword.

gopool的用法非常简单,将曾经我们经常使用的go func(){...}替换为gopool.Go(func(){...})即可

此时gopool将会使用默认的配置来管理你启动的协程,也可以选择针对业务场景配置池子大小以及扩容上限

old:

go func() {// do your job
}()

new:

gopool.Go(func(){// do your job
})

2、核心数据结构

1)、Pool

Pool是一个定义了协程池的接口,代码如下:

// util/gopool/pool.go
type Pool interface {// Name returns the corresponding pool name.// 协程池的名称Name() string// SetCap sets the goroutine capacity of the pool.// 设置协程池内goroutine的容量SetCap(cap int32)// Go executes f.// 执行f函数Go(f func())// CtxGo executes f and accepts the context.// 带ctx,执行f函数CtxGo(ctx context.Context, f func())// SetPanicHandler sets the panic handler.// 设置发生panic时调用的函数SetPanicHandler(f func(context.Context, interface{}))
}

gopool提供了Pool这个接口的默认实现pool,代码如下:

// util/gopool/pool.go
type pool struct {// The name of the pool// 协程池的名字name string// capacity of the pool, the maximum number of goroutines that are actually working// 协程池实际工作的goroutine的最大数量cap int32// Configuration information// 配置信息config *Config// linked list of tasks// task队列的元信息,每一个task代表一个待执行的函数taskHead  *tasktaskTail  *tasktaskLock  sync.MutextaskCount int32// Record the number of running workers// 当前有多少个worker在运行中,每个worker代表一个goroutineworkerCount int32// This method will be called when the worker panic// 协程池中的协程引发的panic会由该函数处理panicHandler func(context.Context, interface{})
}

pool数据结构如下图:

2)、task
// util/gopool/pool.go
type task struct {// 当前task的ctxctx context.Context// 当前task需要执行的函数ff func()// 指向下一个task的指针next *task
}

task是一个链表结构,可以把它理解为一个待执行的任务,包含了当前task需要执行的函数f func()以及指向下一个task的指针

一个协程池pool对应了一组task,pool维护了指向链表的头尾的两个指针:taskHead和taskTail以及链表的长度taskCount和对应的锁taskLock

3)、worker
// util/gopool/worker.go
type worker struct {pool *pool
}

一个worker就是逻辑上的一个执行器,它对应到一个协程池pool

当一个worker被唤起,将会开启一个goroutine,不断从pool中的task链表获取任务并执行,代码如下:

// util/gopool/worker.go
func (w *worker) run() {go func() {for {var t *task// 操作pool中的task链表前,加锁保证并发安全w.pool.taskLock.Lock()if w.pool.taskHead != nil {// 拿到taskHead准备执行t = w.pool.taskHead// 更新链表的head以及数量w.pool.taskHead = w.pool.taskHead.nextatomic.AddInt32(&w.pool.taskCount, -1)}if t == nil {// if there's no task to do, exit// 如果前一步拿到的taskHead为空,说明无任务需要执行,清理后返回(关闭goroutine)w.close()w.pool.taskLock.Unlock()w.Recycle()return}w.pool.taskLock.Unlock()// 执行任务,针对panic会recover,并调用配置的handlerfunc() {defer func() {if r := recover(); r != nil {msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())logger.CtxErrorf(t.ctx, msg)if w.pool.panicHandler != nil {w.pool.panicHandler(t.ctx, r)}}}()t.f()}()t.Recycle()}}()
}

3、核心API

来看下使用gopool的核心APIGo(f func()),实现如下:

// util/gopool/gopool.go
func Go(f func()) {CtxGo(context.Background(), f)
}func CtxGo(ctx context.Context, f func()) {defaultPool.CtxGo(ctx, f)
}
// util/gopool/pool.go
func (p *pool) CtxGo(ctx context.Context, f func()) {// 创建一个task对象,将ctx和待执行的函数赋值t := taskPool.Get().(*task)t.ctx = ctxt.f = f// 将task插入pool的链表的尾部,更新链表数量p.taskLock.Lock()if p.taskHead == nil {p.taskHead = tp.taskTail = t} else {p.taskTail.next = tp.taskTail = t}p.taskLock.Unlock()atomic.AddInt32(&p.taskCount, 1)// The following two conditions are met:// 1. the number of tasks is greater than the threshold.// 2. The current number of workers is less than the upper limit p.cap.// or there are currently no workers.// 以下任意条件满足时,创建新的worker并唤起执行// 1.待执行的task超过了扩容阈值(默认值为1)且当前运行的worker数量小于上限(默认值为10000)// 2.无worker运行if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {// worker数量+1p.incWorkerCount()// 创建一个新的worker,并把当前pool赋值w := workerPool.Get().(*worker)w.pool = p// 唤起worker执行w.run()}
}

以下任意条件满足时,会扩容worker:

  1. 待执行的task超过了扩容阈值(默认值为1)且当前运行的worker数量小于上限(默认值为10000)
  2. 无worker运行

gopool自行维护一个defaultPool,这是一个默认的pool结构体,在引入包的时候就进行初始化。当我们直接调用gopool.Go()时,本质上是调用了defaultPool的同名方法

// util/gopool/gopool.go
var defaultPool Poolfunc init() {defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}
// util/gopool/config.go
const (defaultScalaThreshold = 1
)type Config struct {// threshold for scale.// new goroutine is created if len(task chan) > ScaleThreshold.// defaults to defaultScalaThreshold.// 控制扩容的阈值,一旦待执行的task超过此值,且worker数量未达到上限,就开始启动新的workerScaleThreshold int32
}func NewConfig() *Config {c := &Config{ScaleThreshold: defaultScalaThreshold,}return c
}

defaultPool的名称为gopool.DefaultPool,池子容量一万,扩容阈值为1

当调用gopool.Go()时,gopool就会更新维护的任务链表,并且判断是否需要扩容worker:

  • 若此时已经有很多worker启动(底层一个worker对应一个goroutine),不需要扩容,就直接返回
  • 若判断需要扩容,就创建一个新的worker,并调用worker.run()方法启动,各个worker会异步地检查pool里面的任务链表是否还有待执行的任务,如果有就执行

gopool中三个角色的定位:

  • task是一个待执行的任务节点,同时还包含了指向下一个任务的指针,链表结构
  • worker是一个实际执行任务的执行器,它会异步启动一个goroutine执行协程池里面未执行的task
  • pool是一个逻辑上的协程池,对应了一个task链表,同时负责维护task状态的更新,以及在需要的时候创建新的worker

gopool核心实现原理如下图:

4、使用sync.Pool进行性能优化

gopool中多次使用了sync.Pool来池化对象的创建,复用woker和task对象

task池化:

// util/gopool/pool.go
var taskPool sync.Poolfunc init() {taskPool.New = newTask
}func newTask() interface{} {return &task{}
}func (t *task) Recycle() {t.zero()taskPool.Put(t)
}

worker池化:

// util/gopool/worker.go
var workerPool sync.Poolfunc init() {workerPool.New = newWorker
}func newWorker() interface{} {return &worker{}
}func (w *worker) Recycle() {w.zero()workerPool.Put(w)
}

参考:

解析 Golang 协程池 gopool 设计与实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/601821.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Astra深度相机在Ubuntu18.04系统下实现相机标定

问题&#xff1a; 当使用Astra相机的启动的指令启动相机后&#xff0c;使用rviz查看相机所发布的rgb数据时&#xff0c;在终端会出现如下的提示信息&#xff1a; Camera calibration file /home/car/.ros/camera_info/rgb_Astra_Orbbec.yaml not found. Camera calibration fil…

HarmonyOS实战开发-如何实现跨应用数据共享实例。

介绍 本示例实现了一个跨应用数据共享实例&#xff0c;分为联系人&#xff08;数据提供方&#xff09;和联系人助手&#xff08;数据使用方&#xff09;两部分&#xff1a;联系人支持联系人数据的增、删、改、查等功能&#xff1b;联系人助手支持同步联系人数据&#xff0c;当…

Dubbo 序列化

Dubbo 序列化 1、什么是序列化和反序列化 序列化&#xff08;serialization&#xff09;在计算机科学的资料处理中&#xff0c;是指将数据结构或对象状态转换成可取用格式&#xff08;例如存成文件&#xff0c;存于缓冲&#xff0c;或经由网络中发送&#xff09;&#xff0c;…

HiveSQL之lateral view

lateral view是hiveQL中的一个高级功能&#xff0c;用于和表生成函数一起&#xff0c;来处理嵌套数组和结构的数据&#xff0c;特别是在处理复杂的数据结构如JSON或数组内嵌套数组时特别有用。它允许用户在每一行上应用TGF&#xff08;表生成函数&#xff09;&#xff0c;将生成…

蓝桥杯刷题--RDay5

清理水域--枚举 8.清理水域 - 蓝桥云课 (lanqiao.cn)https://www.lanqiao.cn/problems/2413/learning/?page1&first_category_id1&second_category_id3&tags2023 小蓝有一个n m大小的矩形水域&#xff0c;小蓝将这个水域划分为n行m列&#xff0c;行数从1…

云计算(五)—— OpenStack基础环境配置与API使用

OpenStack基础环境配置与API使用 项目实训一 【实训题目】 使用cURL命令获取实例列表 【实训目的】 理解OpenStack的身份认证和API请求流程。 【实训准备】 &#xff08;1&#xff09;复习OpenStack的认证与API请求流程的相关内容。 &#xff08;2&#xff09;熟悉cURL…

CSS层叠样式表学习(基础选择器)

&#xff08;大家好&#xff0c;今天我们将继续来学习CSS&#xff08;2&#xff09;的相关知识&#xff0c;大家可以在评论区进行互动答疑哦~加油&#xff01;&#x1f495;&#xff09; 目录 二、CSS基础选择器 2.1 CSS选择器的作用 2.2 选择器分类 2.3 标签选择器 2.…

Java前置一些知识

文章目录 搭建Java环境安装path环境变量Java技术体系 Java执行原理JDK组成跨平台 IDEA管理Java程序 搭建Java环境 安装 oralce官网下载 JDK17 Windows 傻瓜式的点下一步就行&#xff0c;注意&#xff1a;安装目录不要有空格、中文 java 执行工具 javac 编译工具 类名和文件…

如何监控特权帐户,保护敏感数据

IT基础设施的增长导致员工可以访问的凭据和资源数量急剧增加。每个组织都存储关键信息&#xff0c;这些信息构成了做出关键业务决策的基石。与特权用户共享这些数据可以授予他们访问普通员工没有的凭据的权限。如果特权帐户凭证落入不法分子之手&#xff0c;它们可能被滥用&…

刷题之动态规划-子序列

前言 大家好&#xff0c;我是jiantaoyab&#xff0c;开始刷动态规划的子序列类型相关的题目&#xff0c;子序列元素的相对位置和原序列的相对位置是一样的 动态规划5个步骤 状态表示 &#xff1a;dp数组中每一个下标对应值的含义是什么>dp[i]表示什么状态转移方程&#xf…

RAG基础知识及应用

简单介绍下RAG的基础知识和RAG开源应用 “茴香豆" 一. RAG 基础知识 1. RAG工作原理 RAG是将向量数据库和大模型问答能力的有效结合&#xff0c;从而达到让大模型的知识能力增加的目的。首先将知识源存储在向量数据库中&#xff0c;当提出问题时&#xff0c;去向量数据库…

Redis中的集群(一)

集群 概述 Redis集群是Redis提供的分布式数据库方案&#xff0c;集群通过分片(sharding)来进行数据共享&#xff0c;并提供复制和故障转移功能 节点 一个Redis集群通常由多个节点(node)组成&#xff0c;在刚开始的时候&#xff0c;每个节点都是相互独立的&#xff0c;它们都…