Golang协程池ants库的学习、使用及源码阅读,协程池与GMP模型关系的理解

前言

在工作时遇到了一个需要使用ants协程池的地方,因此顺带来学习一下他的原理。

在这里插入图片描述

协程池

Golang的资源还是偏少一些…因此先简单的参考学习了一下线程池。

类似于Java中的线程池,协程池也是为了减少协程频繁创建、销毁所带来资源消耗的问题。按默认每个goroutine 8kb内存来算,几十万个goroutine就会占满8Gb内存。同时,由于goroutine的结束需要等待自身运行结束才可以销毁,所以也可能出现goroutine泄露的问题。因此需要使用协程池,来进行管理。

本文是ants协程池的学习,其他优秀的协程池日后有机会再去学习。

ants库对性能的提升

直接引用作者的结论“用ants的吞吐性能相较于原生 goroutine 可以保持在 2-6 倍的性能压制,而内存消耗则可以达到 10-20 倍的节省优势。”

在goroutine越多时候,提升越明显。具体请参阅作者文档中给出的性能测试部分。——https://github.com/panjf2000/ants/blob/dev/README_ZH.md

ants库工作流程

这个是作者在文档中所给出的流程图。在原文档中还提供了动态图,但我的电脑上他们没动…所以我也就不搬运了。具体还是请参阅作者文档。

在这里插入图片描述

当一个任务被丢尽协程池后,大致分为以下几步:
1、判断是否有可用的worker。若有,分配任务到worker,并执行即可。
2、若无可用的worker,判断是否达到容量上限。若未达到上限,新增一个worker并分配执行即可。
3、若已达到上限,则判断是否允许阻塞。若不允许,则直接返回nil即可。
4、若允许阻塞,则阻塞至有可用worker,再分配执行即可。
5、对于一个执行完成任务的worker,再放回工作池。

ants库源码对照阅读

由于笔者学术不精且文笔欠佳,因此此部分仅是简单阅读记录,一篇详细且高质量的源码阅读请参阅Go 每日一库之 ants(源码赏析)。此文阅读、解析的十分详细。

新建Pool

Pool所用到的的数据结构如下。

type Pool struct {// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool// which submits a new task to the same pool.capacity int32// running is the number of the currently running goroutines.running int32// lock for protecting the worker queue.lock sync.Locker// workers is a slice that store the available workers.workers workerQueue// state is used to notice the pool to closed itself.state int32// cond for waiting to get an idle worker.cond *sync.Cond// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.workerCache sync.Pool// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lockwaiting int32purgeDone int32stopPurge context.CancelFuncticktockDone int32stopTicktock context.CancelFuncnow atomic.Valueoptions *Options
}

其中大致如下几个参数:
capacity:池容量,表示ants最多能创建的 goroutine 数量。负值意味着池的容量是无限的,使用无限池是为了避免由于嵌套使用池而导致的无限阻塞的潜在问题:提交一个任务到池,该池将一个新任务提交到相同的池。
running:是当前正在运行的goroutines的数量。
workers:存储可用worker的切片。works是一个workerQueue类型的接口,位于workerQueue.go这个文件中。
state:记录池子当前是否已关闭(CLOSED)。
waiting:已经被pool.Submit()阻塞的协程数量,由pool.lock保护。
lock:锁。
cond:条件变量。处理任务等待和唤醒。
blockingNum:阻塞等待的任务数量。
其他具体的见源码中

// NewPool instantiates a Pool with customized options.
func NewPool(size int, options ...Option) (*Pool, error) {if size <= 0 {size = -1}opts := loadOptions(options...)if !opts.DisablePurge {if expiry := opts.ExpiryDuration; expiry < 0 {return nil, ErrInvalidPoolExpiry} else if expiry == 0 {opts.ExpiryDuration = DefaultCleanIntervalTime}}if opts.Logger == nil {opts.Logger = defaultLogger}p := &Pool{capacity: int32(size),lock:     syncx.NewSpinLock(),options:  opts,}p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}if p.options.PreAlloc {if size == -1 {return nil, ErrInvalidPreAllocSize}p.workers = newWorkerQueue(queueTypeLoopQueue, size)} else {p.workers = newWorkerQueue(queueTypeStack, 0)}p.cond = sync.NewCond(p.lock)p.goPurge()p.goTicktock()return p, nil
}

NewPool就是新建一个Pool,其接收的参数是(size int, options ...Option) ,第一个是容量,其他可选项。

通过Submit进行任务提交

此部分源码如下

func (p *Pool) Submit(task func()) error {if p.IsClosed() {return ErrPoolClosed}w, err := p.retrieveWorker()if w != nil {w.inputFunc(task)}return err
}

首先判断池子是否关闭,关闭则直接err
接下来通过retrieveWorker来判断是否有空闲的worker
w!=nil 即有空闲worker,则通过w.inputFunc(task)新增一个任务。

w.inputFunc(task)部分如下

func (w *goWorker) inputFunc(fn func()) {w.task <- fn
}

是把任务提交到了一个goWorker结构体中的task chan func()。这个goworkerworker接口的实现,他其中chan的任务会进行run

通过retrieveWorker判断空闲worker

此部分源码如下,其所返回的是一个可用的worker


// retrieveWorker returns an available worker to run the tasks.
func (p *Pool) retrieveWorker() (w worker, err error) {p.lock.Lock()retry:// First try to fetch the worker from the queue.if w = p.workers.detach(); w != nil {p.lock.Unlock()return}// If the worker queue is empty, and we don't run out of the pool capacity,// then just spawn a new worker goroutine.if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {p.lock.Unlock()w = p.workerCache.Get().(*goWorker)w.run()return}// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {p.lock.Unlock()return nil, ErrPoolOverload}// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.p.addWaiting(1)p.cond.Wait() // block and wait for an available workerp.addWaiting(-1)if p.IsClosed() {p.lock.Unlock()return nil, ErrPoolClosed}goto retry
}

首先通过p.workers.detach()判断是否有空闲的worker。若有,则直接返回可用的worker

若无,通过

if capacity := p.Cap(); capacity == -1 || capacity > p.Running() 

这一个判断来判断是否耗尽了池容量。若没有耗尽,则通过p.workerCache.Get().(*goWorker)生成一个新的工作goroutine,并返回workers

继续执行则是“没有空闲worker且池容量已满”的情况。便通过

	if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) 

判断,是否处于非阻塞模式,或者待处理的呼叫者数量达到最大值,若是,则退出返回nil

反之,通过

	p.addWaiting(1)p.cond.Wait() // block and wait for an available workerp.addWaiting(-1)

进行阻塞等待可用的即可。

此部分整体通过一个retry:标签和goto retry来进行控制,通过一个lock保证了安全。具体看作者源码部分,此处不搬运描述。

run部分

chan中的任务会依次去runrun的源码部分如下:


// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *goWorker) run() {w.pool.addRunning(1)go func() {defer func() {w.pool.addRunning(-1)w.pool.workerCache.Put(w)if p := recover(); p != nil {if ph := w.pool.options.PanicHandler; ph != nil {ph(p)} else {w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())}}// Call Signal() here in case there are goroutines waiting for available workers.w.pool.cond.Signal()}()for f := range w.task {if f == nil {return}f()if ok := w.pool.revertWorker(w); !ok {return}}}()
}

简单来说,这部分是开协程,并在其中:增减相关计数、运行函数。相当于是把goroutine又交给了底层GMP模型去进行处理。

ants库的使用

根据作者文档的描述,主要提供了下述几种功能“任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool”。
下面是我一次使用ants库记录。

https://blog.csdn.net/Ws_Te47/article/details/135484767

此次使用只是用到了最基础的功能,新建Pool、任务提交、释放Pool。
一个更详细的使用demo请见——https://darjun.github.io/2021/06/03/godailylib/ants/

关于GMP模型和协程池

在昨天的初步学习中,曾出现过一个疑惑,GMP模型和协程池,到底是什么关联?我疑惑在他们都是用于协程的分配与控制。协程池通过一系列的判断来判断是否运行,但GMP不也有队列去分配吗?

今天突然恍然大悟,在学习完源码后也进一步清晰了他。

GMP模型是用于协程分配给线程这一个过程的,他的起点是“一个新建好的协程”,并将这个协程去进行一系列的排队、抢占调度等 操作,最终终点是分配到线程,并最终经CPU进行执行。

但协程池的主要目的是goroutine的复用,减少因频繁新建、销毁goroutine所带来的性能损耗。他的起点是“一个需要通过goroutine执行的方法”,通过一系列的判断、分配,最终的终点是在run方法中去启用一个goroutine。

所以可以说,协程池的终点是GMP的起点。一个任务先通过协程池去挑选合适的时机,分配到对应的goroutine;再通过GMP模型,去分配给对应的线程,最终分配到CPU进行执行。

参考资源

官方文档,唯一真神…

Java线程池实现原理及其在美团业务中的实践。

Go 每日一库之 ants

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/336651.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Multimodal Segmentation of Medical Images with Heavily Missing Data

F是mapping function 吐槽 图3太简单了吧。作者未提供代码

Mysql系列-1.Mysql基本使用

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱吃芝士的土豆倪&#xff0c;24届校招生Java选手&#xff0c;很高兴认识大家&#x1f4d5;系列专栏&#xff1a;Spring源码、JUC源码、Kafka原理、分布式技术原理、数据库技术&#x1f525;如果感觉博主的文章还不错的…

【大模型】大型模型飞跃升级—文档图像识别领域迎来技术巨变

写在前面 2023年12月31日&#xff0c;第十九届中国图象图形学学会青年科学家会议在广州举行&#xff0c;由中国图象图形学学会主办。 该会议的目标是促进青年科学家之间的交流与合作&#xff0c;以提升我国在图像图形领域的科研水平和创新能力。 由中国图象图形学学会和上海合合…

微信小程序Canvas画布绘制图片、文字、矩形、(椭)圆、直线

获取CanvasRenderingContext2D 对象 .js onReady() {const query = wx.createSelectorQuery()query.select(#myCanvas).fields({ node: true, size: true }).exec((res) => {const canvas = res[0].nodeconst ctx = canvas.getContext(2d)canvas.width = res[0].width * d…

毛泽东,如何熬过人生至暗时刻?

文章目录 一、8年内三次蒙冤1、第一次蒙冤2、第二次蒙冤3、第三次蒙冤 二、毛泽东是怎么面对逆境的&#xff1f;三、极致的乐观精神四、结语参考文献 一件事&#xff0c;你做对了&#xff0c;立了功&#xff0c;但结果却是严厉的惩罚&#xff0c;甚至让你回到原点&#xff0c;你…

《Shader开发实战》-笔记

一、初识游戏图形 1、什么是渲染&#xff1f; 渲染实际上就是创建图像的过程&#xff0c;在渲染过程中创建的图像被称为渲染或者帧&#xff0c;该图像&#xff08;帧&#xff09;以每秒多次在计算机屏幕上进行呈现&#xff0c;即帧率。 负责渲染图像&#xff08;帧&#xff09…

每天刷两道题——第十一天

1.1滑动窗口最大值 给你一个整数数组 nums&#xff0c;有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。返回滑动窗口中的最大值 。 输入&#xff1a;nums [1,3,-1,-3,5,3,6,7], k 3 输出&…

FullCalendar简单的应用(新增,拖动,编辑事件)

大概效果&#xff1a; 以 react 为例&#xff1a; 日历组件所需依赖&#xff1a; (Plugin Index - Docs | FullCalendar) "fullcalendar/core": "^6.1.10", "fullcalendar/daygrid": "^6.1.10", "fullcalendar/interaction&quo…

新书速览|循序渐进Vue.js 3.x前端开发实战

Vue.js初学者和前端开发人员使用&#xff0c;网课、培训机构与大中专院校的教学用书 作者简介 张益珲 美国亚利桑那州立大学计算机工程技术硕士&#xff0c;架构师&#xff0c;从业近10年&#xff0c;多年大前端开发经验&#xff0c;曾就职于知名上市公司&#xff0c;主导开发…

反向代理+web集群+mysql mha实验总结

一、实验步骤 1、部署框架前准备工作 服务器类型部署组件ip地址DR1调度服务器 主&#xff08;ha01&#xff09;KeepalivedLVS-DR192.168.86.13DR2调度服务器 备 (ha02)KeepalivedLVS-DR192.168.86.14web1节点服务器 (slave01)NginxTomcatMySQL 备MHA managerMHA node192.168.8…

详解Oracle数据库的启动

Oracle数据库的启动&#xff0c;其概念可参考Overview of Instance and Database Startup。 其过程可参见下图&#xff1a; 当数据库从关闭状态进入打开数据库状态时&#xff0c;它会经历以下阶段。 阶段Mount状态描述1实例在没有挂载数据库的情况下启动实例已启动&#xff…

微信小程序如何自定义导航栏,怎么确定导航栏及状态栏的高度?导航栏被刘海、信号图标给覆盖了怎么办?

声明&#xff1a;本文为了演示效果&#xff0c;颜色采用的比较显眼&#xff0c;可根据实际情况修改颜色 问题描述 当我们在JSON中将navigationStyle设置成custom后&#xff0c;当前页面的顶部导航栏就需要我们制作了&#xff0c;但出现了一下几个问题&#xff1a; 导航栏的高…