一个小例子,给你讲透典型的 Go 并发操作
如果你有一个任务可以分解成多个子任务进行处理,同时每个子任务没有先后执行顺序的限制,等到全部子任务执行完毕后,再进行下一步处理。这时每个子任务的执行可以并发处理,这种情景下适合使用 sync.WaitGroup
。
虽然 sync.WaitGroup
使用起来比较简单,但是一不留神很有可能踩到坑里。
sync.WaitGroup 正确使用
比如,有一个任务需要执行 3 个子任务,那么可以这样写:
func main() {
var wg sync.WaitGroup
wg.Add(3)
go handlerTask1(&wg)
go handlerTask2(&wg)
go handlerTask3(&wg)
wg.Wait()
fmt.Println("全部任务执行完毕.")
}
func handlerTask1(wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("执行任务 1")
}
func handlerTask2(wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("执行任务 2")
}
func handlerTask3(wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("执行任务 3")
}
执行输出:
执行任务 3
执行任务 1
执行任务 2
全部任务执行完毕.
sync.WaitGroup 闭坑指南
01
// 正确
go handlerTask1(&wg)
// 错误
go handlerTask1(wg)
执行子任务时,使用的 sync.WaitGroup
一定要是 wg
的引用类型!
02
注意不要将 wg.Add()
放在 go handlerTask1(&wg)
中!
例如:
// 错误
var wg sync.WaitGroup
go handlerTask1(&wg)
wg.Wait()
...
func handlerTask1(wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
fmt.Println("执行任务 1")
}
注意 wg.Add()
一定要在 wg.Wait()
执行前执行!
小结
注意 wg.Add()
和 wg.Done()
的计数器保持一致!其实 wg.Done()
就是执行的 wg.Add(-1)
。
其实 sync.WaitGroup
使用场景比较局限,仅适用于等待全部子任务执行完毕后,再进行下一步处理。如果需求是当第一个子任务执行失败时,通知其他子任务停止运行,这时 sync.WaitGroup
是无法满足的,需要使用到上下文(context
)。
sync.WaitGroup + Context
在处理并发任务时,若需在任一子任务失败时终止所有其他子任务,以下示例提供了一种实现方法。
package main
import (
"context"
"fmt"
"sync"
"time"
)
// handlerTask 处理单个任务
func handlerTask(ctx context.Context, taskId int, wg *sync.WaitGroup, cancel context.CancelFunc) {
defer wg.Done()
fmt.Printf("Request %d is processing...\n", taskId)
// 模拟请求处理,如果 taskId 为1,则模拟失败
if taskId == 1 {
fmt.Printf("Request %d failed\n", taskId)
cancel() // 取消 context,通知其他请求停止
return
}
// 监听 context.Done() 通道
select {
case <-ctx.Done():
fmt.Printf("Request %d is already cancelled\n", taskId)
return
default:
// 继续执行
time.Sleep(3 * time.Second) // 模拟耗时操作
fmt.Printf("Request %d succeeded\n", taskId)
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(3)
go handlerTask(ctx, 1, &wg, cancel)
go handlerTask(ctx, 2, &wg, cancel)
go handlerTask(ctx, 3, &wg, cancel)
// 等待所有任务完成或被取消
go func() {
wg.Wait()
fmt.Println("All requests are finished or cancelled")
}()
// 给一些时间来处理,防止主 goroutine 过早退出
time.Sleep(5 * time.Second)
}
解释
1.任务 1 开始处理:
- 输出 Request 1 is processing...。
- 检查任务 ID,发现是 1,输出 Request 1 failed。
- 调用 cancel(),取消上下文 ctx。
2.任务 2 和任务 3 开始处理:
- 输出 Request 2 is processing... 和 Request 3 is processing...。
- 由于任务 1 已经调用 cancel(),上下文 ctx 已经被取消。
- 进入 select 语句,检查 ctx.Done() 通道。
- 发现 ctx.Done() 通道已被关闭,输出 Request 2 is already cancelled 和 Request 3 is already cancelled。
3.等待所有任务完成或被取消:
- 等待所有任务完成或被取消。
- 输出 All requests are finished or cancelled。
通过这种方式,可以确保在任务 1 失败时,其他任务能够立即检测到取消信号并停止执行。