k8s的operator基石:controller-runtime源码解析

写在之前

今天开始开更controller-runtime的源码阅读,笔者建议大家在阅读前了解以下知识,可能会帮助大家更好的理解源码逻辑。

1.client-go的基础使用
2. 使用kubebuilder搭建一个简单的controller-runtime环境
3.informer的基本思想

1.源码环境搭建

参考链接:https://book.kubebuilder.io/cronjob-tutorial/cronjob-tutorial

2.源码阅读

2.1 万物伊始,问题的关键是定位关键的问题

首先定位controller的核心代码逻辑,main.go,如果你是使用kububuilder生成的代码,该代码在cmd文件夹下。排除掉杂七杂八的flag解析、日志实体初始化逻辑后,main方法的核心逻辑大概有分为三个步骤:

  1. 构建manage
  2. 向manage中注册自定义的Reconciler方法
  3. 启动manage
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme:                 scheme,MetricsBindAddress:     metricsAddr,Port:                   9443,HealthProbeBindAddress: probeAddr,LeaderElection:         enableLeaderElection,LeaderElectionID:       "80807133.tutorial.kubebuilder.io",// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily// when the Manager ends. This requires the binary to immediately end when the// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly// speeds up voluntary leader transitions as the new leader don't have to wait// LeaseDuration time first.//// In the default scaffold provided, the program ends immediately after// the manager stops, so would be fine to enable this option. However,// if you are doing or is intended to do any operation such as perform cleanups// after the manager stops then its usage might be unsafe.// LeaderElectionReleaseOnCancel: true,
})
....
if err = (&controller.CronJobReconciler{Client: mgr.GetClient(),Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {setupLog.Error(err, "unable to create controller", "controller", "CronJob")os.Exit(1)
}
......
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {setupLog.Error(err, "problem running manager")os.Exit(1)
}

接下来我按照这三条链路逐步进行代码分析。

2.2 ctrl.NewManager

这个方法的注释写的是returns a new Manager for creating Controllers.创建controller的管理器,主要是一些初始化的逻辑,构建controllerManager结构体

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme:                 scheme,MetricsBindAddress:     metricsAddr,Port:                   9443,HealthProbeBindAddress: probeAddr,LeaderElection:         enableLeaderElection,LeaderElectionID:       "80807133.tutorial.kubebuilder.io",// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily// when the Manager ends. This requires the binary to immediately end when the// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly// speeds up voluntary leader transitions as the new leader don't have to wait// LeaseDuration time first.//// In the default scaffold provided, the program ends immediately after// the manager stops, so would be fine to enable this option. However,// if you are doing or is intended to do any operation such as perform cleanups// after the manager stops then its usage might be unsafe.// LeaderElectionReleaseOnCancel: true,
})

上述方法的入参由两部分组成,一部分是ctrl.GetConfigOrDie(),一部分是options,因k8s的client初始化的时候需要加载kubeconfig,所以猜测方法一是kubeconfig加载的核心流程。现在开始追踪ctrl.GetConfigOrDie()去看看kubeconfig是如何加载。

2.2.1 kubeconfig的加载逻辑

我们沿着代码执行链路一步步追踪加载kubeconfig的具体的实现位置。

// 这里是核心逻辑
func loadConfig(context string) (config *rest.Config, configErr error) {// If a flag is specified with the config location, use that// 1.这里的kubeconfig是哪里来的呢,这个参数不是当前function的私有参数,我们可以追踪这个参数的初始化位置,可以查询这个参数的来源if len(kubeconfig) > 0 {// loadConfigWithContext 这个方法似乎是加载client的核心逻辑,整个方法中引用了两次return loadConfigWithContext("", &clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfig}, context)}// 如果flag中没有传递kubeconfig,那么就从环境变量中获取kubeconfig文件的所在处,通过文件初始化rest.config//RecommendedConfigPathEnvVar = "KUBECONFIG",获取环境变量指向的kubeconfig所在位置kubeconfigPath := os.Getenv(clientcmd.RecommendedConfigPathEnvVar)if len(kubeconfigPath) == 0 {// 从容器中获取token初始化rest.config,参考2c, err := loadInClusterConfig()if err == nil {return c, nil}defer func() {if configErr != nil {log.Error(err, "unable to load in-cluster config")}}()}// 这里定义了kubeconfig加载的规则,会遍历~/.kube或者环境变量中定义的KUBECONFIG路径去加载配置文件loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()if _, ok := os.LookupEnv("HOME"); !ok {u, err := user.Current()if err != nil {return nil, fmt.Errorf("could not get current user: %w", err)}loadingRules.Precedence = append(loadingRules.Precedence, filepath.Join(u.HomeDir, clientcmd.RecommendedHomeDir, clientcmd.RecommendedFileName))}// 这里是核心的client初始化逻辑,参考3.return loadConfigWithContext("", loadingRules, context)
}

1)在当前代码文件中检索到了kubeconfig这个参数的初始化逻辑,该参数是从名为kubeconfig的flag中解析获得的,只要在operator的启动命令中传递了kubeconfig的flag标识,就可以解析到逻辑中

func init() {RegisterFlags(flag.CommandLine)
}const KubeconfigFlagName = "kubeconfig"
func RegisterFlags(fs *flag.FlagSet) {if fs == nil {fs = flag.CommandLine}// KubeconfigFlagName的值是kubeconfigif f := fs.Lookup(KubeconfigFlagName); f != nil {kubeconfig = f.Value.String()} else {fs.StringVar(&kubeconfig, KubeconfigFlagName, "", "Paths to a kubeconfig. Only required if out-of-cluster.")}
}

2) 从pod中初始化rest.config

//InClusterConfig 返回一个配置对象,该对象使用 kubernetes 提供给 pod 的服务帐户。
//它适用于希望在 kubernetes 上运行的 pod 内运行的客户端。如果从不在 kubernetes 环境中运行的进程调用,
//它将返回 ErrNotInCluster。
func InClusterConfig() (*Config, error) {const (tokenFile  = "/var/run/secrets/kubernetes.io/serviceaccount/token"rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")// 如果好奇这两个值的含义,我可以在后文中贴上在pod中检索这两个环境变量的贴图host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")if len(host) == 0 || len(port) == 0 {return nil, ErrNotInCluster}token, err := os.ReadFile(tokenFile)if err != nil {return nil, err}tlsClientConfig := TLSClientConfig{}if _, err := certutil.NewPool(rootCAFile); err != nil {// 这一步主要是验证这个证书路径是否合法klog.Errorf("Expected to load root CA config from %s, but got err: %v", rootCAFile, err)} else {tlsClientConfig.CAFile = rootCAFile}// 返回rest.configreturn &Config{// TODO: switch to using cluster DNS.Host:            "https://" + net.JoinHostPort(host, port), //这里是吧host:port做ipv4和ipv6的格式转换TLSClientConfig: tlsClientConfig,BearerToken:     string(token),BearerTokenFile: tokenFile,}, nil
}

在这里插入图片描述
在这里插入图片描述
3)restconfig初始化逻辑
这里是client go初始化rest.config的标准function,这里不做解读了。

func loadConfigWithContext(apiServerURL string, loader clientcmd.ClientConfigLoader, context string) (*rest.Config, error) {return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loader,&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: apiServerURL,},CurrentContext: context,}).ClientConfig()
}

2.2.2 开始探索ctrl.NewManager 这个方法

这个方法的注释写的是returns a new Manager for creating Controllers.创建controller的管理器,主要是一些初始化的逻辑,构建controllerManager结构体.

func New(config *rest.Config, options Options) (Manager, error) {// Set default values for options fields// 2.1.设置options的默认值,这里是对于options中未被用户显示传入的参数进行进行默认值赋值options = setOptionsDefaults(options)// 2.2这里是对cluster默认初始化,cluster类就是与集群进行交互的实体类cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {clusterOptions.Scheme = options.SchemeclusterOptions.MapperProvider = options.MapperProviderclusterOptions.Logger = options.LoggerclusterOptions.SyncPeriod = options.SyncPeriodclusterOptions.Namespace = options.NamespaceclusterOptions.NewCache = options.NewCacheclusterOptions.NewClient = options.NewClientclusterOptions.ClientDisableCacheFor = options.ClientDisableCacheForclusterOptions.DryRunClient = options.DryRunClientclusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck})if err != nil {return nil, err}......return &controllerManager{stopProcedureEngaged:          pointer.Int64(0),cluster:                       cluster,runnables:                     runnables,errChan:                       errChan,recorderProvider:              recorderProvider,resourceLock:                  resourceLock,metricsListener:               metricsListener,metricsExtraHandlers:          metricsExtraHandlers,controllerOptions:             options.Controller,logger:                        options.Logger,elected:                       make(chan struct{}),port:                          options.Port,host:                          options.Host,certDir:                       options.CertDir,tlsOpts:                       options.TLSOpts,webhookServer:                 options.WebhookServer,leaderElectionID:              options.LeaderElectionID,leaseDuration:                 *options.LeaseDuration,renewDeadline:                 *options.RenewDeadline,retryPeriod:                   *options.RetryPeriod,healthProbeListener:           healthProbeListener,readinessEndpointName:         options.ReadinessEndpointName,livenessEndpointName:          options.LivenessEndpointName,gracefulShutdownTimeout:       *options.GracefulShutdownTimeout,internalProceduresStop:        make(chan struct{}),leaderElectionStopped:         make(chan struct{}),leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,}, nil
}

2.3 SetupWithManager 注册自定义的Reconciler

我们首先把Reconciler的接口定义贴上来,这是整个opeator中为数不多的需要自定义编码的位置,这里的reconcile是用户针对指定k8s资源的变动事件(增、删除、改)的自定义处理步骤,你可以理解为informer的eventHandler中的update、add、delete处理逻辑都放在 Reconcile(context.Context, Request) (Result, error)的方法实现中,由用户自己判断资源object处于哪一类事件的状态中,并执行相应的处理逻辑。

type Reconciler interface {Reconcile(context.Context, Request) (Result, error)
}

在main方法中的应用位置是:

if err = (&controller.CronJobReconciler{Client: mgr.GetClient(),Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {setupLog.Error(err, "unable to create controller", "controller", "CronJob")os.Exit(1)
}

我们追踪SetupWithManager的代码逻辑,从NewControllerManagedBy到For,再到Owns都是Builder这个结构体的构建过程,这里不做展开,这里只介绍Complete的代码逻辑。

func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {.....// 核心逻辑return ctrl.NewControllerManagedBy(mgr).For(&batchv1.CronJob{}).Owns(&kbatch.Job{}).Complete(r)
}

跳过一些冗余的代码逻辑,我们快进到核心逻辑。在下面的Build方法中,主要执行了两块核心逻辑:

  • 核心逻辑一:初始化controller
  • 核心逻辑二:初始化watch逻辑
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {if r == nil {return nil, fmt.Errorf("must provide a non-nil Reconciler")}if blder.mgr == nil {return nil, fmt.Errorf("must provide a non-nil Manager")}if blder.forInput.err != nil {return nil, blder.forInput.err}// Set the ControllerManagedBy// 核心逻辑一:初始化controllerif err := blder.doController(r); err != nil {return nil, err}// Set the Watch// 核心逻辑二:初始化watch逻辑if err := blder.doWatch(); err != nil {return nil, err}return blder.ctrl, nil
}

我们沿着这两条链路逐条进行分析。

2.3.1 controller的初始化

func (blder *Builder) doController(r reconcile.Reconciler) error {...... 构建ctrlOptions// Build the controller and return.// 初始化controllerblder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)return err
}

继续深入newController看一下这个结构体是怎么初始化的,下面是核心实现代码,主要做了两件事情

  • 1.初始化一个controller结构体
  • 2.把controller添加到了controllerManage中

func New(name string, mgr manager.Manager, options Options) (Controller, error) {c, err := NewUnmanaged(name, mgr, options)if err != nil {return nil, err}// Add the controller as a Manager componentsreturn c, mgr.Add(c)
}

mgr.Add©这个逻辑比较简单,就是把新生成的controller添加到manager中的缓存结构体中,我们来看看NewUnmanaged中的controller
初始化字段有哪些:

func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {// Create controller with dependencies setreturn &controller.Controller{// 这个就是自定义的ReconcilerDo: options.Reconciler,// 这个有一个队列,记住,后面要考MakeQueue: func() workqueue.RateLimitingInterface {return workqueue.NewRateLimitingQueueWithConfig(options.RateLimiter, workqueue.RateLimitingQueueConfig{Name: name,})},MaxConcurrentReconciles: options.MaxConcurrentReconciles,CacheSyncTimeout:        options.CacheSyncTimeout,Name:                    name,LogConstructor:          options.LogConstructor,RecoverPanic:            options.RecoverPanic,LeaderElected:           options.NeedLeaderElection,}, nil
}

2.3.2 doWatch初始化watch逻辑

代码实现展示在下文,这里有两个问题没有解释,一个是
source.Kind这个结构体有什么作用,一个就是allPredicates这个实体也没有解释。我们先留个坑,目前只有定义,在实际调用的时候我们在进行解释。

func (blder *Builder) doWatch() error {// Reconcile typeif blder.forInput.object != nil {obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)if err != nil {return err}// 这一个是什么source?src := source.Kind(blder.mgr.GetCache(), obj)// 这里是事件处理的eventHandler结构体,定义了队列处理逻辑hdler := &handler.EnqueueRequestForObject{}allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)allPredicates = append(allPredicates, blder.forInput.predicates...)// 看起来这里是核心逻辑执行的位置if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {return err}}......后续逻辑大同小异

我们继续深入blder.ctrl.Watch的逻辑中

func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {c.mu.Lock()defer c.mu.Unlock()if !c.Started {// 首次调用这个方法的时候进入到这个逻辑中,看起来只是把src、handler、predicate包装成一个机构体存储在watches中c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})return nil}c.LogConstructor(nil).Info("Starting EventSource", "source", src)return src.Start(c.ctx, evthdler, c.Queue, prct...)
}

关于下面的src.Start,这个步骤虽然在我们的核心流程中没有执行,但是这个方法的实现似乎有助于我们理解source.Kind这个数据结构,我们就浪费点时间,进入到start中看一下:

  • 启动了一个定时任务,循环检索献相应的informer是否已经被声明
  • 往这个informer中添加handler的处理逻辑
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,prct ...predicate.Predicate) error {....省略无关逻辑go func() {var (i       cache.InformerlastErr error)// Tries to get an informer until it returns true,// 这个步骤一直循环在寻找一个相应的informerif err := wait.PollUntilContextCancel(ctx, 10*time.Second, true, func(ctx context.Context) (bool, error) {// Lookup the Informer from the Cache and add an EventHandler which populates the Queuei, lastErr = ks.Cache.GetInformer(ctx, ks.Type)if lastErr != nil {kindMatchErr := &meta.NoKindMatchError{}switch {case errors.As(lastErr, &kindMatchErr):log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start","kind", kindMatchErr.GroupKind)case runtime.IsNotRegisteredError(lastErr):log.Error(lastErr, "kind must be registered to the Scheme")default:log.Error(lastErr, "failed to get informer from cache")}return false, nil // Retry.}return true, nil}); err != nil {if lastErr != nil {ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)return}ks.started <- errreturn}// 对informer添加handler处理逻辑_, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())if err != nil {ks.started <- errreturn}if !ks.Cache.WaitForCacheSync(ctx) {// Would be great to return something more informative hereks.started <- errors.New("cache did not sync")}close(ks.started)}()return nil
}

看到这里,可能大家已经迷糊了,不要着急,我们一步一步的进行分析,因为不仅是大家迷糊,我也迷糊,这乱七八糟的是什么啊。首先看一下这个cache的真实实现是什么?

func New(cfg *rest.Config, opts Options) (Cache, error) {....配置项解析newCacheFunc := newCache(cfg, opts)var defaultCache Cacheif len(opts.DefaultNamespaces) > 0 {//这一步暂时忽略defaultConfig := optionDefaultsToConfig(&opts)defaultCache = newMultiNamespaceCache(newCacheFunc, opts.Scheme, opts.Mapper, opts.DefaultNamespaces, &defaultConfig)} else {// 这一步就是构建的informerCache的实体类defaultCache = newCacheFunc(optionDefaultsToConfig(&opts), corev1.NamespaceAll)}if len(opts.ByObject) == 0 {return defaultCache, nil}// 分类将不同资源的informer对象保存在这个结构体中delegating := &delegatingByGVKCache{scheme:       opts.Scheme,//这里的ByObject已经通过追踪是在初始化的options中的cache字段配置的caches:       make(map[schema.GroupVersionKind]Cache, len(opts.ByObject)),defaultCache: defaultCache,}for obj, config := range opts.ByObject {gvk, err := apiutil.GVKForObject(obj, opts.Scheme)if err != nil {return nil, fmt.Errorf("failed to get GVK for type %T: %w", obj, err)}var cache Cacheif len(config.Namespaces) > 0 {cache = newMultiNamespaceCache(newCacheFunc, opts.Scheme, opts.Mapper, config.Namespaces, nil)} else {cache = newCacheFunc(byObjectToConfig(config), corev1.NamespaceAll)}delegating.caches[gvk] = cache}return delegating, nil
}//这个方法就是构建一个informerCache的实体类
func newCache(restConfig *rest.Config, opts Options) newCacheFunc {return func(config Config, namespace string) Cache {return &informerCache{scheme: opts.Scheme,Informers: internal.NewInformers(restConfig, &internal.InformersOpts{HTTPClient:   opts.HTTPClient,Scheme:       opts.Scheme,Mapper:       opts.Mapper,ResyncPeriod: *opts.SyncPeriod,Namespace:    namespace,Selector: internal.Selector{Label: config.LabelSelector,Field: config.FieldSelector,},Transform:             config.Transform,UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false),NewInformer:           opts.newInformer,}),readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer,}}
}

这一步骤看起来和sharedInformerFactory比较像,里面定义了一个map缓存,保存了GVK和对应的informer的单例实现。现在我们可以根据delegatingByGVKCache来寻找GetInformer的具体实现方法是什么了,从下面的三个方法来看,这一步骤主要是用来检索实体结构体对应的informerCache实体。

func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {// 这一步是从delegatingByGVKCache的map缓存中获取informerCache的结构体cache, err := dbt.cacheForObject(obj)if err != nil {return nil, err}// 这一步真正实现是在informerCache的方法中体现的return cache.GetInformer(ctx, obj, opts...)
}
func (dbt *delegatingByGVKCache) cacheForObject(o runtime.Object) (Cache, error) {gvk, err := apiutil.GVKForObject(o, dbt.scheme)if err != nil {return nil, err}gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")return dbt.cacheForGVK(gvk), nil
}func (dbt *delegatingByGVKCache) cacheForGVK(gvk schema.GroupVersionKind) Cache {if specific, hasSpecific := dbt.caches[gvk]; hasSpecific {return specific}return dbt.defaultCache
}func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {gvk, err := apiutil.GVKForObject(obj, ic.scheme)if err != nil {return nil, err}// 这一步有兴趣的读者可以深入了解,这里主要是进行数据的转换,转换的结果就是返回了一个sharedIndexedInformer_, i, err := ic.Informers.Get(ctx, gvk, obj, applyGetOptions(opts...))if err != nil {return nil, err}return i.Informer, nil
}

到这里,似乎我们就可以继续分析_, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs()),这行的代码逻辑了。这里就和informer串起来了。


func NewEventHandler(ctx context.Context, queue workqueue.RateLimitingInterface, handler handler.EventHandler, predicates []predicate.Predicate) *EventHandler {return &EventHandler{ctx:        ctx,handler:    handler,queue:      queue,predicates: predicates,}
}
func (e *EventHandler) HandlerFuncs() cache.ResourceEventHandlerFuncs {return cache.ResourceEventHandlerFuncs{AddFunc:    e.OnAdd,UpdateFunc: e.OnUpdate,DeleteFunc: e.OnDelete,}
}

这三个处理逻辑是差不多的,我们看一下这里OnAdd这个方法的实现逻辑,以点代面去看一下具体的执行逻辑。

// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e *EventHandler) OnAdd(obj interface{}) {c := event.CreateEvent{}// Pull Object out of the objectif o, ok := obj.(client.Object); ok {c.Object = o} else {log.Error(nil, "OnAdd missing Object","object", obj, "type", fmt.Sprintf("%T", obj))return}//上面的内容索然无味,这里值得注意一下,先放直接放结论,之前我们没有分析predicates这个结构体,看起来这里是filer,用来过滤一些不关注的事件for _, p := range e.predicates {if !p.Create(c) {return}}// Invoke create handlerctx, cancel := context.WithCancel(e.ctx)defer cancel()// e.handler.Create(ctx, c, e.queue)
}

既然提到了,我们去找一下这个predicates的实现位置,他是在Builder结构体提供的一个WithEventFilter方法设置的,具体的使用方式的构建一个predicate.Predicate实体类,在这个实体类中定义不同事件类型的filter逻辑,可以过滤掉一些我们不关系的变更事件。

type Predicate interface {// Create returns true if the Create event should be processedCreate(event.CreateEvent) bool// Delete returns true if the Delete event should be processedDelete(event.DeleteEvent) bool// Update returns true if the Update event should be processedUpdate(event.UpdateEvent) bool// Generic returns true if the Generic event should be processedGeneric(event.GenericEvent) bool
}

我们继续追踪 e.handler.Create(ctx, c, e.queue)这个方法,看看里面发生了什么,这里的handler的真实实现位置是hdler := &handler.EnqueueRequestForObject{}。

// Create implements EventHandler.
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {if evt.Object == nil {enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)return}//这里是往队列中添加了一个request事件q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name:      evt.Object.GetName(),Namespace: evt.Object.GetNamespace(),}})
}

现在我们可以总结一下这里的source.start做了一些什么事情了。当事件的informer监听到资源发生变换时,会触发一个handler.EnqueueRequestForObject{}的事件处理逻辑,将这个事封装成reconcile.Request{}结构体放置到controller对应的限速队列中去。

2.4 mgr.Start(ctrl.SetupSignalHandler())

启动controllerManage。我们仅截取关键代码进行分析。

2.4.1 cluster初始化=informer初始化

....
// Start and wait for caches.
if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {if err != nil {return fmt.Errorf("failed to start caches: %w", err)}
}

这里的cluster我们在上文中已经见识过他的初始化逻辑了。

return &cluster{config:           originalConfig,httpClient:       options.HTTPClient,scheme:           options.Scheme,cache:            cache,fieldIndexes:     cache,client:           clientWriter,apiReader:        clientReader,recorderProvider: recorderProvider,mapper:           mapper,logger:           options.Logger,
}, nil

我们来看一下这里的cm.add做了些什么事情,他往cm的runnables这个结构体中添加了一个Runnable实体,Runnable接口中只包含了一个Start()方法。

func (cm *controllerManager) add(r Runnable) error {return cm.runnables.Add(r)
}

我们先看一下runnables里面的定义,看起来是对Runnable进行了分类存放。

type runnables struct {HTTPServers    *runnableGroupWebhooks       *runnableGroupCaches         *runnableGroupLeaderElection *runnableGroupOthers         *runnableGroup
}type runnableGroup struct {ctx    context.Contextcancel context.CancelFuncstart        sync.MutexstartOnce    sync.Oncestarted      boolstartQueue   []*readyRunnablestartReadyCh chan *readyRunnablestop     sync.RWMutexstopOnce sync.Oncestopped  bool// errChan is the error channel passed by the caller// when the group is created.// All errors are forwarded to this channel once they occur.errChan chan error// ch is the internal channel where the runnables are read off from.ch chan *readyRunnable// wg is an internal sync.WaitGroup that allows us to properly stop// and wait for all the runnables to finish before returning.wg *sync.WaitGroup
}

我们看一下Add方法,印证了我们之前的猜测

func (r *runnables) Add(fn Runnable) error {switch runnable := fn.(type) {case *server:return r.HTTPServers.Add(fn, nil)case hasCache:return r.Caches.Add(fn, func(ctx context.Context) bool {return runnable.GetCache().WaitForCacheSync(ctx)})case webhook.Server:return r.Webhooks.Add(fn, nil)case LeaderElectionRunnable:if !runnable.NeedLeaderElection() {return r.Others.Add(fn, nil)}return r.LeaderElection.Add(fn, nil)default:return r.LeaderElection.Add(fn, nil)}
}

cluster属于hasCache的实现,我们看看他做了些什么事情。把runnable和runnale启动检验的逻辑包装到readyRunnable这个实体,然后做了两件事情:

  • 如果没有启动,把runnable放到startQueue这个队列中
  • 如果启动了,把runnable放到 r.ch这个channel通道中

接下来就是启动的步骤,我们看看start中做了些什么事情。

func (r *runnableGroup) Start(ctx context.Context) error {var retErr errorr.startOnce.Do(func() {defer close(r.startReadyCh)// Start the internal reconciler.// 启动reconcile,这里启动所有的自定义逻辑go r.reconcile()// Start the group and queue up all// the runnables that were added prior.r.start.Lock()r.started = true//还记得上文Add逻辑吗,这里是将startQueue中的readyRunnable实体塞到ch这个channel中for _, rn := range r.startQueue {rn.signalReady = truer.ch <- rn}r.start.Unlock()// If we don't have any queue, return.if len(r.startQueue) == 0 {return}// Wait for all runnables to signal.// 判断是否所有的runnables都已经启动for {select {case <-ctx.Done():if err := ctx.Err(); !errors.Is(err, context.Canceled) {retErr = err}// 这里判断readyRunnable是不是已经启动了,如果启动了就从startQueue中删除case rn := <-r.startReadyCh:for i, existing := range r.startQueue {if existing == rn {// Remove the item from the start queue.r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)break}}// We're done waiting if the queue is empty, return.if len(r.startQueue) == 0 {return}}}})return retErr
}

我们回过头来分析一下,r.reconcile()做了些什么事情

func (r *runnableGroup) reconcile() {for runnable := range r.ch {....// Start the runnable.go func(rn *readyRunnable) {go func() {//如果检查执行完毕后if rn.Check(r.ctx) {if rn.signalReady {//传入到startReadyCh的channel中来r.startReadyCh <- rn}}}().....// 执行start函数if err := rn.Start(r.ctx); err != nil {r.errChan <- err}}(runnable)}
}

这里cluster的start函数执行逻辑是

func (c *cluster) Start(ctx context.Context) error {defer c.recorderProvider.Stop(ctx)return c.cache.Start(ctx)
}

我们找到这个start的具体实现逻辑,追踪下去,我们就见到了核心的informer.Run(ip.ctx.Done()),这个就是原生的informer的用法。

2.4.2 自定义的的reconciler的启动

如果还记得SetUpWithManager的逻辑,我们知道自定义的reconciler被包装成了Controller实体放到了cm.runnables.LeaderElection这个分组中了,如果,我们继续追踪Start方法内的代码逻辑.

{ctx, cancel := context.WithCancel(context.Background())cm.leaderElectionCancel = cancelgo func() {// 如果没有抢占资源锁,就继续等待if cm.resourceLock != nil {if err := cm.startLeaderElection(ctx); err != nil {cm.errChan <- err}} else {// Treat not having leader election enabled the same as being elected.// 如果选主成功,执行核心启动逻辑if err := cm.startLeaderElectionRunnables(); err != nil {cm.errChan <- err}close(cm.elected)}}()
}

最后一步了,cm.startLeaderElectionRunnables(),我们追踪一下核心启动逻辑,与cluster是不是异曲同工。

func (r *runnableGroup) Start(ctx context.Context) error {var retErr errorr.startOnce.Do(func() {defer close(r.startReadyCh)// Start the internal reconciler.go r.reconcile()// Start the group and queue up all// the runnables that were added prior.r.start.Lock()r.started = truefor _, rn := range r.startQueue {rn.signalReady = truer.ch <- rn}r.start.Unlock()// If we don't have any queue, return.if len(r.startQueue) == 0 {return}// Wait for all runnables to signal.for {select {case <-ctx.Done():if err := ctx.Err(); !errors.Is(err, context.Canceled) {retErr = err}case rn := <-r.startReadyCh:for i, existing := range r.startQueue {if existing == rn {// Remove the item from the start queue.r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)break}}// We're done waiting if the queue is empty, return.if len(r.startQueue) == 0 {return}}}})return retErr
}

我们深入看一下关键是Controller的start方法是什么。

func (c *Controller) Start(ctx context.Context) error {.....c.Queue = c.MakeQueue()......err := func() error {defer c.mu.Unlock()// TODO(pwittrock): Reconsider HandleCrashdefer utilruntime.HandleCrash()// NB(directxman12): launch the sources *before* trying to wait for the// caches to sync so that they have a chance to register their intendeded// caches.for _, watch := range c.startWatches {c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))// 这个start是不是很眼熟,就是上文分析的Kind的start方法,主要从informer中监听事件放到queue中if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {return err}}......wg.Add(c.MaxConcurrentReconciles)for i := 0; i < c.MaxConcurrentReconciles; i++ {go func() {defer wg.Done()// Run a worker thread that just dequeues items, processes them, and marks them done.// It enforces that the reconcileHandler is never invoked concurrently with the same object.for c.processNextWorkItem(ctx) {}}()}....}()if err != nil {return err}.......
}

我们之前之说生产者是怎么往controller的队列中添加数据的,而没有说队列中的事件是怎么消费数据的,核心逻辑就在processNextWorkItem这个方法中,

func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {.......// 这里的c.Reconcile就是controllerresult, err := c.Reconcile(ctx, req)switch {case err != nil:if errors.Is(err, reconcile.TerminalError(nil)) {ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()} else {c.Queue.AddRateLimited(req)}ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()if !result.IsZero() {log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes reqeueuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler")}log.Error(err, "Reconciler error")case result.RequeueAfter > 0:log.V(5).Info(fmt.Sprintf("Reconcile done, requeueing after %s", result.RequeueAfter))// The result.RequeueAfter request will be lost, if it is returned// along with a non-nil error. But this is intended as// We need to drive to stable reconcile loops before queuing due// to result.RequestAfterc.Queue.Forget(obj)c.Queue.AddAfter(req, result.RequeueAfter)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()case result.Requeue:log.V(5).Info("Reconcile done, requeueing")c.Queue.AddRateLimited(req)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()default:log.V(5).Info("Reconcile successful")// Finally, if no error occurs we Forget this item so it does not// get queued again until another change happens.c.Queue.Forget(obj)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()}
}

我们先分析c.Reconcile做了些什么事情,主要是调用了controller中的Do属性的Reconcile方法,而Do就是我们自定义的Reconcile实体属性。

func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
......return c.Do.Reconcile(ctx, req)
}

现在回过头来分析c.reconcileHandler(ctx, obj).

switch {// 如果上一步执行有异常报错,1)忽略,添加通知 2)重新入队
case err != nil:if errors.Is(err, reconcile.TerminalError(nil)) {ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()} else {c.Queue.AddRateLimited(req)}.......// 如果返回的result有RequeueAfter这个属性字段,选择在一段时间后延迟入队
case result.RequeueAfter > 0:log.V(5).Info(fmt.Sprintf("Reconcile done, requeueing after %s", result.RequeueAfter))// The result.RequeueAfter request will be lost, if it is returned// along with a non-nil error. But this is intended as// We need to drive to stable reconcile loops before queuing due// to result.RequestAfterc.Queue.Forget(obj)c.Queue.AddAfter(req, result.RequeueAfter)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
case result.Requeue:// 如果返回的result,直接入队log.V(5).Info("Reconcile done, requeueing")c.Queue.AddRateLimited(req)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
default:// 默认情况下,从队列中移除log.V(5).Info("Reconcile successful")// Finally, if no error occurs we Forget this item so it does not// get queued again until another change happens.c.Queue.Forget(obj)ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
}

至此,controller-runtime的源码分析完毕.

遗留

workqueue.RateLimitingInterface这个队列的分析没写

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/443280.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Matplotlib】科研绘图——折线图

文章目录 1、导入2、定义Font及Style3、设置图像大小及坐标刻度4、数据准备5、自定义draw6、其他设置7、效果图 1、导入 import matplotlib import matplotlib.pyplot as plt from matplotlib.backends.backend_pdf import PdfPages import numpy as np import pandas as pd %…

【Tomcat与网络8】从源码看Tomcat的层次结构

在前面我们介绍了如何通过源码来启动Tomcat&#xff0c;本文我们就来看一下Tomcat是如何一步步启动的&#xff0c;以及在启动过程中&#xff0c;不同的组件是如何加载的。 一般&#xff0c;我们可以通过 Tomcat 的 /bin 目录下的脚本 startup.sh 来启动 Tomcat&#xff0c;如果…

故障诊断 | 一文解决,CNN卷积神经网络故障诊断(Matlab)

文章目录 效果一览文章概述专栏介绍源码设计参考资料效果一览 文章概述 故障诊断 | 一文解决,CNN卷积神经网络故障诊断(Matlab) 专栏介绍 订阅【故障诊断】专栏,不定期更新机器学习和深度学习在故障诊断中的应用;订阅

微信开放平台第三方授权(第四篇)-wechat发送客服消息

1.发送客服消息 上一张介绍了发送消息需要用到的authorizer_access_token,和发送消息的接口结合使用&#xff0c;上面直接上代码。 重写WechatMpService 获取token&#xff0c;这个发消息会用到 package com.test.wechat.service;import com.test.wechat.config.WechatMpConf…

【百度Apollo】探索创新之路:深入了解Apollo开放平台

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《linux深造日志》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! ⛳️ 推荐 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下…

VueFire:一个一流的 Vue 和 Firebase 体验,包括对 Nuxt 的支持,现在已经稳定了

VueFire&#xff0c;一个一流的 Vue 和 Firebase 体验 — 包括对 Nuxt 的支持&#xff0c;现在已经稳定了。 Vue 和 Firebase 现在比以往任何时候都更好了。 构建更好的VueFire 去年&#xff0c;我们宣布与 Eduardo San Martin Morote 合作&#xff0c;构建一个成熟的 Vue 和…

【INTEL(ALTERA)】带有浮点单元 (FPU) Nios® V/g 处理器在 英特尔® Cyclone10 GX 设备中执行不正确的浮点运算

说明 由于 英特尔 Quartus Prime Pro Edition 软件版本 23.3 存在一个问题&#xff0c;当使用 Nios V/g 处理器并在 英特尔 Cyclone 10 GX 设备中启用 FPU 时&#xff0c;浮点运算无法按预期进行。 Nios V/g 处理器 – 启用浮点单元 解决方法 请勿在 英特尔 CycloneNios 10 G…

消息中间件之RocketMQ源码分析(三)

RocketMQ中的Consumer启动流程 RocketMQ客户端中有两个独立的消费者实现类分别为DefaultMQPullConsumer和DefaultMQPushConsumer&#xff0c; DefaultMQPullConsumer DefaultMQPullConsumer,该消费者使用时需要用户主动从Broker中Pull消息和消费消息&#xff0c;提交消费位点…

【C/C++】深入理解--函数重载(什么是函数重载?为什么要有函数重载?)

目录 一、前言 二、 函数重载 &#x1f34e;什么是函数重载 &#x1f350;函数重载的条件 &#x1f347;函数重载的注意点 &#x1f349;为什么要有函数重载 &#x1f353;为何C语言不支持函数重载&#xff0c;反倒C可以&#xff1f; &#x1f4a6; Linux环境下演示函数重…

Linux--redhat9创建软件仓库

1.插入光盘&#xff0c;挂载镜像 模拟插入光盘: 点击:虚拟机-可移动设备-CD/DVD 设备状态全选&#xff0c;使用ISO影响文件选择当前版本镜像&#xff0c;点击确认。 2.输入: df -h 可以显示&#xff0c;默认/dev/sr0文件为光盘文件&#xff0c;挂载点为/run/media/root/镜像…

【数据结构 03】循环队列

一、原理 循环队列从功能角度具有队列的性质&#xff0c;即遵从先进先出原则&#xff0c;但是其存储方式是顺序存储。 循环队列的存储空间大小通常都是固定的&#xff0c;通过前指针和尾指针的移动控制循环队列数据的增删。 特征&#xff1a;顺序存储、先进先出、容量有限&a…

CISAW和CISP-PTE证书选择指南

&#x1f4e3;在信息安全领域&#xff0c;选择合适的证书可以为你的职业生涯增添光彩。很多从事信息渗透行业的朋友经常讨论CISP-PTE和CISAW之间的选择问题。今天就从4个方面带你详细了解这两张证书&#xff0c;帮你做出明智的选择&#xff01; 1️⃣证书的行业前景 &#x1f4…