本文阅读代码链接:https://github.com/kubernetes/client-go/tree/release-1.30
下面代码全部在 util/workqueue 文件中:
一. workqueue基本概念
在 client-go 的 util/workqueue 包中,主要有三个队列:普通队列、延时队列和限速队列。每个队列都是在前一个队列的基础上实现的,逐层添加新功能。按照 Queue -> DelayingQueue -> RateLimitingQueue 的顺序来逐层分析限速队列的实现。
普通队列 ( Queue ) :
- 普通队列的实现主要是通过一个简单的队列结构来管理待处理的任务。
- 它提供了基本的队列操作,如添加任务 ( Add )、获取任务 ( Get )、完成任务 ( Done ) 等。
- 通过
sync.Cond
来实现队列的同步操作,确保线程安全。
延时队列 ( DelayingQueue ) :
- 延时队列在普通队列的基础上增加了延时功能。
- 它允许任务在指定的时间后再加入队列进行处理。
- 使用
heap
实现优先级队列,确保任务按照时间顺序进行处理。 - 通过
clock
来管理时间,支持真实时间和模拟时间。
限速队列 ( RateLimitingQueue ) :
- 限速队列在延时队列的基础上增加了限速功能。
- 使用
RateLimiter
来控制任务的处理速度,避免任务过快地重新加入队列。 - 提供了
AddRateLimited
方法,根据RateLimiter
的策略来决定任务何时可以重新加入队列。 - 通过
Forget
方法来清除任务的重试记录。 - 通过这种层层递进的设计,
RateLimitingQueue
能够在普通队列和延时队列的基础上,提供更复杂的任务处理能力,适用于需要限速的场景。
二. workqueue主要作用
workqueue 主要用于 Kubernetes 控制器中处理事件和任务的队列系统,它具有以下主要作用:
- 任务管理 :提供了一种机制来管理需要处理的任务,确保任务按照一定的顺序被处理。
- 并发控制 :通过队列机制控制并发处理的任务数量,避免系统过载。
- 延迟处理 :支持将任务延迟一段时间后再处理,适用于需要重试的场景。
- 限速处理 :控制任务重新入队的频率,避免因频繁重试导致的系统压力。
- 错误处理 :提供了一种机制来处理任务执行失败的情况,支持重试策略。
三. 普通队列 - Queue
3.1 接口和结构体
普通队列定义了基本的队列接口和实现:
// Interface 定义了队列的基本操作
type Interface interface {Add(item interface{})Len() intGet() (item interface{}, shutdown bool)Done(item interface{})ShutDown()ShutDownWithDrain()ShuttingDown() bool
}// Type 是工作队列的具体实现
type Type struct {// queue 定义了我们处理项目的顺序queue []t// dirty 定义了所有需要处理的项目dirty set// processing 包含当前正在处理的项目// 这些项目可能同时在 dirty 集合中processing setcond *sync.CondshuttingDown booldrain boolmetrics queueMetricsunfinishedWorkUpdatePeriod time.Durationclock clock.WithTicker
}
3.2 核心方法:
Add 方法
Add
方法用于将项目标记为需要处理
工作流程:
- 首先检查队列是否正在关闭,如果是则直接返回。
- 检查项目是否已经在 dirty 集合中,如果是则直接返回。
- 将项目添加到 dirty 集合中。
- 如果项目不在 processing 集合中,则将其添加到队列中。
- 通过
cond.Signal()
唤醒一个等待的 goroutine 来处理新添加的项目。
func (q *Type) Add(item interface{}) {// 获取锁,确保线程安全q.cond.L.Lock()defer q.cond.L.Unlock()// 检查队列是否正在关闭,如果是则直接返回if q.shuttingDown {return}// 检查项目是否已经在 dirty 集合中,如果是则直接返回// 这避免了重复添加相同的项目if q.dirty.has(item) {return}// 更新指标,记录添加操作q.metrics.add(item)// 将项目添加到 dirty 集合中,标记为需要处理q.dirty.insert(item)// 如果项目已经在 processing 集合中(正在被处理),则不添加到队列// 当处理完成时,Done 方法会检查 dirty 集合并重新添加到队列if q.processing.has(item) {return}// 将项目添加到队列末尾q.queue = append(q.queue, item)// 通知一个等待的 goroutine 有新项目可处理q.cond.Signal()
}
Get方法
Get
方法用于从队列中获取待处理的项目
工作流程:
- 如果队列为空且未关闭,则阻塞等待。
- 如果队列为空且正在关闭,则返回 shutdown = true。
- 从队列头部取出一个项目。
- 将项目从 dirty 集合中删除,并添加到 processing 集合中。
- 返回项目和 shutdown = false。
// Get 阻塞直到可以返回一个要处理的项目
// 如果 shutdown = true,调用者应该结束其 goroutine
// 处理完项目后必须调用 Done
func (q *Type) Get() (item interface{}, shutdown bool) {// 获取锁,确保线程安全q.cond.L.Lock()defer q.cond.L.Unlock()// 如果队列为空且未关闭,则阻塞等待// 这里使用 for 循环而不是 if 语句,是为了防止虚假唤醒for len(q.queue) == 0 && !q.shuttingDown {q.cond.Wait()}// 如果队列为空,说明队列正在关闭// 返回 nil 和 true,通知调用者结束其 goroutineif len(q.queue) == 0 {return nil, true}// 从队列头部取出一个项目item = q.queue[0]// 将队列头部元素设为 nil,避免内存泄漏// 注释解释了底层数组仍然存在并引用此对象,所以需要显式设为 nilq.queue[0] = nil// 更新队列,移除头部元素q.queue = q.queue[1:]// 更新指标,记录获取操作q.metrics.get(item)// 将项目添加到 processing 集合中,标记为正在处理q.processing.insert(item)// 将项目从 dirty 集合中删除,因为它已经被取出处理q.dirty.delete(item)// 返回项目和 false,表示队列未关闭return item, false
}
Done方法
Done
方法用于标记项目处理完成
工作流程:
- 将项目从 processing 集合中删除
- 如果项目在 dirty 集合中,则将其重新添加到队列中
- 如果 processing 集合为空,则通知等待的 goroutine
// Done 标记项目处理完成
// 如果在处理过程中项目被再次标记为 dirty,则会重新添加到队列中进行重新处理
func (q *Type) Done(item interface{}) {// 获取锁,确保线程安全q.cond.L.Lock()defer q.cond.L.Unlock()// 更新指标,记录完成操作q.metrics.done(item)// 将项目从 processing 集合中删除,表示处理完成q.processing.delete(item)// 如果项目在 dirty 集合中(说明在处理过程中被再次标记为需要处理)// 则将其重新添加到队列中,以便再次处理if q.dirty.has(item) {q.queue = append(q.queue, item)q.cond.Signal() // 通知一个等待的 goroutine 有新项目可处理} else if q.processing.len() == 0 {// 如果 processing 集合为空(没有正在处理的项目)// 也发送信号,用于唤醒等待队列排空的 goroutineq.cond.Signal()}
}
四. 延时队列 - DelayingQueue
4.1 接口和结构体
延时队列在普通队列的基础上增加了延时功能:
// DelayingInterface 是一个可以在稍后时间添加项目的接口
// 这使得在失败后重新入队项目更容易,而不会陷入热循环
type DelayingInterface interface {Interface// AddAfter 在指定的持续时间过后将项目添加到工作队列AddAfter(item interface{}, duration time.Duration)
}// delayingType 包装了一个 Interface 并提供延迟重新入队功能
type delayingType struct {Interface// clock 跟踪延迟触发的时间clock clock.Clock// stopCh 让我们向等待循环发送关闭信号stopCh chan struct{}// stopOnce 保证我们只发送一次关闭信号stopOnce sync.Once// heartbeat 确保我们等待不超过 maxWait 就触发heartbeat clock.Ticker// waitingForAddCh 是一个缓冲通道,用于提供 waitingForAddwaitingForAddCh chan *waitFor// metrics 计算重试次数metrics retryMetrics
}// waitFor 保存要添加的数据和应该添加的时间
type waitFor struct {data treadyAt time.Time// 在优先级队列中的索引index int
}
延时队列使用优先级队列来管理延时任务:
// waitForPriorityQueue 实现了 waitFor 项目的优先级队列
//
// waitForPriorityQueue 实现了 heap.Interface。时间上最早发生的项目
//(即具有最小 readyAt 的项目)位于根部(索引 0)。
// Peek 返回索引 0 处的最小项目。Pop 返回最小项目,
// 该项目已从队列中移除并由 container/heap 放置在索引 Len()-1 处。
// Push 在索引 Len() 处添加一个项目,container/heap 将其渗透到正确的位置。
type waitForPriorityQueue []*waitForfunc (pq waitForPriorityQueue) Len() int {return len(pq)
}
func (pq waitForPriorityQueue) Less(i, j int) bool {return pq[i].readyAt.Before(pq[j].readyAt)
}
func (pq waitForPriorityQueue) Swap(i, j int) {pq[i], pq[j] = pq[j], pq[i]pq[i].index = ipq[j].index = j
}// Push 向队列添加一个项目。不应直接调用 Push;
// 而是使用 `heap.Push`。
func (pq *waitForPriorityQueue) Push(x interface{}) {n := len(*pq)item := x.(*waitFor)item.index = n*pq = append(*pq, item)
}// Pop 从队列中移除一个项目。不应直接调用 Pop;
// 而是使用 `heap.Pop`。
func (pq *waitForPriorityQueue) Pop() interface{} {n := len(*pq)item := (*pq)[n-1]item.index = -1*pq = (*pq)[0:(n - 1)]return item
}// Peek 返回队列开头的项目,而不移除项目或以其他方式改变队列。
// 可以安全地直接调用。
func (pq waitForPriorityQueue) Peek() interface{} {return pq[0]
}
4.2 核心方法:
AddAfter 方法
AddAfter
方法它允许在指定的延迟后将项目添加到队列中
工作流程:
- 首先检查队列是否正在关闭,如果是则直接返回
- 更新重试指标
- 如果延迟时间小于等于 0,则立即将项目添加到队列中
- 否则,创建一个
waitFor
结构体,包含项目数据和准备时间,并将其发送到 waitingForAddCh 通道
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {// 首先检查队列是否正在关闭,如果是则直接返回if q.ShuttingDown() {return}// 更新重试指标,记录重试操作q.metrics.retry()// 如果延迟时间小于等于 0,则立即将项目添加到队列中if duration <= 0 {q.Add(item)return}// 使用 select 语句处理通道操作select {case <-q.stopCh:// 如果队列已关闭(stopCh 被关闭),则解除阻塞并返回case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:// 创建一个 waitFor 结构体,包含项目数据和准备时间// 将其发送到 waitingForAddCh 通道,由 waitingLoop 协程处理}
}
waitingLoop 方法
waitingLoop
方法是延时队列的核心循环,它负责管理延时任务
工作流程:
初始化一个优先级队列 waitingForQueue
和一个映射 waitingEntryByData
无限循环中:
- 检查队列是否正在关闭,如果是则返回
- 获取当前时间
- 检查优先级队列中是否有准备好的项目,如果有则将其添加到队列中
- 为优先级队列中的第一个项目设置定时器
- 使用
select
语句等待以下事件之一:- 队列关闭信号
- 心跳信号(确保不会等待太长时间)
- 下一个项目准备好的信号
- 新的延时任务到达
- 处理新的延时任务:如果已经准备好则立即添加到队列中,否则插入到优先级队列中
- 尝试排空通道,处理所有待处理的延时任务
// waitingLoop 运行直到工作队列关闭,并保持检查要添加的项目列表
func (q *delayingType) waitingLoop() {// 注册 panic 处理函数,确保即使发生 panic 也能恢复defer utilruntime.HandleCrash()// 创建一个永不触发的通道,用于 select 语句中的占位符never := make(<-chan time.Time)// 声明一个定时器变量,用于等待下一个项目准备好的时间var nextReadyAtTimer clock.Timer// 初始化一个优先级队列,用于存储延时项目waitingForQueue := &waitForPriorityQueue{}// 使用 container/heap 包实现的优先级队列,确保最早准备好的项目在队列头部。heap.Init(waitingForQueue)// 创建一个映射,用于快速查找已存在的项目waitingEntryByData := map[t]*waitFor{}// 无限循环,直到队列关闭for {// 检查队列是否正在关闭,如果是则返回if q.Interface.ShuttingDown() {return}// 获取当前时间now := q.clock.Now()// 处理所有已准备好的项目for waitingForQueue.Len() > 0 {// 查看队列头部的项目entry := waitingForQueue.Peek().(*waitFor)// 如果项目还没准备好,则跳出循环if entry.readyAt.After(now) {break}// 从优先级队列中弹出项目entry = heap.Pop(waitingForQueue).(*waitFor)// 将项目添加到工作队列中q.Add(entry.data)// 从映射中删除项目delete(waitingEntryByData, entry.data)}// 设置下一个项目的定时器nextReadyAt := neverif waitingForQueue.Len() > 0 {// 如果已有定时器,先停止它if nextReadyAtTimer != nil {nextReadyAtTimer.Stop()}// 获取队列头部的项目entry := waitingForQueue.Peek().(*waitFor)// 创建一个新的定时器,等待到项目的准备时间nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))// 获取定时器的通道nextReadyAt = nextReadyAtTimer.C()}// 使用 select 语句等待多个事件select {case <-q.stopCh:// 如果收到停止信号,则返回returncase <-q.heartbeat.C():// 如果收到心跳信号,继续循环// 心跳确保我们不会等待太长时间case <-nextReadyAt:// 如果下一个项目准备好了,继续循环case waitEntry := <-q.waitingForAddCh:// 如果收到新的延时项目if waitEntry.readyAt.After(q.clock.Now()) {// 如果项目还没准备好,插入到优先级队列中insert(waitingForQueue, waitingEntryByData, waitEntry)} else {// 如果项目已经准备好,直接添加到工作队列中q.Add(waitEntry.data)}// 尝试排空通道,处理所有待处理的延时任务drained := falsefor !drained {select {case waitEntry := <-q.waitingForAddCh:// 处理更多的延时项目if waitEntry.readyAt.After(q.clock.Now()) {insert(waitingForQueue, waitingEntryByData, waitEntry)} else {q.Add(waitEntry.data)}default:// 如果通道为空,则标记为已排空drained = true}}}}
}
insert 方法
insert
函数是延时队列中的辅助函数,用于将延时项目添加到优先级队列中,或者更新已存在项目的准备时间
工作流程:
- 检查项目是否已经存在于映射中
- 如果存在,则仅在新的准备时间早于现有准备时间的情况下更新时间
- 如果不存在,则将项目添加到优先级队列和映射中
// insert 将条目添加到优先级队列,或者如果它已经存在于队列中,则更新 readyAt
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {// 检查项目是否已经存在于映射中existing, exists := knownEntries[entry.data]// 如果项目已存在if exists {// 仅在新的准备时间早于现有准备时间的情况下更新时间// 这确保了项目总是以最早的准备时间被处理if existing.readyAt.After(entry.readyAt) {// 更新准备时间existing.readyAt = entry.readyAt// 修复堆,确保优先级队列的顺序正确heap.Fix(q, existing.index)}// 如果新的准备时间不早于现有准备时间,则不做任何更改return}// 如果项目不存在,则将其添加到优先级队列中heap.Push(q, entry)// 同时将项目添加到映射中,以便快速查找knownEntries[entry.data] = entry
}
五. 限速队列 - RateLimitingQueue
5.1 接口和结构体
限速队列在延时队列的基础上增加了限速功能,它使用 RateLimiter
来控制任务的处理速度,避免任务过快地重新加入队列。
// RateLimitingInterface 是一个限制项目添加到队列速率的接口
type RateLimitingInterface interface {DelayingInterface// AddRateLimited 在速率限制器允许的情况下将项目添加到工作队列AddRateLimited(item interface{})// Forget 表示项目已完成重试// 无论是永久失败还是成功,我们都将停止速率限制器跟踪它// 这只清除 `rateLimiter`,你仍然需要在队列上调用 `Done`Forget(item interface{})// NumRequeues 返回项目被重新入队的次数NumRequeues(item interface{}) int
}// rateLimitingType 包装了一个 RateLimiter Interface 并提供限速重新入队功能
type rateLimitingType struct {DelayingInterfacerateLimiter RateLimiter
}
RateLimiter
接口定义了限速器的基本操作:
// RateLimiter 控制项目重新入队的速率
type RateLimiter interface {// When 获取一个项目并返回它应该在多长时间后重新入队When(item interface{}) time.Duration// NumRequeues 返回项目被重新入队的次数NumRequeues(item interface{}) int// Forget 表示项目已完成重试,不再需要跟踪Forget(item interface{})
}
5.2 核心方法:
AddRateLimited 方法
AddRateLimited
方法是限速队列的核心方法,它根据限速器的策略来决定任务何时可以重新加入队列
工作流程:
- 调用
q.rateLimiter.When(item)
获取项目应该在多长时间后重新入队 - 调用
q.DelayingInterface.AddAfter
方法,将项目延迟指定时间后添加到队列中
// AddRateLimited 根据速率限制器允许的时间将项目添加到队列
func (q *rateLimitingType) AddRateLimited(item interface{}) {q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
Forget 方法
Forget
方法用于清除任务的重试记录
工作流程:
- 调用 rateLimiter.Forget(item) 清除项目的重试记录
// Forget 表示项目已完成重试
func (q *rateLimitingType) Forget(item interface{}) {q.rateLimiter.Forget(item)
}
NumRequeues 方法
NumRequeues
方法用于获取任务被重新入队的次数
工作流程:
- 调用 rateLimiter.NumRequeues(item) 获取项目被重新入队的次数
// NumRequeues 返回项目被重新入队的次数
func (q *rateLimitingType) NumRequeues(item interface{}) int {return q.rateLimiter.NumRequeues(item)
}
5.3 限速器实现
Kubernetes 提供了多种限速器实现,以适应不同的场景
限速器在util/workqueue/default_rate_limiters.go
中
ItemExponentialFailureRateLimiter
指数退避限速器,随着重试次数的增加,延迟时间呈指数增长
- failuresLock :用于保护 failures 映射的互斥锁,确保线程安全。
- failures :记录每个项目的重试次数。
- baseDelay :基础延迟时间。
- maxDelay :最大延迟时间,防止延迟时间过长。
// ItemExponentialFailureRateLimiter 对每个项目进行指数退避
type ItemExponentialFailureRateLimiter struct {failuresLock sync.Mutexfailures map[interface{}]intbaseDelaySec intmaxDelaySec int
}// When 获取一个项目并返回它应该在多长时间后重新入队
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {r.failuresLock.Lock()defer r.failuresLock.Unlock()// 获取项目的重试次数exp := r.failures[item]// 增加项目的重试次数r.failures[item] = r.failures[item] + 1// 计算指数退避延迟时间backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))// 防止延迟时间溢出if backoff > math.MaxInt64 {return r.maxDelay}// 将计算的延迟时间转换为 Duration 类型calculated := time.Duration(backoff)// 如果计算的延迟时间超过最大延迟时间,则返回最大延迟时间if calculated > r.maxDelay {return r.maxDelay}// 返回计算的延迟时间return calculated
}
ItemFastSlowRateLimiter
快慢限速器,前几次重试使用较短的延迟,之后使用较长的延迟
- failuresLock :用于保护 failures 映射的互斥锁,确保线程安全。
- failures :记录每个项目的重试次数。
- maxFastAttempts :最大快速重试次数。
- fastDelay :快速重试的延迟时间。
- slowDelay :慢速重试的延迟时间。
// ItemFastSlowRateLimiter 对前 N 次失败使用快速重试,之后使用慢速重试
type ItemFastSlowRateLimiter struct {failuresLock sync.Mutexfailures map[interface{}]intfastDelay time.DurationslowDelay time.Durationthreshold int
}// When 获取一个项目并返回它应该在多长时间后重新入队
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {r.failuresLock.Lock()defer r.failuresLock.Unlock()// 增加项目的重试次数r.failures[item] = r.failures[item] + 1// 如果重试次数小于或等于最大快速重试次数,返回快速重试延迟if r.failures[item] <= r.maxFastAttempts {return r.fastDelay}// 否则,返回慢速重试延迟return r.slowDelay
}
MaxOfRateLimiter
组合多个限速器,使用其中延迟最长的一个
- limiters :存储多个 RateLimiter 实例的切片。
// MaxOfRateLimiter 使用多个限速器中延迟最长的一个
type MaxOfRateLimiter struct {limiters []RateLimiter
}// When 获取一个项目并返回它应该在多长时间后重新入队
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {ret := time.Duration(0)for _, limiter := range r.limiters {// 调用每个限速器的 When 方法curr := limiter.When(item)// 记录最长的延迟时间if curr > ret {ret = curr}}// 返回最长的延迟时间return ret
}
六. 使用示例
6.1 Queue
// 创建一个普通队列
queue := workqueue.New()// 添加一个项目
queue.Add("task1")// 获取一个项目
item, shutdown := queue.Get()
if !shutdown {// 处理项目fmt.Println("处理项目:", item)// 标记项目处理完成queue.Done(item)
}// 关闭队列
queue.ShutDown()
6.2 DelayingQueue
// 创建一个延时队列
delayingQueue := workqueue.NewDelayingQueue()// 添加一个延时项目,5秒后处理
delayingQueue.AddAfter("task2", 5*time.Second)// 获取一个项目
item, shutdown := delayingQueue.Get()
if !shutdown {// 处理项目fmt.Println("处理项目:", item)// 标记项目处理完成delayingQueue.Done(item)
}// 关闭队列
delayingQueue.ShutDown()
6.3 RateLimitingQueue
// 创建一个指数退避限速器
rateLimiter := workqueue.DefaultControllerRateLimiter()// 创建一个限速队列
rateLimitingQueue := workqueue.NewRateLimitingQueue(rateLimiter)// 添加一个限速项目
rateLimitingQueue.AddRateLimited("task3")// 获取一个项目
item, shutdown := rateLimitingQueue.Get()
if !shutdown {// 处理项目fmt.Println("处理项目:", item)// 如果处理成功,忘记项目的重试历史rateLimitingQueue.Forget(item)// 标记项目处理完成rateLimitingQueue.Done(item)
}// 如果处理失败,重新添加到队列
if processFailed {rateLimitingQueue.AddRateLimited("task3")
}// 关闭队列
rateLimitingQueue.ShutDown()
6.4 实际场景
在 Kubernetes 控制器中,通常会使用限速队列来处理资源的变更事件:
// 创建一个限速队列
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())// 添加事件处理函数
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {key, err := cache.MetaNamespaceKeyFunc(obj)if err == nil {queue.Add(key)}},UpdateFunc: func(old, new interface{}) {key, err := cache.MetaNamespaceKeyFunc(new)if err == nil {queue.Add(key)}},DeleteFunc: func(obj interface{}) {key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)if err == nil {queue.Add(key)}},
})// 处理队列中的项目
for {// 获取一个项目key, quit := queue.Get()if quit {return}// 处理完成后调用 Donedefer queue.Done(key)// 处理项目err := processItem(key.(string))if err == nil {// 处理成功,忘记项目的重试历史queue.Forget(key)} else if queue.NumRequeues(key) < maxRetries {// 处理失败,重新添加到队列queue.AddRateLimited(key)} else {// 超过最大重试次数,放弃处理queue.Forget(key)utilruntime.HandleError(err)}
}
通过这种方式,Kubernetes 控制器可以有效地处理资源变更事件,并在处理失败时进行有限次数的重试,避免因频繁重试导致的系统压力。