Reflector 是 Kubernetes client-go 中的一个核心组件,负责从 API Server 获取资源对象并将其同步到本地缓存中。本文将解析 Reflector 的实现原理及源码细节。
本文阅读代码链接:https://github.com/kubernetes/client-go/tree/release-1.30
一. Reflector 的基本概念
专业描述:
Reflector(反射器)的主要功能是通过 List 和 Watch 操作,将 etcd 中的数据"反射"到本地存储(DeltaFIFO)中。它首先获取资源对象的完整列表,然后持续监视资源变化并触发相应事件处理。
大白话:
Reflector 是什么?想象一下 Reflector 就像是一个秘书,它的工作是帮你持续关注 Kubernetes 集群中的资源变化,然后把这些信息记录在本地笔记本(缓存)中。
Reflector 主要做两件事:
-
先列出所有信息(List 操作):
-
就像新秘书第一天上班,先把所有档案都复印一份
-
获取所有当前存在的资源对象(如 Pod、Deployment 等)
-
-
然后持续关注变化(Watch 操作):
-
就像秘书坐在办公室,随时记录新文件、更新和删除的文件
-
持续监听资源的创建、更新和删除事件
-
二. Reflector 的结构
Reflector 的核心结构定义在 tools/cache/reflector.go 文件中:
type Reflector struct {// 反射器名称,默认为 文件:行数name string// 期望放到 Store 中的类型名称expectedTypeName string// 放到 Store 中的对象类型expectedType reflect.Type// 期望的 GVK(GroupVersionKind)expectedGVK *schema.GroupVersionKind// 存储目标,用于同步数据store Store// 最重要的接口,用于执行 List 和 Watch 操作listerWatcher ListerWatcher// 处理退避逻辑backoffManager wait.BackoffManager// 重新同步周期resyncPeriod time.Duration// 用于确定是否需要重新同步ShouldResync func() bool// 其他字段...lastSyncResourceVersion stringisLastSyncResourceVersionGone bool// ...
}
三. ListerWatcher 接口
ListerWatcher 是 Reflector 的核心接口
,定义了 List
和 Watch
操作:
type ListerWatcher interface {// List 返回资源对象列表List(options metav1.ListOptions) (runtime.Object, error)// Watch 从指定版本开始监视资源变化Watch(options metav1.ListOptions) (watch.Interface, error)
}
四. Reflector 的工作流程
Reflector 通过 Run
方法启动工作:
// Run 函数:不断地使用 ListAndWatch 方法获取所有对象和它们的变化
// 当 stopCh 关闭时,Run 函数才会退出
func (r *Reflector) Run(stopCh <-chan struct{}) {klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)// 开始工作wait.BackoffUntil(func() {// 调用 ListAndWatch 方法获取数据if err := r.ListAndWatch(stopCh); err != nil {// 如果出错,处理错误r.watchErrorHandler(r, err)}}, r.backoffManager, true, stopCh)// 停止工作klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}
4.1 ListAndWatch 方法
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {// 记录开始监视的资源类型和来源klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)var err errorvar w watch.Interface// 判断是否使用 watchList 功能(这是一个新特性)useWatchList := ptr.Deref(r.UseWatchList, false)fallbackToList := !useWatchList// 尝试使用 watchList 功能if useWatchList {w, err = r.watchList(stopCh)// 处理各种错误情况,如果watchList失败回退到标准的 LIST/WATCH 模式if err != nil {fallbackToList = truew = nil}}// 如果需要使用标准的 LIST 方式if fallbackToList {err = r.list(stopCh) // 执行列表操作if err != nil {return err}}// 缓存填充完成的日志klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name)// 设置 resync 通道和取消通道resyncerrc := make(chan error, 1)cancelCh := make(chan struct{})defer close(cancelCh)// 启动重新同步协程go r.startResync(stopCh, cancelCh, resyncerrc)// 开始 watch 操作return r.watch(w, stopCh, resyncerrc)
}
1. 列表操作(传统List)
当 fallbackToList 为 true 时,执行标准的 List 操作:
-
调用 r.list(stopCh) 方法,该方法会:
-
通过 r.listerWatcher.List() 获取资源的完整列表
-
从结果中提取 ResourceVersion(资源版本号)
-
将资源列表转换为对象列表
-
通过 r.syncWith() 方法将这些对象存储到本地缓存
-
调用 r.setLastSyncResourceVersion() 更新最后同步的资源版本号
2. 监视操作(Watch)
list 完成后,调用 r.watch() 方法:
-
使用前面获取的 ResourceVersion 启动 Watch 操作
-
如果 Watch 不存在,则创建新的 Watch,使用最后同步的资源版本号
-
通过 r.listerWatcher.Watch() 建立与 API Server 的长连接
-
调用 watchHandler 函数处理来自 Watch 的事件
-
根据事件类型(Added、Modified、Deleted、Bookmark)更新本地缓存
-
在处理每个事件后更新 ResourceVersion
3. 重新同步(Resync)
同时启动一个 r.startResync() 协程:
-
根据配置的 resyncPeriod 定期触发重新同步
-
如果 ShouldResync 返回 true,则调用 r.store.Resync()
-
这确保即使对象没有变化,也能定期处理所有对象
WatchList 新特性
代码中的 watchList 是一个较新的特性:
-
它试图通过单个流式请求获取初始状态和后续变更
-
与传统的 List+Watch 相比,这种方式
消耗更少
的服务器资源 -
如果 watchList 失败,会回退到标准的 List+Watch 模式
4. 列表操作(WatchList)
关键流程:
-
创建临时存储:
temporaryStore = NewStore(...)
-
发送特殊 Watch 请求:设置
SendInitialEvents=true
-
服务器响应流程:
-
先发送所有现有对象的 "Added" 事件
-
最后发送带有特殊标记的 "Bookmark" 事件
-
-
同步完成后:
-
将临时存储中的对象替换到正式存储
-
继续使用此 Watch 连接获取后续事件
-
两种场景
// watchList establishes a stream to get a consistent snapshot of data
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
//
// case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a consistent stream with the server.
// That means the returned data is consistent, as if, served directly from etcd via a quorum read.
// It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
// It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
//
// case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a stream with the server at the provided resource version.
// To establish the initial state the server begins with synthetic "Added" events.
// It ends with a synthetic "Bookmark" event containing the provided or newer resource version.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
源码注释描述了两种使用场景:
-
从最新资源开始 (RV=""):
-
建立与服务器的一致性流
-
服务器发送所有现有资源的 "Added" 事件
-
最后发送包含最新资源版本的 "Bookmark" 事件
-
-
从特定版本开始 (RV>"0"):
-
从指定的资源版本开始建立流
-
服务器发送该版本之后的所有资源的 "Added" 事件
-
最后发送包含指定或更新版本的 "Bookmark" 事件
-
4.2 watchHandler 方法
watchHandler 是 Reflector 中处理 watch 事件的核心方法,负责接收 API Server 发送的事件并更新本地存储。
方法参数分析
func watchHandler(start time.Time, // 开始时间,用于计算持续时间w watch.Interface, // watch 接口,事件来源store Store, // 存储接口,保存事件对象expectedType reflect.Type, // 期望的对象类型expectedGVK *schema.GroupVersionKind, // 期望的 GVKname string, // 反射器名称expectedTypeName string, // 期望类型名称setLastSyncResourceVersion func(string), // 设置最后同步资源版本的函数exitOnInitialEventsEndBookmark *bool, // WatchList 模式特有参数clock clock.Clock, // 时钟接口errc chan error, // 错误通道stopCh <-chan struct{}, // 停止信号通道
)
核心工作流程
1.初始化
-
初始化事件计数器
-
重置 WatchList 模式的特殊标记
eventCount := 0if exitOnInitialEventsEndBookmark != nil {*exitOnInitialEventsEndBookmark = false}
2.事件循环
-
通过 select 多路复用监听多个通道
-
处理停止信号、错误信号和事件信号
loop:for {select {case <-stopCh:return errorStopRequestedcase err := <-errc:return errcase event, ok := <-w.ResultChan():// 处理事件...}}
3.事件类型验证
-
验证事件对象类型是否符合预期
-
验证 GVK 是否符合预期
if expectedType != nil {if e, a := expectedType, reflect.TypeOf(event.Object); e != a {// 处理类型不匹配utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))continue}}
4.根据事件类型处理
-
根据事件类型(Added、Modified、Deleted、Bookmark)调用不同的存储方法
-
处理可能的存储错误
switch event.Type {case watch.Added:err := store.Add(event.Object)// ...case watch.Modified:err := store.Update(event.Object)// ...case watch.Deleted:err := store.Delete(event.Object)// ...case watch.Bookmark:// 处理 Bookmark 事件// ...}
5.书签事件特殊处理
-
检查书签事件是否标记了初始事件流的结束
-
在 WatchList 模式中,这表示所有初始对象已接收完毕
if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" {if exitOnInitialEventsEndBookmark != nil {*exitOnInitialEventsEndBookmark = true}}
6.资源版本更新
-
更新最后同步的资源版本
-
如果存储实现了 ResourceVersionUpdater 接口,也更新存储的资源版本
setLastSyncResourceVersion(resourceVersion)if rvu, ok := store.(ResourceVersionUpdater); ok {rvu.UpdateResourceVersion(resourceVersion)}
7.WatchList 模式特殊处理
- 如果是 WatchList 模式且收到了初始事件结束书签,退出处理
if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark {watchDuration := clock.Since(start)klog.V(4).Infof("exiting %v Watch because received the bookmark...", name)return nil}
8.watch 结束处理
-
计算 watch 持续时间
-
如果 watch 时间太短且没有收到事件,返回错误
watchDuration := clock.Since(start)if watchDuration < 1*time.Second && eventCount == 0 {return fmt.Errorf("very short watch: %s: Unexpected watch close...", name)}
特殊设计亮点
通用处理框架:
-
同一个 watchHandler 方法既支持传统 Watch 模式,也支持新的 WatchList 模式
-
通过 exitOnInitialEventsEndBookmark 参数区分不同模式
初始事件流结束标记:
-
使用带有特殊注解 k8s.io/initial-events-end 的 Bookmark 事件标记初始事件流结束
-
这是 WatchList 模式的关键机制
灵活的资源版本更新:
-
通过函数参数 setLastSyncResourceVersion 支持不同的资源版本更新策略
-
支持存储对象自定义的资源版本更新机制
精细的错误处理:
-
对每种操作的错误都进行捕获和记录
-
通过 utilruntime.HandleError 统一处理错误
短 watch 检测:
-
检测异常短的 watch 操作(小于1秒且无事件)
-
防止因网络问题导致的频繁重连
watchHandler 方法是 Reflector 处理增量更新的核心,其设计既支持传统模式,又能适应新的优化策略,体现了代码的灵活性
五. ResourceVersion 的重要性
ResourceVersion(资源版本号)是保证一致性的关键:
-
每个资源对象都有 ResourceVersion
-
每次修改(创建、更新、删除)资源时,API Server 都会更新 ResourceVersion
-
Watch 操作使用 ResourceVersion 来确定资源是否变化
-
Reflector 持续追踪最新的 ResourceVersion
六. 实际应用示例
以 Deployment 资源为例,在创建 DeploymentInformer 时:
- 初始化时传入 ListWatch 对象,包含 List 和 Watch 的实现
- List 通过 client.AppsV1().Deployments(namespace).List() 实现
- Watch 通过 client.AppsV1().Deployments(namespace).Watch() 实现
- Reflector 使用这些方法获取 Deployment 资源并监视变化
七. 总结
- Reflector 是客户端缓存机制的核心组件
- 通过 List 获取资源的完整状态,通过 Watch 监视增量变化
- 将数据存储到本地缓存(DeltaFIFO)中
- 通过 ResourceVersion 维护数据一致性
- 为 Informer 模式提供基础数据同步能力
Reflector 的设计保证了客户端能够高效地与 API Server 同步数据,同时减轻了 API Server 的负担,是 Kubernetes 控制器模式的重要基础组件。