逐步学习Go-并发通道chan(channel)

概述

Go的Routines并发模型是基于CSP,如果你看过七周七并发,那么你应该了解。

什么是CSP?

"Communicating Sequential Processes"(CSP)这个词组的含义来自其英文直译以及在计算机科学中的使用环境。

CSP是 Tony Hoare 在1978年提出的,论文地址在:Communicating sequential processes | Communications of the ACM

拆字解释下

Communicating Sequential Processes(CSP)的三个单词:

  • C for Communicating: 通信,什么的通信那?进程/线程/协程的通信。

  • S for Sequenctial: 顺序的,什么的顺序?进程/线程/协程之间执行任务时应该是有顺序的,完全并行执行是理想化的,现实中就是要先指定完第一个或者第一批任务才能执行第二个或者第二批任务。

  • P for Processses: 进程,这个是进程,估计是因为这个概念提出来的时候比较早。我们这儿得抽象一下,Processes指的是进程/线程/协程。

那么我们来总结一下CSP,CSP就是多个能够进行通信,并且按照顺序执行任务的独立进程。这些进程在各自执行自己的任务的时候,还可以通过某种方式是进行通信。

在Golang中就是通过Channel进行通信。

好了,CSP解释完了,我们来看Go中的Channel,另外CSP的参与者Go Routine我在之前的文章中有提到过,大家可以去:逐步学习Go-协程goroutine

这张图就描述了CSP编程模型。

file

Go中routine代表图中的Process,Channel就是goroutine之间的连接。通道可以让一个goroutine发送信息到另一个goroutine。

Go中的channel

Go中Channel有两种类型:

  1. 无缓冲Channel(Unbuffered)
  2. 有缓冲Channel(Buffered)
    有缓冲的Channel其实就是一个环形缓冲队列;无缓冲的没有队列,因为读写都会阻塞。

Channel的定义

var channel名称 chan channel类型// 类型自动推断
channel名称 := make(chan channel类型, buffer数量(int可以为0))

COPY

比如:我们可以这样来定义:

// 定义了一个channel,还没有make,不确定是否为有缓冲和无缓冲channel
var ch chan int// 定义了一个chnnel, 容量为0,无缓冲channel
ch := make(chan int, 0)// 定义了一个channel,容量为1,有缓冲channel
ch := make(chan int, 1)

COPY

我们实际使用的时候把Channel理解为队列就可以了。

Go中的Channel有两种类型:

  1. 无缓冲channel
  2. 有缓冲channel

无缓冲和有缓冲的特性如下:

  • 无缓冲Channel
    • 无缓冲Channel没有存储数据的能力
    • 发送方向Channel中发送数据的时候,发送方会阻塞直到有接受者接受这个数据
    • 无缓冲Channel典型应用就是go协程同步通信
    • 无缓冲Channel保证通信双方都要准备好数据交换
  • 有缓冲Channel
    • 有缓冲Channel需要定义Channel的容量
    • 发送方向有缓冲Channel发送数据的时候,只有容量满的时候才会阻塞
    • 接收方只有在有缓冲Channel为空时才会阻塞
    • 有缓冲通道的典型应用场景是生产者和消费者

Channel的操作

Channel主要支持2中操作:

  1. 发送(send)
  2. 接收(recv)

这三种操作在代码中的的定义和使用:

  1. 发送和接收都使用<-

来看代码:

// 先定义一个无缓冲channel
ch := make(chan int, 0)
ch := make(chan int)// 发送数据到channel
ch <- 1// 从channel中接收数据
<- ch

COPY

我们看到发送和接收都是使用<-,差别在于:

  1. ch在<-的左边,操作为发送
  2. ch在<-的右边,操作为接收

另外,channel在使用之前都要先创建,使用完毕后要关闭,分别使用makeclose关闭。

// 创建相当于分配channel(allocation)
ch := make(chan int, 0)// 关闭channel,释放channel资源
defer close(ch)

COPY

channel创建完直接关闭了还能操作发送和接收吗?

这个问题我们通过写代码来测试,我们先来测试发送,然后再测试接收。

  • 发送数据到关闭的Channel

    func TestUnbufferedChannel_ShouldPanic_whenWriteValueToAClosedChannel(t *testing.T) {f := func() {ch := make(chan int)close(ch)ch <- 1
    }assert.Panics(t, f, "should panic")
    }
    COPY

    运行截图:

    file

我们的UT PASS了表示发生了panic,这就说明我们不能向已经关闭的channel发送数据。

  • 在已经关闭的Channel上接收

func TestUnbufferedChannel_ShouldSuccess_whenRecvValueAtAClosedChannel(t *testing.T) {ch := make(chan int)close(ch)var val = <-chassert.Equal(t, 0, val)
}func TestUnbufferedChannel_ShouldSuccess_whenRecvEmptyValueAtAClosedChannel(t *testing.T) {ch := make(chan string)close(ch)var val = <-chassert.Equal(t, "", val)
}

COPY

运行截图:

file

这两个UT都可以PASS,我只截图了一个PASS,这说明我们可以在一个关闭的channel上接收数据,只是接收到的都是0值。关于0值要特别说明一下,0值是针对不同类型的,比如:int的0值就是0,string的0值就是空字符串,指针的0值就是nil,看下面代码:

file

并非“任何后续的接收操作都将立即返回零值”,而是当channel中所有已发送的值都被接收后,接下来的接收操作会立即返回零值。

无缓冲channel

无缓冲通道顾名思义:就是没有数据缓冲能力的Channel,有goroutine向无缓冲Channel发送了数据就必须有另一个goroutine来接受,否则发送的goroutine会阻塞;反之,有goroutine从这个channel接受数据而没有另一个goroutine向这个channel发送,那么接受的goroutine也会阻塞。

应用场景:

  1. 部分任务需要同步就用无缓冲channel

来看场景代码:

有发送无接受

发送goroutine会被阻塞。


func TestUnbufferedChannel_ShouldWriteTimeout_WhenNoRoutineReadTheChannel(t *testing.T) {// 创建无缓冲channelc := make(chan int)is_timeout := falsetry_to_write_value := 1// whenselect {// 直接向channel中发送case c <- try_to_write_value:case <-time.After(3 * time.Second):// shouldis_timeout = true}assert.True(t, is_timeout)}

COPY

file

有接受无发送

接收goroutine会被阻塞


func TestUnbufferedChannel_ShouldReadTimeout_WhenNoValueWriteToChannel(t *testing.T) {// 创建无缓冲channelc := make(chan int)is_timeout := falseselect {// 直接接受channel中的数据case <-c:case <-time.After(3 * time.Second):// shouldis_timeout = true}// 三秒后超时assert.True(t, is_timeout)}

COPY

有发送有接受

有发送有接收,一切正常。

func sum(s []int, c chan int) {sum := 0for _, v := range s {sum += v}// 将累加结果发送到channelc <- sum
}func TestUnbufferedChannel_ShouldRecvValues_WhenWriteValueToChannel(t *testing.T) {// 创建无缓冲channelc := make(chan int)// givens := []int{1, 2, 3, 4, 5, 6}// when// 执行数组累加go sum(s[:], c)ret1 := <-c// should// 和应该是21assert.Equal(t, 21, ret1)
}

COPY

file

使用无缓冲Channel控制并发

// 先定义一个worker函数
// worker函数从无缓冲channel中接收
// 可以接到到数据就执行后面的打印内容
// 打印完成后退出
func worker(id int, lock chan bool) {var shouldRun = <-lockif shouldRun {fmt.Printf("time: %v Worker %d is working\n", time.Now(), id)time.Sleep(time.Second)fmt.Printf("time: %v Worker %d has finished\n", time.Now(), id)}
}func TestUnbufferedChannel_ShouldRunOneByOne_When(t *testing.T) {lock := make(chan bool, 1)// 启动5个goroutine等待释放接收for i := 0; i < 5; i++ {go worker(i, lock)}// 发送5个true到channelfor i := 0; i < 5; i++ {lock <- truetime.Sleep(time.Second)}close(lock)time.Sleep(10 * time.Second)
}

COPY

file

使用无缓冲Channel实现CompleteFuture.anyOf()

CompleteFuture.anyOf() 是 Java 中的一个函数,它返回一个新的 CompletableFuture,当给定的任何 CompletableFuture 完成时,返回的 CompletableFuture 也完成,并带有完成的 CompletableFuture 的结果。


// future函数使用time.Sleep模拟实际业务处理延迟
// 业务处理完成后将业务数据写入无缓冲Channel
func future(id int, delay time.Duration, resChan chan int) {time.Sleep(delay)fmt.Printf("Hi, I have finished my task, my id is %d\n", id)resChan <- id
}// 接收一系列上面的future, 然后使用go routine启动这些future函数并将结果写入到result channel,最后再返回result channel。
func anyOf(futures ...<-chan int) <-chan int {result := make(chan int)for _, future := range futures {go func(f <-chan int) {result <- <-f}(future)}return result
}func TestAnyOf_ShouldSuccess(t *testing.T) {// 创建无缓冲的 channelresChan1 := make(chan int)resChan2 := make(chan int)resChan3 := make(chan int)// 启动 goroutinesgo future(1, 3*time.Second, resChan1)go future(2, 2*time.Second, resChan2)go future(3, 5*time.Second, resChan3)result := anyOf(resChan1, resChan2, resChan3)assert.Equal(t, 2, <-result)
}

COPY

上面有两个比较让人纠结的语法:

  1. <-chan int
  2. result <- <-f
  • <-chan int表示只读通道,anyOf只能读取通道内的数据;有了只读就有只写,只写通道chan<- int
  • result <- <-f表示从通道f中接收数据并将数据写入到result通道。这一行相当于执行了
    v := <-f
    result <- v
    COPY

    file

有缓冲channel

有缓冲channel就是你可以暂时把数据发送到channel,如果channel的缓冲区没有被占用完就不会阻塞,缓冲区被占用完了就被阻塞了。

特性:

  1. 发送goroutine在缓冲区没有用完之前不会阻塞,缓冲区被使用完了之后发送goroutine就会被阻塞
  2. 接受goroutine在缓冲区有数据时,不会阻塞,缓冲区没有数据时会被阻塞

有缓冲channel应用场景是什么?

  1. 任务队列就是最典型的场景,生产者消费者模型
  2. 其他无缓冲channel搞不定的就用有缓冲channel

实现一个有缓冲channel的RateLimiter

import ("sync""sync/atomic""testing""fmt""time""github.com/stretchr/testify/assert"
)type RateLimiter struct {tokens       chan struct{}refillTicker *time.TickercloseCh      chan struct{}
}func NewRateLimiter(rate int) *RateLimiter {r := &RateLimiter{tokens:       make(chan struct{}, rate),refillTicker: time.NewTicker(time.Second / time.Duration(rate)),closeCh:      make(chan struct{}),}go r.refill()return r
}func (r *RateLimiter) refill() {for {select {case <-r.refillTicker.C:select {case r.tokens <- struct{}{}:default:}case <-r.closeCh:r.refillTicker.Stop()return}}
}func (r *RateLimiter) Acquire() {<-r.tokens
}func (r *RateLimiter) TryAcquire() bool {select {case <-r.tokens:return truedefault:return false}
}func (r *RateLimiter) Close() {close(r.closeCh)
}func myTask(id int) {fmt.Printf("time: %v workder %d is working\n", time.Now(), id)time.Sleep(20 * time.Millisecond)fmt.Printf("time: %v workder %d has finished\n", time.Now(), id)
}func TestRateLimiter_ShouldPermitWithBlocking_WhenRequestOnce(t *testing.T) {rateLimiter := NewRateLimiter(100)startTime := time.Now()for i := 0; i < 1; i++ {rateLimiter.TryAcquire()myTask(i)}endTime := time.Now()elapsedTime := endTime.Sub(startTime)fmt.Printf("elapsed time: %v\n", elapsedTime)fmt.Printf("explect time: %v\n", 300*time.Millisecond)assert.True(t, elapsedTime < 300*time.Millisecond)
}func TestRateLimiter_ShouldLimitPermits_WhenGivenLimitedResource(t *testing.T) {var counter int32 = 0rateLimiter := NewRateLimiter(100)wg := sync.WaitGroup{}startTime := time.Now()for i := range 1000 {wg.Add(1)go func() {rateLimiter.Acquire()myTask(i)atomic.AddInt32(&counter, 1)wg.Done()}()}wg.Wait()endTime := time.Now()elapsedTime := endTime.Sub(startTime)fmt.Printf("elapsed time: %v\n", elapsedTime)fmt.Printf("should greater than explect time: %v\n", 10*time.Second)assert.Equal(t, counter, int32(1000))assert.True(t, 10*time.Second < elapsedTime)
}

COPY

file

实现无缓冲Channel实现Java中的CyclicBarrier

CyclicBarrier 是一个同步工具,它允许一组线程互相等待,直到他们都到达了一个共同的屏障点。在涉及固定大小的线程团队必须偶尔相互等待的程序中,CyclicBarriers 非常有用。之所以称之为“循环”屏障,是因为在等待的线程被释放之后,它可以被重复使用。

  • await() 所有的参与者都调用了wait方法后返回或者被中断
    我们就实现这个await方法,暂时不支持中断,代码如下:

package mainimport ("fmt""sync""sync/atomic""time"
)// CyclicBarrier 让一组goroutine在到达某个点之后才能继续执行
type CyclicBarrier struct {// 总goroutine数量participant int// 用于等待所有goroutine准备好waitGroup sync.WaitGroup// 无缓冲channel,用于goroutine间同步barrierChan chan struct{}running     int32
}// NewCyclicBarrier 创建一个新的CyclicBarrier
func NewCyclicBarrier(participant int) *CyclicBarrier {b := &CyclicBarrier{participant: participant,barrierChan: make(chan struct{}),running:     int32(participant),}// 设置等待的goroutine数b.waitGroup.Add(participant)return b
}// 当一个goroutine调用Wait时,
// 它将在屏障处等待,
// 直到所有goroutine都到达这里
func (b *CyclicBarrier) Wait() {// 一个goroutine准备好了b.waitGroup.Done()// 等待所有goroutine都准备好b.waitGroup.Wait()// 当所有goroutine都准备好了,关闭channel进行广播通知if atomic.AddInt32(&b.running, -1) == 0 {close(b.barrierChan)} else {// 等待通知<-b.barrierChan}}// 阻塞调用goroutine直到所有goroutine都调用了Wait方法,
// 屏障开放后,重新置为待关闭状态
func (b *CyclicBarrier) Await() {// 等待屏障开放的信号<-b.barrierChan// 重置屏障状态b.barrierChan = make(chan struct{})b.waitGroup.Add(b.participant)
}func (b *CyclicBarrier) Close() {close(b.barrierChan)
}func main() {// 这里我们设置3个goroutine参与barrier := NewCyclicBarrier(100)for i := 0; i < 100; i++ {go func(i int) {fmt.Printf("Goroutine %d is working...\n", i)// 模拟工作time.Sleep(time.Duration(i+1) * time.Second)fmt.Printf("Goroutine %d reached the barrier.\n", i)barrier.Wait()fmt.Printf("Goroutine %d passed the barrier.\n", i)}(i)}// 主goroutine等待所有goroutine都到达屏障barrier.Await()fmt.Println("All goroutines have passed the barrier")
}

COPY

参考

  1. go/src/runtime/chan.go at master · golang/go · GitHub
  2. 逐步学习Go-并发通道chan(channel) – FOF编程网

编写不易,如有问题请评论告知

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/571953.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

今日AI热点:科技前沿新动态

引言&#xff1a; 人工智能领域日新月异&#xff0c;每天都有令人振奋的新进展。从苹果到谷歌&#xff0c;从OpenAI到Meta&#xff0c;各大科技巨头纷纷推出创新产品和技术&#xff0c;不断推动着人工智能的发展。让我们一起来看看今日AI热点&#xff0c;探索这个充满活力和激情…

软件测试技术之登录页面测试用例的设计方法

相信大家都有过写登录测试用例的经验&#xff0c;相较于开发人员编写代码而言&#xff0c;测试人员编写用例同样重要。本文作者总结了一些关于登录用例的经验。 一、功能测试用例设计&#xff1a; 1、正常登录场景 测试用例1&#xff1a;输入正确的用户名和密码&#xff0c;验证…

rmvb怎么转换为mp4?最简单方法!

各种文件格式层出不穷&#xff0c;而RMVB&#xff08;RealMedia Variable Bitrate&#xff09;格式作为一种独特的视频文件格式&#xff0c;其起源可以追溯到上世纪90年代。当时&#xff0c;随着数字视频的崛起&#xff0c;RealNetworks公司迎来了一项重要任务&#xff1a;提供…

20240320-1-梯度下降

梯度下降法面试题 1. 机器学习中为什么需要梯度下降 梯度下降的作用&#xff1a; 梯度下降是迭代法的一种&#xff0c;可以用于求解最小二乘问题。在求解损失函数的最小值时&#xff0c;可以通过梯度下降法来一步步的迭代求解&#xff0c;得到最小化的损失函数和模型参数值。…

如何应对Android面试官->进程通信如何注册与获取服务

前言 大家好&#xff0c;我是老A&#xff1b; 这个章节继续上一章节继续讲解&#xff0c;主要讲解下 java 层服务的注册与获取、线程池&#xff1b;我们基于 AMS 来看下 java 层是如何获取的&#xff1b; SystemServer SystemServer 的启动也是 main 函数&#xff0c;我们进入…

【王道训练营】第3题 判断某个年份是不是闰年,如果是闰年,请输出“yes”,否则请输出“no”

文章目录 引言闰年初始代码代码改进改进1&#xff1a;添加提示信息改进2&#xff1a;代码格式改进3&#xff1a;变量命名 其他实现方式使用if-else语句使用函数使用三元操作符 结论 引言 在公历中&#xff0c;闰年的规则如下&#xff1a;如果某个年份能被4整除但不能被100整除…

拥抱C++的深度和复杂性,挖掘更多可能 !——《C++20高级编程(第5版)》

&#xff0c;C难以掌握&#xff0c;但其广泛的功能使其成为游戏和商业软件应用程序中最常用的语言。即使是有经验的用户通常也不熟悉许多高级特性&#xff0c;但C20的发布提供了探索该语言全部功能的绝佳机会。《C20高级编程(第5版)》为C的必要内容提供了一个代码密集型、面向解…

docker centos7在线安装Nginx

目录 1.在线安装Nginx2.配置开机启动 1.在线安装Nginx # 安装Nginx yum install epel-release yum install nginx2.配置开机启动 # 启动Nginx systemctl start nginx # 开机自启 systemctl enable nginx一般docker内的centos7安装Nginx的目录结构是&#xff1a; /etc/nginx为…

系统工程学思想

系统工程学思想 大项目或复杂问题的实施和解决&#xff0c;需要按照系统工程学理论进行&#xff0c;以系统的方法完整、全面的分析&#xff0c;而不是零星的处理问题&#xff0c;沿着逻辑推理的路径&#xff0c;去解决哪些原本靠直觉判断处理的问题。 系统分析过程逻辑结构分为…

linux如何查看编译器支持的C++版本(支持C++11、支持C++14、支持C++17、支持C++20)(编译时不指定g++版本,默认使用老版本编译)

参考:https://blog.csdn.net/Dontla/article/details/129016157 C各个版本 C11 C11是一个重要的C标准版本&#xff0c;于2011年发布。C11带来了许多重要的改进&#xff0c;包括&#xff1a; 智能指针&#xff1a;引入了shared_ptr和unique_ptr等智能指针&#xff0c;用于更好地…

Qt篇——Qt无法翻译tr()里面的字符串

最近遇到使用Qt语言家翻译功能时&#xff0c;ui界面中的中文都能够翻译成英文&#xff0c;但是tr("测试")这种动态设置给控件的中文&#xff0c;无法翻译&#xff08;lang_English.ts文件中的翻译已经正确添加了tr()字符串的翻译&#xff09;。 上网搜了很多资料&am…

C语言例:设 int x; 则表达式 (x=4*5,x*5),x+25 的值

代码如下&#xff1a; #include<stdio.h> int main(void) {int x,m;m ((x4*5,x*5),x25);printf("(x4*5,x*5),x25 %d\n",m);//x4*520//x*5100//x2545return 0; } 结果如下&#xff1a;