字节二面:你怎么理解信道是golang中的顶级公民

news/2025/1/7 18:42:38/文章来源:https://www.cnblogs.com/JulianHuang/p/18655087

1. 信道是golang中的顶级公民

goroutine结合信道channel是golang中实现并发编程的标配。

信道给出了一种不同于传统共享内存并发通信的新思路,以一种通道复制的思想解耦了并发编程的各个参与方。

信道分为两种: 无缓冲和有缓冲信道(先入先出)。

分别用于goroutine同步和异步生产消费:

无缓冲信道: 若没有反向的goroutine在做动作, 当前goroutine会阻塞;
有缓冲信道: goroutine 直接面对的是缓冲队列, 队列满则写阻塞, 队列空则读阻塞。

一个陷阱: 信道被关闭后, 原来的goroutine阻塞状态不会维系, 能从信道读取到零值。

image.png

for range可以用于信道 :
一直从指定信道中值, 没有数据会阻塞, 直到信道关闭会自动退出循环。

var ch chan int = make(chan int, 10)
go func() {for i := 0; i < 20; i++ { ch <- i}close(ch)
}()time.Sleep(time.Second * 2)
for ele := range ch {fmt.Println(ele)
}output: 0,1,2,3,4...19

image.png

上面的示例描述了信道4个阶段:
写完10个数据(阻塞写)、暂停2s、
读取10个数据(解除阻塞写)、读完20个数据、关闭信道。

2. 信道channel实现思路大盘点

channel是指向hchan结构体的指针.

        type hchan struct {qcount   uint           // 队列中已有的缓存元素的数量dataqsiz uint           // 环形队列的容量buf      unsafe.Pointer // 环形队列的地址elemsize uint16closed   uint32        // 标记是否关闭,初始化为0,一旦close(ch)为1elemtype *_type // 元素类型sendx    uint   // 待发送的元素索引recvx    uint   // 待接受元素索引recvq    waitq  // 阻塞等待的读goroutine队列sendq    waitq  // 阻塞等待的写gotoutine队列// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.lock mutex}type waitq struct {  first *sudog  last *sudog  }

image.png

2.1 静态全局解读

两个核心的结构

① 环形队列buf (buf、dataqsize、sendx、recvx 圈定了一个有固定长度,由读/写指针控制队列数据的环形队列),从这看出队列是以链表实现。

② 存放阻塞写G和阻塞读G的队列sendqrecvq, recvq、sendq存放的不是当前通信的goroutine, 而是因读写信道而阻塞的goroutine:

  • 如果 qcount <dataqsiz(队列未满),sendq就为空(写就不会阻塞);
  • 如果 qcount >0 (队列不为空),recvq就为空(读就不会阻塞)。

一旦解除阻塞,读/写动作会给到先进入阻塞队列的goroutine,也就是 recvq、sendq也是先进先出。

2.2 动态解读demo

以第一部分的demo为例:

第一阶段: 写入0到9这个10个元素

  1. goroutine在写数据之后会获取锁,以确保安全地修改信道底层的hchan结构体;
  2. 向环形队列buf入队enqueue元素,实际是将原始数据拷贝进环形队列buf的待插入位置sendx
  3. 入队操作完成,释放锁。

image.png

第二阶段:信道满,写阻塞(写goroutine会停止,并等待读操作唤醒)

① 基于写goroutine创建sudog, 并将其放进sendq队列中;

② 调用gopark函数,让调度器P终止该goroutine执行。

调度器P将该goroutine状态改为waiting, 并从调度器P挂载的runQueue中移除,调度器P重新出队一个G交给OS线程来执行,这就是上下文切换,G被阻塞了而不是OS线程。

image.png


读goroutine开始被调度执行:

第三阶段: 读前10个元素(解除写阻塞)

  1. for range chan: 读goroutine从buf中出队元素: 将信道元素拷贝到目标接收区;
  2. 写goroutine从sendq中出队,因为现在信道不满,写不会阻塞;
  3. 调度器P调用goready, 将写goroutine状态变为runnable,并移入runQueue。

image.png
下面的源码截取自chansend() ,
体现了写信道--> 写阻塞---> 被唤醒的过程

     // 这一部分是写数据, 从这里也可以看出是点对点的覆写,原buf内队列元素不用移动, 只用关注sendx  if c.qcount < c.dataqsiz {  // 信道未满,则写不会阻塞=>senq为空	qp := chanbuf(c, c.sendx)   // chanbuf(c, i) 返回的是信道buf中待插入的位置指针typedmemmove(c.elemtype, qp, ep)  c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++return true}if !block {       // 用于select case结构中,不阻塞select case的选择逻辑unlock(&c.lock)return false}// 这二部分是: 构建sudog,放进写阻塞队列,阻塞当前写gooroutine的执行// Block on the channel. Some receiver will complete our operation for us.gp := getg()     // 获取当前的goroutine  https://go.dev/src/runtime/HACKINGmysg := acquireSudog()   // sudog是等待队列sendq中的元素,封装了goroutinemysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nilc.sendq.enqueue(mysg)  // 当前goroutine压栈sendq// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.gp.parkingOnChan.Store(true)reason := waitReasonChanSendgopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)   // 这里是阻塞函数KeepAlive(ep)// 这三部分: 调度器唤醒了当前goroutine// someone woke us up.  if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseclosed := !mysg.successgp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)if closed {     // 已经关闭了,再写数据会panicif c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}return true

其中:

① getg 获取当前的goroutine,sudog是goroutine的封装,表征一个因读写信道而阻塞的G,

typedmemmove(c.elemtype, qp, ep): 写数据到信道buf,由两个指针来完成拷贝覆写。

  //  typedmemmove copies a value of type typ to dst from src.func typedmemmove(typ *abi.Type, dst, src unsafe.Pointer) {if dst == src {return}if writeBarrier.enabled && typ.Pointers() {// This always copies a full value of type typ so it's safe// to pass typ along as an optimization. See the comment on// bulkBarrierPreWrite.bulkBarrierPreWrite(uintptr(dst), uintptr(src), typ.PtrBytes, typ)}// There's a race here: if some other goroutine can write to// src, it may change some pointer in src after we've// performed the write barrier but before we perform the// memory copy. This safe because the write performed by that// other goroutine must also be accompanied by a write// barrier, so at worst we've unnecessarily greyed the old// pointer that was in src.memmove(dst, src, typ.Size_)if goexperiment.CgoCheck2 {cgoCheckMemmove2(typ, dst, src, 0, typ.Size_)}}

③ 我们看上面源码的第三部分, 唤醒了阻塞的写goroutine, 但是这里貌似没有将写goroutine携带的值传递给信道或对端。
实际上这个行为是在recv函数内。

跟一下接收方:读第一个元素,刚解除写阻塞的源码:

// 发现sendq有阻塞的写G,则读取,并使用该写G携带的数据填充数据
// Just found waiting sender with not closed.if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true
}
if c.qcount > 0 {  // 如果sendq队里没有阻塞G, 则直接从队列中读值// Receive directly from queue
}---{// Queue is full. Take the item at the// head of the queue. Make the sender enqueue// its item at the tail of the queue. Since the// queue is full, those are both the same slot.qp := chanbuf(c, c.recvx)  // 拿到buf中待接受元素指针if raceenabled {racenotify(c, c.recvx, nil)racenotify(c, c.recvx, sg)}// copy data from queue to receiverif ep != nil {typedmemmove(c.elemtype, ep, qp)  // 将buf中待接收元素qp拷贝到目标指针ep}// copy data from sender to queuetypedmemmove(c.elemtype, qp, sg.elem)  //  将阻塞sendq队列中出站的sudog携带的值写入到待插入指针。c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}

从上线源码可以验证:

读goroutine读取第一个元素之前,信道满,此时sendx=recvx,也即信道内读写指针指向同一个槽位;

② 读取第一个元素,解除写阻塞: sendq写G队列会出队第一个sudog, 将其携带的元素填充进buf待插入指针sendx,因为此时sendx=recvx,故第二次typedmemmove(c.elemtype, qp, sg.elem)是合理的。

如果sendq队列没有阻塞G, 则直接从buf中读取值。

3. 不要使用共享内存来通信,而是使用通信来共享内存

常见的后端java C#标配使用共享内存来通信, 比如 mutex、lock 关键词:
通过对一块共有的区域做属性变更来反映系统当前的状态,详细的请搜索同步索引块

golang 推荐使用通信来共享内存, 这个是怎么理解的呢?

你要想使用某块内存数据, 并不是直接共享给你, 而是给你一个信道作为访问的接口, 并且你得到的是目标数据的拷贝,由此形成的信道访问为通信方式;

而原始的目标数据的生命周期由产生这个数据的G来决定, 它甚至不用care自己是不是要被其他G获知,因此体现了解耦并发编程参与方的作用。

https://medium.com/womenintechnology/exploring-the-internals-of-channels-in-go-f01ac6e884dc

4. 信道的实践指南

4.1 无缓冲信道

结合了通信(值交换)和同步。

c := make(chan int)  // Allocate a channel.
// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {list.Sort()c <- 1  // Send a signal; value does not matter.
}()
doSomethingForAWhile()
<-c   // Wait for sort to finish; discard sent value.

4.2 有缓冲信道

基础实践: 信号量、限流能力

下面演示了:服务端使用有缓冲信道限制并发请求

var sem = make(chan int, MaxOutstanding) func Serve(queue chan *Request) {for req := range queue {req:= reqsem <- 1   go func() {   // 只会开启MaxOutstanding个并发协程process(req)<-sem}()}
}

上面出现了两个信道:
sem 提供了限制服务端并发处理请求的信号量
queue 提供了一个客户端请求队列,起媒介/解耦的作用

解多路复用

多路复用是网络编程中一个耳熟能详的概念,nginx redis等高性能web、内存kv都用到了这个技术 。

这个解多路复用是怎么理解呢?

我们针对上面的服务端,编写客户端请求, 独立的客户端请求被服务端Serve收敛之后, Serve就起到了多路复用的概念,在Request定义resultChan信道,就给每个客户端请求提供了独立获取请求结果的能力, 这便是一种解多路复用。

type Request struct {args        []intf           func([]int) intresultChan  chan int
}
request := &Request{[]int{3, 4, 5}, nil, make(chan int)}func SendReq(req *Request){// Send requestclientRequests <- request// Wait for response.fmt.Printf("answer: %d\n", <-request.resultChan)
}

在服务端,定义handler,返回响应结果

// 定义在服务端的处理handler
func sum(a []int) (s int) {for _, v := range a {s += v}return
}func process(req *Request) {req.f = sumreq.resultChan <- req.f(req.args)
}

image.png

基于cpu的并行编程

如果计算可被划分为独立的(不相互依赖的)计算分片,则可以利用信道开启CPU的并行编程能力。

var numCPU = runtime.NumCPU() // number of CPU coresfunc (v Vector) DoAll(u Vector) {c := make(chan int, numCPU)  // Buffering optional but sensible.for i := 0; i < numCPU; i++ {go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)}for i := 0; i < numCPU; i++ {<-c    // wait for one task to complete}// All done.
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/864911.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第七章 立项管理 (2025年详细解析版)

目录什么是立项管理7.1 项目建议与立项申请项目建议书核心价值(为什么要有项目建议书依据)主要内容(项目建议书包含内容)7.2 项目可行性研究什么是可行性研究可行性研究的特点可行性研究的内容(既可以用于初步可行性,也可以用在详细可行性分析)7.2.1 可行性研究的内容7.…

搭建gitlab私有仓库

1、下载gitlab 首先在 /etc/yum.repos.d/目录下配置gitlab下载镜像源。 # 进入目录 /etc/yum.repos.d/ cd /etc/yum.repos.d/# 创建文件 gitlab-ce.repo vim gitlab-ce.repo# 添加以下内容 [gitlab-ce] name=gitlab-ce baseurl=https://mirror.tuna.tsinghua.edu.cn/gitlab-ce…

夸克网盘批量转存、分享工具

下载地址:https://pan.quark.cn/s/acf1c325bd34更多内容及Java+大数据个人原创视频,可关注公众号观看:原创文章,转载请注明出处!!

E94 Tarjan边双缩点+树形DP P8867 [NOIP2022] 建造军营

视频链接: P8867 [NOIP2022] 建造军营 - 洛谷 | 计算机科学教育新生态// Tarjan边双缩点+树形DP O(n) #include<bits/stdc++.h> using namespace std;int read(){int x=0,f=1;char c=getchar();while(c>9||c<0){if(c==-) f=-1;c=getchar();}while(c>=0&&…

夸克网盘批量分享工具

该工具是一款可以批量转存夸克网盘文件并且批量分享夸克网盘文件的一个工具,使用易语言编写,能够快速分享转存夸克网盘文件。 使用教程 浏览器开启F12抓包,在个人夸克主页刷新,提取Cookie将Cookie填入软件,点击登录,然后双击选择目录,到想要分享的目录点击批量分享,最后…

《重构:改善既有代码的设计(第2版)》PDF、EPUB免费下载

本书是经典著作《重构》出版20年后的更新版。书中清晰揭示了重构的过程,解释了重构的原理和实践方式,并给出了何时以及何地应该开始挖掘代码以求改善。书中给出了60多个可行的重构,每个重构都介绍了一种经过验证的代码变换手法的动机和技术。本书提出的重构准则将帮助开发人…

C# AIModelRouter:使用不同的AI模型完成不同的任务

https://www.cnblogs.com/mingupupu/p/18654982 AIModelRouter AI模型路由,模型的能力有大小之分,有些简单任务,能力小一点的模型也能很好地完成,而有些比较难的或者希望模型做得更好的,则可以选择能力强的模型。为什么要这样做呢?可以降低AI模型的使用成本,毕竟能力强的…

使用Windows批处理命令批量上传jar到Nexus(maven私服)中

这里就不介绍nexus的搭建了;网上很多都是.sh脚本,linux比较合适,当然用git也可以运行;但是.bat文件肯定是所有windows系统都是可以执行的;所以这里介绍一下用windows的批处理命令来编写; 首先,你得先确定的仓库状态是处于Allow redeploy状态;新建一个文件mavenimport.b…

第七届封神台CTF

没事,学习了一下第七届封神台CTF Web welcome_to_zkaqctf ​​ 源码: const {promises: fs} = require(fs); const fastify = require(fastify);const flag = process.env.FLAG || zkaq{do_you_believe_this_is_flag?};const app = fastify(); app.get(/, async (_, res) =&…

Activiti 手工新增历史环节20250116

手工新增(历史任务表)act_hi_taskinst,其实这个表记录加一条就可以 注:字段2值为空,字段值10为空,字段11、12 值都要加-----act_hi_taskinst.PROC_DEF_ID myProcess:34:400000000000736-----act_hi_taskinst.PROC_INST_ID_ 400000000000496SELECT * FROM "act_hi_…

SQL语言做加减运算时将某项的null值转换为0

在SQL语言中,很多时候,在表项中会遇到null值,null值有三大特点:1)NULL值不参加统计;2)NULL值不进入计算表达式;3)不能与其它值进行比较。 因此,在运算中要将null值有时候转换成其他值,这里提供一种加减运算中转换为0的方法。 如:在算工资的时候,总工资=基础工资+奖…

中电金信携手华为发布“全链路实时营销解决方案”,重塑金融营销数智新生态

在数智化转型成为驱动经济社会高质量发展的新引擎背景下,“数智方案”栏目聚焦金融等国计民生重点行业场景,依托中电金信“源启筑基+咨询引领+应用重构”的产品及服务体系,输出市场洞察和行业解决方案、应用案例,旨在全面推动行业IT架构升级、数智化转型。数智驱动是金融机…