golang中的并发模型

并发模型

传统的编程语言(如C++、Java、Python等)并非为并发而生的,因此它们面对并发的逻辑多是基于操作系统的线程。其并发的执行单元(线程)之间的通信利用的也是操作系统提供的线程或进程间通信的原语,比如共享内存、信号、管道、消息队列、套接字等。在这些通信原语中,使用最多、最广泛同时也最高效的是结合了线程同步原语(比如锁以及更为低级的原子操作)的共享内存方式,因此,可以说传统语言的并发模型是基于共享内存的模型

Untitled

这些传统的就基于共享内存的并发模型难用且易错,在大型程序中,开发人员在设计并发程序时需要根据线程模型对程序进行建模同时规划线程之间的通信方式,且程序难以阅读、理解、维护

Go采用了CSP(Communicating Sequential Process,通信顺序进程)模型

一个符合CSP模型的并发程序应该是一组通过输入/输出原语连接起来的P的集合

Untitled

CSP模型旨在简化并发程序的编写,让并发程序的编写与编写顺序程序一样简单。Tony Hoare认为输入/输出应该是基本的编程原语,数据处理逻辑(CSP中的P)仅需调用输入原语获取数据,顺序处理数据,并将结果数据通过输出原语输出

CSP理论中的P(Process,进程)是个抽象概念,它代表任何顺序处理逻辑的封装,它获取输入数据(或从其他P的输出获取),并生产可以被其他P消费的输出数据。

为了实现CSP模型中的输入/输出原语,Go引入了goroutine(P)之间的通信原语channel。通过channel将goroutine(P)组合与连接在一起,这使得设计和编写大型并发系统变得更为简单和清晰

虽然CSP模型已经成为Go语言支持的主流并发模型,但Go也支持传统的基于共享内存的并发模型,并提供基本的低级同步原语(主要是sync包中的互斥锁、条件变量、读写锁、原子操作等

那么在实践中应该如何选择是使用channel还是低级同步原语下的共享内存?

Go始终推荐以CSP模型风格构建并发程序,尤其是在复杂的业务层面。这将提升程序逻辑的清晰度,大大降低并发设计的复杂性,并让程序更具可读性和可维护性;

对于局部情况,比如涉及性能敏感的区域或需要保护的结构体数据,可以使用更为高效的低级同步原语(如sync.Mutex),以保证goroutine对数据的同步访问。

并发模式

在语言层面,Go针对CSP模型提供了三种并发原语。

  • goroutine:对应CSP模型中的P,封装了数据的处理逻辑,是Go运行时调度的基本执行单元。
  • channel:对应CSP模型中的输入/输出原语,用于goroutine之间的通信和同步。
  • select:用于应对多路输入/输出,可以让goroutine同时协调处理多个channel操作。

深入了解一下在实践中这些原语的常见组合方式,即并发模式:

创建模式

go关键字+function/method 创建 goroutine:

go fmt.println("I'm a goroutine")
​
c := srv.NewConn(rw)
go c.serve(connCtx)

在稍微复杂的程序里,需要考虑通过原语的承载体channel在goroutine间建立联系,所以通常采用以下方式建立goroutine:

type T struct {...}func spwan(f func()) chan T {c := make(chan T)go func() {...f()...}()return c
}func main() {
//使用c与新创建的goroutine通信c := spawn(func(){})
}

在内部创建一个goroutine并返回一个channel类型变量函数

spwan函数创建的新的goroutine和调用spwan函数的goroutine通过channel建立联系

函数得以实现得益于channel作为go语言的一等公民(first-class citizen)的存在:channel可以像变量一样被初始化、传递和赋值。上面例子中的spawn只返回了一个channel变量、

退出模式

goroutine的执行函数返回意味着goroutine退出。但有些时候会要求优雅退出,以下为方案:

分离(detached)模式

是使用最广泛的goroutine退出模式

创建它的goroutine不需要关心它的退出,这类goroutine在启动后即与其创建者彻底分离,其生命周期与其执行的主函数相关,函数返回即goroutine退出。这类goroutine有两个常见用途。

一次性任务:用来执行任务完成后既退出,比如此标准库代码:

// $GOROOT/src/net/dial.gofunc (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {...if oldCancel := d.Cancel; oldCancel != nil {subCtx, cancel := context.WithCancel(ctx)defer cancel()
//有数据处理后既退出go func() {select {case <-oldCancel:cancel()case <-subCtx.Done():}}()ctx = subCtx}...
}

常驻后台执行的一些特定任务:如监视(monitor)、观察(watch)等。其实现通常采用for {…}或for { select{…} }代码段形式,并多以定时器(timer)或事件(event)驱动执行。

// $GOROOT/src/runtime/mgc.go
func gcBgMarkStartWorkers() {// 每个P都有一个运行在后台的用于标记的Gfor _, p := range allp {if p.gcBgMarkWorker == 0 {go gcBgMarkWorker(p) // 为每个P创建一个goroutine,以运行gcBgMarkWorkernotetsleepg(&work.bgMarkReady, -1)noteclear(&work.bgMarkReady)}}
}func gcBgMarkWorker(_p_ *p) {gp := getg()...for { // 常驻后台处理GC事宜...}
}

Join模式

在线程模型中,父线程可以通过pthread_join来等待子线程结束并获取子线程的结束状态。

在Go中,我们有时候也有类似的需求:goroutine的创建者需要等待新goroutine结束。

  • 等待一个goroutine退出

先看一段实例代码

func worker(args ...interface{}) {if len(args) == 0 {return}interval, ok := args[0].(int)if !ok {return}time.Sleep(time.Second * (time.Duration(interval)))
}func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} {c := make(chan struct{})go func() {f(args...)c <- struct{}{}}()return c
}func main() {done := spawn(worker, 5)println("spawn a worker goroutine")<-doneprintln("worker done")
}

这个channel的用途就是在两个goroutine之间建立退出事件的“信号”通信机制。main goroutine在创建完新goroutine后便在该channel上阻塞等待,直到新goroutine退出前向该channel发送了一个信号。

运行过后

Untitled

  • 获取goroutine的退出状态

如果不仅要等goroutine退出还要精准获取其结束状态,可以通过自定义类型的channel实现这一需求:

var OK = errors.New("ok")func worker(args ...interface{}) error {if len(args) == 0 {return errors.New("invalid args")}interval, ok := args[0].(int)if !ok {return errors.New("invalid interval arg")}time.Sleep(time.Second * (time.Duration(interval)))return OK
}func spawn(f func(args ...interface{}) error, args ...interface{}) chan error {c := make(chan error)go func() {c <- f(args...)}()return c
}func main() {done := spawn(worker, 5)println("spawn worker1")err := <-donefmt.Println("worker1 done:", err)done = spawn(worker)println("spawn worker2")err = <-donefmt.Println("worker2 done:", err)
}

将channel中承载的类型由struct{}改为了error,这样channel承载的信息就不只是一个信号了,还携带了有价值的信息:新goroutine的结束状态。运行上述示例:

Untitled

  • 等待多个goroutine退出

有时候必须等待全部新goroutine退出,可以通过Go语言提供的sync.WaitGroup实现等待多个goroutine退出的模式:

func worker(args ...interface{}) {if len(args) == 0 {return}interval, ok := args[0].(int)if !ok {return}time.Sleep(time.Second * (time.Duration(interval)))
}func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {c := make(chan struct{})var wg sync.WaitGroupfor i := 0; i < n; i++ {wg.Add(1)go func(i int) {name := fmt.Sprintf("worker-%d:", i)f(args...)println(name, "done")wg.Done() // worker done!}(i)}go func() {wg.Wait()c <- struct{}{}}()return c
}func main() {done := spawnGroup(5, worker, 3)println("spawn a group of workers")<-doneprintln("group workers done")
}

通过sync.WaitGroup,spawnGroup每创建一个goroutine都会调用wg.Add(1),新创建的goroutine会在退出前调用wg.Done。

在spawnGroup中还创建了一个用于监视的goroutine,该goroutine调用sync.WaitGroup的Wait方法来等待所有goroutine退出。

在所有新创建的goroutine退出后,Wait方法返回,该监视goroutine会向done这个channel写入一个信号,这时main goroutine才会从阻塞在done channel上的状态中恢复,继续往下执行。

运行上述示例代码:

支持超时机制的等待

设置合理的退出时间,如若没有退出,则继续执行下一步:

func main() {done := spawnGroup(5, worker, 30)println("spawn a group of workers")timer := time.NewTimer(time.Second * 5)defer timer.Stop()select {case <-timer.C:println("wait group workers exit timeout!")case <-done:println("group workers done")}
}

notify-and-wait模式

main goroutine的停止代表着整个程序的停止,如果不事先通知退出,则容易导致业务数据损坏、不完整

我们可以通过notify-and-wait(通知并等待)模式来满足这一场景的要求。虽然这一模式也不能完全避免损失,但是它给了各个goroutine一个挽救数据的机会,从而尽可能减少损失。

  • 通知并等待一个goroutine的退出
func worker(j int) {time.Sleep(time.Second * (time.Duration(j)))
}func spawn(f func(int)) chan string {quit := make(chan string)go func() {var job chan int // 模拟job channelfor {select {case j := <-job:f(j)case <-quit:quit <- "ok"}}}()return quit
}func main() {quit := spawn(worker)println("spawn a worker goroutine")time.Sleep(5 * time.Second)// 通知新创建的goroutine退出println("notify the worker to exit...")quit <- "exit"timer := time.NewTimer(time.Second * 10)defer timer.Stop()select {case status := <-quit:println("worker done:", status)case <-timer.C:println("wait worker exit timeout")}
}

执行

此时,spawn函数不仅发送退出信号给创建者还承载创建者发送的退出信号,形成了一个双向的数据通道

  • 通知并等待多个goroutine退出

channel存在一个特性:当使用close关闭channel时,所有阻塞到该channel上的goroutine都会得到通知,所以可以利用这一特性实现这一模式:

func worker(j int) {time.Sleep(time.Second * (time.Duration(j)))
}func spawnGroup(n int, f func(int)) chan struct{} {quit := make(chan struct{})job := make(chan int)var wg sync.WaitGroupfor i := 0; i < n; i++ {wg.Add(1)go func(i int) {defer wg.Done() // 保证wg.Done在goroutine退出前被执行name := fmt.Sprintf("worker-%d:", i)for {j, ok := <-jobif !ok {println(name, "done")return}// 执行这个jobworker(j)}}(i)}go func() {<-quitclose(job) // 广播给所有新goroutinewg.Wait()quit <- struct{}{}}()return quit
}func main() {quit := spawnGroup(5, worker)println("spawn a group of workers")time.Sleep(5 * time.Second)// 通知 worker goroutine 组退出println("notify the worker group to exit...")quit <- struct{}{}timer := time.NewTimer(time.Second * 5)defer timer.Stop()select {case <-timer.C:println("wait group workers exit timeout!")case <-quit:println("group workers done")}
}

创建者直接利用了worker goroutine接收任务(job)的channel来广播退出通知,而实现这一广播的代码就是close(job)。此时各个worker goroutine监听job channel,当创建者关闭job channel时,通过“comma ok”模式获取的ok值为false,也就表明该channel已经被关闭,于是worker goroutine执行退出逻辑(退出前wg.Done()被执行)。

执行:

退出模式的应用

由于goroutine的运行状态不同,因此很难用同种框架全面管理,所以我们可以只实现一个“超时等待退出”框架,以统一解决各种运行状态

一组goroutine的退出有两种情况,第一种情况是并发退出,当goroutine的退出先后数据对数据处理无影响时可使用;另一种则是串行退出,也就是次序错误可能导致程序状态混乱和错误

  • 并发退出
  • 串行退出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/195119.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

闭眼检测实现

引言 这段代码是一个实时眼睛状态监测程序&#xff0c;可以用于监测摄像头捕获的人脸图像中的眼睛状态&#xff0c;判断眼睛是否闭合。具体应用实现作用说明如下&#xff1a; 1. 实时监测眼睛状态 通过摄像头捕获的实时视频流&#xff0c;检测人脸关键点并计算眼睛的 EAR&a…

基于灰狼算法(GWO)优化的VMD参数(GWO-VMD)

代码的使用说明 基于灰狼算法优化的VMD参数 代码的原理 基于灰狼算法&#xff08;Grey Wolf Optimizer, GWO&#xff09;优化的VMD参数&#xff08;GWO-VMD&#xff09;是一种结合了GWO和VMD算法的优化方法&#xff0c;用于信号分解和特征提取。 GWO是一种基于群体智能的优化…

辅助解决小白遇到的电脑各种问题

写这个纯属是为了让电脑小白知道一些电脑上的简单操作&#xff0c;勿喷&#xff01;&#xff01;&#xff01; 一&#xff1a;当小白遇到电脑程序不完全退出怎么办&#xff1f; 使用软件默认的退出方式 此处拿百度网盘举例&#xff1a; 用户登录网盘后&#xff1a; 如果直接点…

多线程编程

1 线程的使用 1.1 为什么要使用多线程 在编写代码时&#xff0c;是否会遇到以下的场景会感觉到难以下手&#xff1f; 要做 2 件事&#xff0c;一件需要阻塞等待&#xff0c;另一件需要实时进行。例如播放器&#xff1a;一边在屏幕上播放视频&#xff0c;一边在等待用户的按…

Hive 定义变量 变量赋值 引用变量

Hive 定义变量 变量赋值 引用变量 变量 hive 中变量和属性命名空间 命名空间权限描述hivevar读写用户自定义变量hiveconf读写hive相关配置属性system读写java定义额配置属性env只读shell环境定义的环境变量 语法 Java对这个除env命名空间内容具有可读可写权利&#xff1b; …

2020年09月 Scratch(二级)真题解析#中国电子学会#全国青少年软件编程等级考试

Scratch等级考试(1~4级)全部真题・点这里 一、单选题(共25题,每题2分,共50分) 第1题 下面哪个按钮可以实现音乐结束时音量慢慢变小? A: B: C: D:

RE2文本匹配实战

引言 今天我们来实现RE2进行文本匹配&#xff0c;模型实现参考了官方代码https://github.com/alibaba-edu/simple-effective-text-matching-pytorch。 模型实现 RE2模型架构如上图所示。它的输入是两个文本片段&#xff0c;所有组件参数除了预测层和对齐层外都是共享的。上图…

从零开始:Rust环境搭建指南

大家好&#xff01;我是lincyang。 今天&#xff0c;我们将一起探讨如何从零开始搭建Rust开发环境。 Rust环境搭建概览 Rust是一种系统编程语言&#xff0c;以其安全性、并发性和性能闻名。搭建Rust环境是学习和使用这一语言的第一步。 第一步&#xff1a;安装Rust Rust的…

nginx后端服务器在负载均衡调度中的状态

状态说明 down 状态说明当前的sever暂时不参与负载均衡

ACWSpring1.3

首先,前端写ajax写上我们的访问路径(就在我们前端的源代码里面),我们建了两个包pkController用于前端页面url映射过来一层一层找到我们的RestController返回bot1里面有键值,返回的这就是一个session对象bot1这个map.前端拿到我们bot1里的两个值给到我们前端显示出来 1准备页面:…

Java概述

接触Java后会发现它的体系有一个特点&#xff0c;就是非常喜欢用“J”字母开头的缩写&#xff0c;比如JCP, JSR, JMS, JPA, JSP, JAX-RS......它们有些是规范&#xff0c;有些是组织的名称&#xff0c;表意多样&#xff0c;对第一次接触的人来说很可能会觉得混乱&#xff0c;本…

吾爱破解置顶的“太极”,太好用了吧!

日常工作和娱乐&#xff0c;都需要用到不同类型的软件&#xff0c;哪怕软件体积不大&#xff0c;也必须安装&#xff0c;否则到用时找不到就非常麻烦了。 其实&#xff0c;很多软件不一定一样不剩地全部安装一遍&#xff0c;一方面原因是用的不多&#xff0c;另一方面多少有点…