Channel 的使用
Channel 声明方法
- chInt := make(chan int) // unbuffered channel 非缓冲通道
- chInt := make(chan int, 0) // unbuffered channel 非缓冲通道
- chInt := make(chan int, 2) // bufferd channel 缓冲通道
Channel 基本用法
- ch <- x // channel 接收数据 x
- x <- ch // channel 发送数据并赋值给 x
- <- ch // channel 发送数据,忽略接受者
如果使用了非缓冲通道,此时向缓冲区塞数据需要有地方能立即接收数据,不然会一致阻塞。原理是此时缓冲区无数据(无缓冲),向 channel 发送数据视为直接发送,即直接发送到正在休眠等待的协程中。
func main() {ch := make(chan string)// 阻塞ch <- "ping"<-ch
}
启动协程来拿数据:
func main() {ch := make(chan string)// 程序能通过go func() {<-ch}()ch<-"ping"
}
内存与通信
不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。主要是为了:
- 避免协程竞争和数据冲突。
- 更高级的抽象,降低开发难度,增加程序可读性。
- 模块之间更容易解耦,增强扩展性和可维护性。
通过共享内存案列:
func watch(p *int) {for {if *p == 1 {fmt.Println("go")break}}
}func main() {i := 0go watch(&i)time.Sleep(time.Second)i = 1time.Sleep(time.Second)
}
通过通信的方式如下:
func watch(c chan int) {if <-c == 1 {fmt.Println("go")}
}func main() {c := make(chan int)go watch(c)time.Sleep(time.Second)c <- 1time.Sleep(time.Second)
}
Channel 的设计
Channel 在 Go 的底层表示为一个 hchan 结构体:
type hchan struct {/* 缓存区结构开始 */qcount uint // total data in the queuedataqsiz uint // size of the circular queuebuf unsafe.Pointer // points to an array of dataqsiz elementselemsize uint16elemtype *_type // element type/* 缓存区结构结束*/// 发送队列sendx uint // send indexsendq waitq // list of send waiters// 接收队列recvx uint // receive indexrecvq waitq // list of recv waiterslock mutex// 0:关闭状态;1:开启状态closed uint32
}type waitq struct {first *sudoglast *sudog
}
数据存放在一个环形缓冲区 Ring Buffer,可以降低内存/GC的开销
关于 c <- “x” 语法糖,channel 数据发送原理
Go 中会把 c<- 编译为 chansend1 方法:
// %GOROOT%src/runtime/chan.go
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}
发送数据的 3 种情形:
- 缓冲区无数据,向 channel 发送数据视为直接发送,将数据直接拷贝给等待接收的协程的接受量,并唤醒该协程;如果无等待中的接收协程,则将数据放入缓冲区。
- 缓冲区有数据但缓冲区未满,则数据存到缓冲区。
- 接收队列中无休眠等待的协程,且缓冲区已满,则将数据包装成 sudog,放入 sendq 队列休眠等待,然受给 channel 解锁。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// ...if c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}// 1. 取出接收等待队列 recvq 的协程,将数据发送给它if sg := c.recvq.dequeue(); sg != nil {send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}// 接收等待队列中没有协程时if c.qcount < c.dataqsiz {// 2. 缓存空间还有余量,将数据放入缓冲区qp := chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx, nil)}typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true}// ...// 3. 休眠等待gp := getg()// 包装为 sudogmysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// 将数据,协程指针等记录到 mysq 中mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nil// 将 mysg 自己入队c.sendq.enqueue(mysg)// 休眠gp.parkingOnChan.Store(true)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)KeepAlive(ep)// 被唤醒后再维护一些数据,注意,此时的记录的数据已经被拿走if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseclosed := !mysg.successgp.param = nil// ...
}
关于 rec <-c 语法糖,channel 数据接收原理
Go 中会把 <-c 编译为 func chanrecv 方法,具体如下:
- 编译阶段,rec <- c 转化为
runtime.chanrecv1()
- 编译阶段,rec, ok <- c 转化为
runtime.chanrecv2()
- 最终会调用
chanrecv()
方法
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received = chanrecv(c, elem, true)return
}
接收数据的 4 种情形:
- 有等待中的发送协程(sendq),但缓冲区为空,从协程接收
- 有等待中的发送协程(sendq),但缓冲区非空,从缓存接收
- 接收缓存
- 阻塞接收
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// ...if c.closed != 0 {// ...} else {// 1.,2. 接收数据前,已经有协程在休眠等待发送数据if sg := c.sendq.dequeue(); sg != nil {// 在 1 的情况下,缓存为空,直接从 sendq 的协程取数据// 在 2 的情况下,从缓存(缓冲)取走数据后,将 sendq 里的等待中的协程的数据放入缓存,并唤醒该协程// 这就是为什么 sendq 队列中的协程被唤醒后,其携带的数据已经被取走的原因recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}}// 3. 直接从缓存接收数据if c.qcount > 0 {qp := chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true}// ...// 缓冲区为空,同时 sendq 里也没休眠的协程,则休眠等待gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nilc.recvq.enqueue(mysg)// 休眠gp.parkingOnChan.Store(true)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)// ...
}
从 Channel 接收数据的过程中被唤醒,说明之前因为没有数据而休眠等待,当发送方发送数据时,会主动将数据拷贝至接收方本地。