k8s client-go 的 Reflector 源码分析

news/2025/3/10 23:08:15/文章来源:https://www.cnblogs.com/rxg456/p/18763964

Reflector 是 Kubernetes client-go 中的一个核心组件,负责从 API Server 获取资源对象并将其同步到本地缓存中。本文将解析 Reflector 的实现原理及源码细节。

本文阅读代码链接:https://github.com/kubernetes/client-go/tree/release-1.30

一. Reflector 的基本概念

专业描述:

Reflector(反射器)的主要功能是通过 List 和 Watch 操作,将 etcd 中的数据"反射"到本地存储(DeltaFIFO)中。它首先获取资源对象的完整列表,然后持续监视资源变化并触发相应事件处理。

大白话:

Reflector 是什么?想象一下 Reflector 就像是一个秘书,它的工作是帮你持续关注 Kubernetes 集群中的资源变化,然后把这些信息记录在本地笔记本(缓存)中。

Reflector 主要做两件事:

  1. 先列出所有信息(List 操作):

    • 就像新秘书第一天上班,先把所有档案都复印一份

    • 获取所有当前存在的资源对象(如 Pod、Deployment 等)

  2. 然后持续关注变化(Watch 操作):

    • 就像秘书坐在办公室,随时记录新文件、更新和删除的文件

    • 持续监听资源的创建、更新和删除事件

二. Reflector 的结构

Reflector 的核心结构定义在 tools/cache/reflector.go 文件中:

type Reflector struct {// 反射器名称,默认为 文件:行数name string// 期望放到 Store 中的类型名称expectedTypeName string// 放到 Store 中的对象类型expectedType reflect.Type// 期望的 GVK(GroupVersionKind)expectedGVK *schema.GroupVersionKind// 存储目标,用于同步数据store Store// 最重要的接口,用于执行 List 和 Watch 操作listerWatcher ListerWatcher// 处理退避逻辑backoffManager wait.BackoffManager// 重新同步周期resyncPeriod time.Duration// 用于确定是否需要重新同步ShouldResync func() bool// 其他字段...lastSyncResourceVersion stringisLastSyncResourceVersionGone bool// ...
}

三. ListerWatcher 接口

ListerWatcher 是 Reflector 的核心接口,定义了 ListWatch 操作:

type ListerWatcher interface {// List 返回资源对象列表List(options metav1.ListOptions) (runtime.Object, error)// Watch 从指定版本开始监视资源变化Watch(options metav1.ListOptions) (watch.Interface, error)
}

四. Reflector 的工作流程

Reflector 通过 Run 方法启动工作:

// Run 函数:不断地使用 ListAndWatch 方法获取所有对象和它们的变化
// 当 stopCh 关闭时,Run 函数才会退出
func (r *Reflector) Run(stopCh <-chan struct{}) {klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)// 开始工作wait.BackoffUntil(func() {// 调用 ListAndWatch 方法获取数据if err := r.ListAndWatch(stopCh); err != nil {// 如果出错,处理错误r.watchErrorHandler(r, err)}}, r.backoffManager, true, stopCh)// 停止工作klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}

4.1 ListAndWatch 方法

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {// 记录开始监视的资源类型和来源klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)var err errorvar w watch.Interface// 判断是否使用 watchList 功能(这是一个新特性)useWatchList := ptr.Deref(r.UseWatchList, false)fallbackToList := !useWatchList// 尝试使用 watchList 功能if useWatchList {w, err = r.watchList(stopCh)// 处理各种错误情况,如果watchList失败回退到标准的 LIST/WATCH 模式if err != nil {fallbackToList = truew = nil}}// 如果需要使用标准的 LIST 方式if fallbackToList {err = r.list(stopCh)  // 执行列表操作if err != nil {return err}}// 缓存填充完成的日志klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name)// 设置 resync 通道和取消通道resyncerrc := make(chan error, 1)cancelCh := make(chan struct{})defer close(cancelCh)// 启动重新同步协程go r.startResync(stopCh, cancelCh, resyncerrc)// 开始 watch 操作return r.watch(w, stopCh, resyncerrc)
}

1. 列表操作(传统List)

当 fallbackToList 为 true 时,执行标准的 List 操作:

  • 调用 r.list(stopCh) 方法,该方法会:

  • 通过 r.listerWatcher.List() 获取资源的完整列表

  • 从结果中提取 ResourceVersion(资源版本号)

  • 将资源列表转换为对象列表

  • 通过 r.syncWith() 方法将这些对象存储到本地缓存

  • 调用 r.setLastSyncResourceVersion() 更新最后同步的资源版本号

2. 监视操作(Watch)

list 完成后,调用 r.watch() 方法:

  • 使用前面获取的 ResourceVersion 启动 Watch 操作

  • 如果 Watch 不存在,则创建新的 Watch,使用最后同步的资源版本号

  • 通过 r.listerWatcher.Watch() 建立与 API Server 的长连接

  • 调用 watchHandler 函数处理来自 Watch 的事件

  • 根据事件类型(Added、Modified、Deleted、Bookmark)更新本地缓存

  • 在处理每个事件后更新 ResourceVersion

3. 重新同步(Resync)

同时启动一个 r.startResync() 协程:

  • 根据配置的 resyncPeriod 定期触发重新同步

  • 如果 ShouldResync 返回 true,则调用 r.store.Resync()

  • 这确保即使对象没有变化,也能定期处理所有对象

WatchList 新特性

代码中的 watchList 是一个较新的特性:

  • 它试图通过单个流式请求获取初始状态和后续变更

  • 与传统的 List+Watch 相比,这种方式消耗更少的服务器资源

  • 如果 watchList 失败,会回退到标准的 List+Watch 模式

4. 列表操作(WatchList)

关键流程:

  1. 创建临时存储:temporaryStore = NewStore(...)

  2. 发送特殊 Watch 请求:设置 SendInitialEvents=true

  3. 服务器响应流程:

    • 先发送所有现有对象的 "Added" 事件

    • 最后发送带有特殊标记的 "Bookmark" 事件

  4. 同步完成后:

    • 将临时存储中的对象替换到正式存储

    • 继续使用此 Watch 连接获取后续事件

两种场景
// watchList establishes a stream to get a consistent snapshot of data
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
//
// case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a consistent stream with the server.
// That means the returned data is consistent, as if, served directly from etcd via a quorum read.
// It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
// It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.
//
// case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
// Establishes a stream with the server at the provided resource version.
// To establish the initial state the server begins with synthetic "Added" events.
// It ends with a synthetic "Bookmark" event containing the provided or newer resource version.
// After receiving a "Bookmark" event the reflector is considered to be synchronized.
// It replaces its internal store with the collected items and
// reuses the current watch requests for getting further events.

源码注释描述了两种使用场景:

  1. 从最新资源开始 (RV=""):

    • 建立与服务器的一致性流

    • 服务器发送所有现有资源的 "Added" 事件

    • 最后发送包含最新资源版本的 "Bookmark" 事件

  2. 从特定版本开始 (RV>"0"):

    • 从指定的资源版本开始建立流

    • 服务器发送该版本之后的所有资源的 "Added" 事件

    • 最后发送包含指定或更新版本的 "Bookmark" 事件

4.2 watchHandler 方法

watchHandler 是 Reflector 中处理 watch 事件的核心方法,负责接收 API Server 发送的事件并更新本地存储。

方法参数分析

func watchHandler(start time.Time,                            // 开始时间,用于计算持续时间w watch.Interface,                          // watch 接口,事件来源store Store,                                // 存储接口,保存事件对象expectedType reflect.Type,                  // 期望的对象类型expectedGVK *schema.GroupVersionKind,       // 期望的 GVKname string,                                // 反射器名称expectedTypeName string,                    // 期望类型名称setLastSyncResourceVersion func(string),    // 设置最后同步资源版本的函数exitOnInitialEventsEndBookmark *bool,       // WatchList 模式特有参数clock clock.Clock,                          // 时钟接口errc chan error,                            // 错误通道stopCh <-chan struct{},                     // 停止信号通道
)

核心工作流程

1.初始化
  • 初始化事件计数器

  • 重置 WatchList 模式的特殊标记

   eventCount := 0if exitOnInitialEventsEndBookmark != nil {*exitOnInitialEventsEndBookmark = false}
2.事件循环
  • 通过 select 多路复用监听多个通道

  • 处理停止信号、错误信号和事件信号

   loop:for {select {case <-stopCh:return errorStopRequestedcase err := <-errc:return errcase event, ok := <-w.ResultChan():// 处理事件...}}
3.事件类型验证
  • 验证事件对象类型是否符合预期

  • 验证 GVK 是否符合预期

   if expectedType != nil {if e, a := expectedType, reflect.TypeOf(event.Object); e != a {// 处理类型不匹配utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))continue}}
4.根据事件类型处理
  • 根据事件类型(Added、Modified、Deleted、Bookmark)调用不同的存储方法

  • 处理可能的存储错误

   switch event.Type {case watch.Added:err := store.Add(event.Object)// ...case watch.Modified:err := store.Update(event.Object)// ...case watch.Deleted:err := store.Delete(event.Object)// ...case watch.Bookmark:// 处理 Bookmark 事件// ...}
5.书签事件特殊处理
  • 检查书签事件是否标记了初始事件流的结束

  • 在 WatchList 模式中,这表示所有初始对象已接收完毕

   if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" {if exitOnInitialEventsEndBookmark != nil {*exitOnInitialEventsEndBookmark = true}}
6.资源版本更新
  • 更新最后同步的资源版本

  • 如果存储实现了 ResourceVersionUpdater 接口,也更新存储的资源版本

   setLastSyncResourceVersion(resourceVersion)if rvu, ok := store.(ResourceVersionUpdater); ok {rvu.UpdateResourceVersion(resourceVersion)}
7.WatchList 模式特殊处理
  • 如果是 WatchList 模式且收到了初始事件结束书签,退出处理
   if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark {watchDuration := clock.Since(start)klog.V(4).Infof("exiting %v Watch because received the bookmark...", name)return nil}
8.watch 结束处理
  • 计算 watch 持续时间

  • 如果 watch 时间太短且没有收到事件,返回错误

   watchDuration := clock.Since(start)if watchDuration < 1*time.Second && eventCount == 0 {return fmt.Errorf("very short watch: %s: Unexpected watch close...", name)}

特殊设计亮点

通用处理框架:

  • 同一个 watchHandler 方法既支持传统 Watch 模式,也支持新的 WatchList 模式

  • 通过 exitOnInitialEventsEndBookmark 参数区分不同模式

初始事件流结束标记:

  • 使用带有特殊注解 k8s.io/initial-events-end 的 Bookmark 事件标记初始事件流结束

  • 这是 WatchList 模式的关键机制

灵活的资源版本更新:

  • 通过函数参数 setLastSyncResourceVersion 支持不同的资源版本更新策略

  • 支持存储对象自定义的资源版本更新机制

精细的错误处理:

  • 对每种操作的错误都进行捕获和记录

  • 通过 utilruntime.HandleError 统一处理错误

短 watch 检测:

  • 检测异常短的 watch 操作(小于1秒且无事件)

  • 防止因网络问题导致的频繁重连

watchHandler 方法是 Reflector 处理增量更新的核心,其设计既支持传统模式,又能适应新的优化策略,体现了代码的灵活性

五. ResourceVersion 的重要性

ResourceVersion(资源版本号)是保证一致性的关键:

  • 每个资源对象都有 ResourceVersion

  • 每次修改(创建、更新、删除)资源时,API Server 都会更新 ResourceVersion

  • Watch 操作使用 ResourceVersion 来确定资源是否变化

  • Reflector 持续追踪最新的 ResourceVersion

六. 实际应用示例

以 Deployment 资源为例,在创建 DeploymentInformer 时:

  1. 初始化时传入 ListWatch 对象,包含 List 和 Watch 的实现
  2. List 通过 client.AppsV1().Deployments(namespace).List() 实现
  3. Watch 通过 client.AppsV1().Deployments(namespace).Watch() 实现
  4. Reflector 使用这些方法获取 Deployment 资源并监视变化

七. 总结

  1. Reflector 是客户端缓存机制的核心组件
  2. 通过 List 获取资源的完整状态,通过 Watch 监视增量变化
  3. 将数据存储到本地缓存(DeltaFIFO)中
  4. 通过 ResourceVersion 维护数据一致性
  5. 为 Informer 模式提供基础数据同步能力

Reflector 的设计保证了客户端能够高效地与 API Server 同步数据,同时减轻了 API Server 的负担,是 Kubernetes 控制器模式的重要基础组件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/897034.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《深入理解计算机网络》 | PDF免费下载 | free download

《深入理解计算机网络》是计算机网络领域的扛鼎之作,由有20余年从业经验的优秀网络技术工程师兼全国网管技能水平开始认证专家王达老师撰写,51CTO技术社区鼎力推荐,权威性毋庸置疑。内容方面,本书结合最新计算机网络技术,全面、系统、深入地阐述了计算机网络的体系结构、工…

《深入理解LINUX内核(第三版)》 | PDF免费下载 | epub free download

《深入理解Linux内核》第3版 将使你了解Linux的所有内部工作,它不仅仅是一个理论上的练习。你将学习到哪些情况下Linux性能最佳,并且你将看到,在大量的不同环境里进行进程调度、文件存取和内存管理时它如何满足提供良好的系统响应的需要。这本书将帮助你充分利用Linux系统。…

Android Studio的配置学习以及整日的总结

所花时间:5h 代码量(行):130 博客量:6 了解的知识点: 首先,今天对于Android Studio的整体配置有了一个新的认识,原因是AS的SDK和虚拟机AVD的默认安装位置在C盘 还有环境变量gradle的下载配置,这些都要了解明白是干什么的,下面我具体的说一下:安装好AS,下载好gradle…

额外添加 _网卡的配置,网络的基础的概念

配置好网卡,上网用 修改网络模式,修改静态ip,动态ip获取方式 system control 系统控制,systemctl 查看当前的上网信息 1.确保你的机器,是连接的网络的,是插上了网线的。(模拟了物理服务器的软件是什么?看你的虚拟的机器(vmware))3.编辑网卡的配置文件 编辑网卡配置文…

用集合说明可以用与或非来表示异或

用集合说明可以用与或非来表示异或 异或(XOR,记作 A⊕BA⊕B)可以通过与(AND,记作 ∧∧)、或(OR,记作 ∨∨)、非(NOT,记作 )的组合来表示。以下是两种常见的表达式形式: 1. 直接组合形式 异或的逻辑可以描述为:当且仅当 A 和 B不同时为真时输出真。 即:A B A⊕B0…

2025.3.10

1,访问PHP文件路径不能出现中文,会显示0行错误 2,<br>换行,<hr>分割线 3,“ ”解析并输出,‘ ’直接输出 4,字符串拼接用. 5,输出 echo "" print_r() 专门输出数组的格式 varr_dump() 数据类型和值6,二维数组

4.9.1 分布偏移的类型

下面介绍一下坐标系中协变量偏移的情况如上图,绿色的曲线是正确的曲线。我们训练的数据是左图,测试的数据是右图。按照左图的数据学习是学不出来绿色曲线的,就会导致右图的数据的准确率很低,这就是协变量偏移

贴现率8%和12%分别计算每个项目的净现值

使用贴现率8%和12%分别计算每个项目的净现值(NPV) JAVA实现 净现值求解实现代码(JAVA):package com.zuoye.Three;import java.math.BigDecimal;public class TieXianLu { public static void main(String[] args) throws Exception { //数据集合 int[] ma…

Netty基础—1.网络编程基础一

大纲 1.什么是OSI开放系统互连 2.OSI七层模型各层的作用 3.TCP/IP协议的简介 4.TCP和UDP的简介 5.TCP连接的三次握手 6.TCP连接的四次挥手 7.TCP/IP中的数据包 8.TCP通过确认应答与序列号提高可靠性 9.HTTP请求的传输过程 10.HTTP协议报文结构 11.Socket、短连接、长连接、网络…

硅基流动+Chatbox实现deepseek R1使用自由

解锁 DeepSeek R1 全能力:高性价比AI对话全流程指南🌟 用最低成本体验顶尖开源模型,对话成本低至 2 分钱/次!一、注册 SiliconFlow 领取千万Tokens▶️ 操作步骤:点击专属链接注册:https://cloud.siliconflow.cn/i/KoKtjLvD 手机验证码登录 → 立即获得 2000万 Tokens(…

3.1.1 线性回归的基本元素

看看批量梯度下降和小批量梯度下降的图形,与我们的理解是相符的注意到小批量梯度下降不是严格单减的,只是趋势是单调减少的(图中的纵轴Cost指的是对于整个训练数据的损失) 每次的批量的大小显然是一个超参数。当批量大小为\(1\)的时候叫做随机梯度下降,当批量大小为\(m\)的…