kubelet Source Code Study (Part 1): How the kubelet Works and the kubelet Startup Process

This article studies the source code of Kubernetes v1.22.4.

1. How the kubelet Works

1) The kubelet's Core Work

The core of the kubelet's work is a single control loop, SyncLoop (the large circle in the diagram). The events that drive this control loop include pod update events, pod lifecycle changes, the execution period configured in the kubelet itself, and periodic cleanup events.

The kubelet also maintains many other sub control loops (the small circles in the diagram), named xxxManager. For example, probeManager periodically checks the health of the containers in a pod using the livenessProbe, readinessProbe and startupProbe; statusManager maintains status information and pushes pod status updates to the APIServer; containerRefManager manages container references and is used to report container creation, failure and other events.

2) CRI and the Container Runtime

When the kubelet drives the underlying container runtime, it does not call the Docker API directly. Instead it goes through a set of gRPC interfaces called the CRI (Container Runtime Interface).

The CRI shim responds to CRI requests and acts as the shim between the kubelet and the container project. It implements every interface defined by the CRI and translates each concrete CRI request into requests or operations against the backend container project.

Every container project can implement its own CRI shim and handle CRI requests itself. This gives Kubernetes a unified container abstraction layer, so that lower-level container runtimes can plug into Kubernetes freely.

When Docker is used, dockershim handles the CRI requests and assembles them into Docker API requests that are sent to the Docker daemon.

The CRI interfaces fall into two groups (a minimal client sketch follows the list):

  • RuntimeService: container-related operations, such as creating, starting and deleting containers, and running exec commands
  • ImageManagerService: container image-related operations, such as pulling and deleting images
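To make the two service groups concrete, here is a minimal sketch (not kubelet code) of a standalone CRI client that dials a runtime's CRI socket and calls one method from each group. It uses the v1alpha2 CRI API that v1.22 uses; the containerd socket path and the timeout are assumptions for illustration.

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

func main() {
	// Assumed containerd CRI endpoint; dockershim/CRI-O expose different sockets.
	conn, err := grpc.Dial("unix:///run/containerd/containerd.sock", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// ImageManagerService group: list the images known to the runtime.
	imageClient := runtimeapi.NewImageServiceClient(conn)
	images, err := imageClient.ListImages(ctx, &runtimeapi.ListImagesRequest{})
	if err != nil {
		panic(err)
	}
	for _, img := range images.Images {
		fmt.Println(img.Id, img.RepoTags)
	}

	// RuntimeService group: query the runtime name and version.
	runtimeClient := runtimeapi.NewRuntimeServiceClient(conn)
	version, err := runtimeClient.Version(ctx, &runtimeapi.VersionRequest{})
	if err != nil {
		panic(err)
	}
	fmt.Println(version.RuntimeName, version.RuntimeVersion)
}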

The core CRI methods are shown in the figure below:

An important principle in the design of the CRI is that the interface concerns itself only with containers, not with pods; the CRI deliberately has no interface that directly creates or starts a pod.

PodSandboxManager contains the RunPodSandbox method. The PodSandbox here does not correspond to the Kubernetes Pod API object; it only extracts the pod fields that are relevant to the container runtime, such as HostName, DnsConfig and CgroupParent. In other words, a PodSandbox describes the fields Kubernetes needs in order to map the pod concept down to the container runtime level, which is effectively a subset of the Pod object.
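To show what that "subset of the Pod object" looks like in practice, here is a hedged sketch of how a caller might build a PodSandboxConfig and invoke RunPodSandbox, continuing with the runtimeapi client and imports from the previous sketch. All field values (pod name, UID, DNS server, cgroup parent) are made up for illustration.

// A sketch, not kubelet code: build the pod-level subset the CRI needs and run the sandbox.
func runSandbox(ctx context.Context, rt runtimeapi.RuntimeServiceClient) (string, error) {
	config := &runtimeapi.PodSandboxConfig{
		Metadata: &runtimeapi.PodSandboxMetadata{
			Name:      "foo",       // hypothetical pod name
			Namespace: "default",
			Uid:       "1234-5678", // hypothetical pod UID
			Attempt:   0,
		},
		Hostname:     "foo",
		LogDirectory: "/var/log/pods/default_foo_1234-5678",
		DnsConfig: &runtimeapi.DNSConfig{
			Servers:  []string{"10.96.0.10"},
			Searches: []string{"default.svc.cluster.local", "svc.cluster.local"},
		},
		Linux: &runtimeapi.LinuxPodSandboxConfig{
			CgroupParent: "/kubepods/besteffort/pod1234-5678",
		},
	}
	// RunPodSandbox only sets up the sandbox (e.g. the pause container and its
	// network namespace); individual containers are created afterwards with
	// CreateContainer/StartContainer.
	resp, err := rt.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{Config: config})
	if err != nil {
		return "", err
	}
	return resp.PodSandboxId, nil
}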

For example, suppose kubectl run creates a pod named foo that contains two containers, A and B. With Docker as the runtime, dockershim first creates an Infra container (the pause container) named foo to hold the pod's Network Namespace.

2. The kubelet Startup Process

The Run() method in pkg/kubelet/kubelet.go starts the kubelet's modules. The code is as follows:

// pkg/kubelet/kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		klog.InfoS("No API server defined - no node status update will be sent")
	}

	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}

	// Start the modules that do not depend on the container runtime
	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.ErrorS(err, "Failed to initialize internal modules")
		os.Exit(1)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		// Periodically sync the node status
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		// Update the pod CIDR and runtime state, and run the first node status sync
		go kl.fastStatusUpdateOnce()

		// start syncing lease
		// Start the node lease mechanism
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}
	// Periodically update the runtime state
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Set up iptables util rules
	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}

	// Start component sync loops.
	// Start the statusManager
	kl.statusManager.Start()

	// Start syncing RuntimeClasses if enabled.
	// Start the runtimeClassManager
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	// Start the pod lifecycle event generator (PLEG), which periodically refreshes
	// the state of all containers from the container runtime
	kl.pleg.Start()

	// Start the kubelet event loop: keep watching external sources for changes
	// and perform the corresponding pod operations
	kl.syncLoop(updates, kl)
}

The main logic of Run() is as follows:

  1. Call kl.initializeModules() to start the modules that do not depend on the container runtime
  2. Start the volume manager
  3. Periodically sync the node status (the wait.Until sketch after this list shows the pattern used for these periodic tasks)
  4. Call kl.fastStatusUpdateOnce() to update the pod CIDR and runtime state and perform the first node status sync
  5. Start the node lease mechanism to sync node leases
  6. Periodically run kl.updateRuntimeUp() to update the runtime state
  7. Start the statusManager and runtimeClassManager
  8. Call kl.pleg.Start() to start the PLEG, which periodically queries the container runtime for the state of all containers
  9. Call kl.syncLoop() to start the kubelet event loop, which keeps watching external sources for changes and performs the corresponding pod operations
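Several of these steps ("periodically sync node status", "periodically update runtime state") use the same wait.Until helper from k8s.io/apimachinery. A minimal standalone sketch of that pattern follows; the 10-second period and the task body are arbitrary here, not kubelet values.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})

	// wait.Until runs the function, sleeps for the period, and repeats until
	// stopCh is closed. kubelet starts syncNodeStatus and updateRuntimeUp this way,
	// usually with wait.NeverStop (a channel that is never closed) as stopCh.
	go wait.Until(func() {
		fmt.Println("sync node status at", time.Now())
	}, 10*time.Second, stopCh)

	// Let it run for a while, then stop.
	time.Sleep(25 * time.Second)
	close(stopCh)
}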
1) The initializeModules() method

// pkg/kubelet/kubelet.go
func (kl *Kubelet) initializeModules() error {
	// Prometheus metrics.
	metrics.Register(
		collectors.NewVolumeStatsCollector(kl),
		collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
	)
	metrics.SetNodeName(kl.nodeName)
	servermetrics.Register()

	// Setup filesystem directories.
	if err := kl.setupDataDirs(); err != nil {
		return err
	}

	// If the container logs directory does not exist, create it.
	if _, err := os.Stat(ContainerLogsDir); err != nil {
		if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
			return fmt.Errorf("failed to create directory %q: %v", ContainerLogsDir, err)
		}
	}

	// Start the image manager.
	kl.imageManager.Start()

	// Start the certificate manager if it was enabled.
	if kl.serverCertificateManager != nil {
		kl.serverCertificateManager.Start()
	}

	// Start out of memory watcher.
	if kl.oomWatcher != nil {
		if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
			return fmt.Errorf("failed to start OOM watcher: %w", err)
		}
	}

	// Start resource analyzer, which refreshes volume stats into the cache
	kl.resourceAnalyzer.Start()

	return nil
}

The main logic of initializeModules() is as follows:

  1. Start the imageManager (in practice the realImageGCManager)
  2. Start the certificateManager, which handles certificates
  3. Start the oomWatcher
  4. Start the resource analyzer, which refreshes volume stats into the cache

The code of kl.imageManager.Start() is as follows:

// pkg/kubelet/images/image_gc_manager.go
func (im *realImageGCManager) Start() {
	go wait.Until(func() {
		// Initial detection make detected time "unknown" in the past.
		var ts time.Time
		if im.initialized {
			ts = time.Now()
		}
		// Find all images and delete the ones that are no longer in use
		_, err := im.detectImages(ts)
		if err != nil {
			klog.InfoS("Failed to monitor images", "err", err)
		} else {
			im.initialized = true
		}
	}, 5*time.Minute, wait.NeverStop)

	// Start a goroutine periodically updates image cache.
	go wait.Until(func() {
		// Call the CRI to fetch the latest images
		images, err := im.runtime.ListImages()
		if err != nil {
			klog.InfoS("Failed to update image list", "err", err)
		} else {
			im.imageCache.set(images)
		}
	}, 30*time.Second, wait.NeverStop)
}

The Start() method of realImageGCManager starts two goroutines:

  1. Periodically call detectImages(), which finds all images in use and deletes the images that are no longer used
  2. Periodically fetch the latest images from the runtime and refresh the image cache via im.imageCache.set()

2) The fastStatusUpdateOnce() method

// pkg/kubelet/kubelet.go
func (kl *Kubelet) fastStatusUpdateOnce() {
	for {
		time.Sleep(100 * time.Millisecond)
		node, err := kl.GetNode()
		if err != nil {
			klog.ErrorS(err, "Error getting node")
			continue
		}
		if len(node.Spec.PodCIDRs) != 0 {
			podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
			if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
				klog.ErrorS(err, "Pod CIDR update failed", "CIDR", podCIDRs)
				continue
			}
			// Update the runtime state
			kl.updateRuntimeUp()
			// Sync the node status
			kl.syncNodeStatus()
			return
		}
	}
}

fastStatusUpdateOnce() runs a loop that tries to update the pod CIDR immediately. Once the pod CIDR is updated, it updates the runtime state and syncs the node status. The method returns as soon as one node status sync succeeds, and it is executed only during kubelet startup.

The code of kl.updateRuntimeUp() is as follows:

// pkg/kubelet/kubelet.go
// On first execution this initializes the runtime-dependent modules, then updates runtimeState
func (kl *Kubelet) updateRuntimeUp() {
	kl.updateRuntimeMux.Lock()
	defer kl.updateRuntimeMux.Unlock()

	// Get the container runtime status
	s, err := kl.containerRuntime.Status()
	if err != nil {
		klog.ErrorS(err, "Container runtime sanity check failed")
		return
	}
	if s == nil {
		klog.ErrorS(nil, "Container runtime status is nil")
		return
	}
	// Periodically log the whole runtime status for debugging.
	klog.V(4).InfoS("Container runtime status", "status", s)
	networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
	// Check whether the network and the runtime are ready
	if networkReady == nil || !networkReady.Status {
		klog.ErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
		kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
	} else {
		// Set nil if the container runtime network is ready.
		kl.runtimeState.setNetworkState(nil)
	}
	// information in RuntimeReady condition will be propagated to NodeReady condition.
	// Get the runtime ready condition
	runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
	// If RuntimeReady is not set or is false, report an error.
	if runtimeReady == nil || !runtimeReady.Status {
		klog.ErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
		kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady))
		return
	}
	kl.runtimeState.setRuntimeState(nil)
	// Call kl.initializeRuntimeDependentModules to initialize the dependent modules (once)
	kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
	kl.runtimeState.setRuntimeSync(kl.clock.Now())
}

updateRuntimeUp() fetches the container runtime's status and uses the returned conditions to check whether the network and the runtime are ready. It then calls kl.initializeRuntimeDependentModules() to initialize the runtime-dependent modules, which starts cadvisor, containerManager, evictionManager, containerLogManager, pluginManager and shutdownManager. Finally it records the runtime sync time.
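The "initialize only once" behaviour comes from kl.oneTimeInitializer, which is a sync.Once. A minimal sketch of the same pattern outside kubelet (the module names printed here are placeholders):

package main

import (
	"fmt"
	"sync"
)

type kubeletLike struct {
	oneTimeInitializer sync.Once
}

// updateRuntimeUp may run every 5 seconds, but the runtime-dependent modules
// must be started only once, after the runtime first reports ready.
func (k *kubeletLike) updateRuntimeUp(runtimeReady bool) {
	if !runtimeReady {
		return
	}
	k.oneTimeInitializer.Do(func() {
		fmt.Println("starting cadvisor, containerManager, evictionManager, ...")
	})
}

func main() {
	k := &kubeletLike{}
	k.updateRuntimeUp(false) // runtime not ready yet: nothing happens
	k.updateRuntimeUp(true)  // first time ready: dependent modules start
	k.updateRuntimeUp(true)  // later calls: Do is a no-op
}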

3) The syncLoop() method

// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.InfoS("Starting kubelet main sync loop")
	// The syncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base

	// Responsible for checking limits in resolv.conf
	// The limits do not have anything to do with individual pods
	// Since this is called in syncLoop, we don't need to call it anywhere else
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf()
	}

	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.ErrorS(err, "Skipping pod synchronization")
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		// Call kl.syncLoopIteration
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}

syncLoop() keeps calling syncLoopIteration() in a loop.

The channels involved in syncLoopIteration() are covered in detail later; here we only look at the processing logic inside syncLoopIteration().

1) configCh

// pkg/kubelet/kubelet.go
// This method watches several channels; as soon as one of them has data it hands the
// data to the handler, which dispatches the work via dispatchWork
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// This channel watches pod changes from 3 different sources (file, http, apiserver).
		// Whenever pod information from one of these sources changes (create/update/delete),
		// the updated pods and the specific operation show up on this channel.
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", format.Pods(u.Pods))
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.ErrorS(nil, "Kubelet does not support snapshot update")
		default:
			klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
		}

		kl.sourcesReady.AddSource(u.Source)
	...
	}
	return true
}

configCh is the channel from which configuration events are read. The config module watches pod changes from 3 different sources (file, http, APIServer) at the same time; as soon as pod information from one of these sources changes (create/update/delete), the updated pods and the specific operation appear on this channel.

2) plegCh

// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	...
	case e := <-plegCh:
		if e.Type == pleg.ContainerStarted {
			// record the most recent time we observed a container start for this pod.
			// this lets us selectively invalidate the runtimeCache when processing a delete for this pod
			// to make sure we don't miss handling graceful termination for containers we reported as having started.
			kl.lastContainerStartedTime.Add(e.ID, time.Now())
		}
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
			}
		}

		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	...
	}
	return true
}

When kl.pleg.Start() runs, it calls relist once per second, generates PodLifecycleEvents from the latest PodStatus, and writes them into plegCh.

syncLoop() obtains plegCh by calling kl.pleg.Watch() and passes it to syncLoopIteration(), which consumes the data on plegCh; the handler then dispatches the work via dispatchWork.

3) syncCh

// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	...
	case <-syncCh:
		// Sync pods waiting for sync
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", format.Pods(podsToSync))
		// Sync the most recently saved pod state
		handler.HandlePodSyncs(podsToSync)
	...
	}
	return true
}

syncCh is a ticker created inside syncLoop(); a value is written to it every second, and this case syncs all the pods that are waiting to be synced.

4) kl.livenessManager.Updates(), kl.readinessManager.Updates() and kl.startupManager.Updates()

// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	...
	case update := <-kl.livenessManager.Updates():
		// If a liveness probe fails, the pod status needs to be updated
		if update.Result == proberesults.Failure {
			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
		}
	case update := <-kl.readinessManager.Updates():
		// When the readiness result changes, update the container and pod status
		ready := update.Result == proberesults.Success
		kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)

		status := ""
		if ready {
			status = "ready"
		}
		handleProbeSync(kl, update, handler, "readiness", status)
	case update := <-kl.startupManager.Updates():
		// When the startup result changes, update the container and pod status
		started := update.Result == proberesults.Success
		kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)

		status := "unhealthy"
		if started {
			status = "started"
		}
		handleProbeSync(kl, update, handler, "startup", status)
	...
	}
	return true
}

5) housekeepingCh

// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	...
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// If the sources aren't ready or volume manager has not yet synced the states,
			// skip housekeeping, as we may accidentally delete pods from unready sources.
			klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
		} else {
			start := time.Now()
			klog.V(4).InfoS("SyncLoop (housekeeping)")
			// Perform cleanup work: terminate pod workers, delete unwanted pods,
			// remove volumes and pod directories
			if err := handler.HandlePodCleanups(); err != nil {
				klog.ErrorS(err, "Failed cleaning pods")
			}
			duration := time.Since(start)
			if duration > housekeepingWarningDuration {
				klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
			}
			klog.V(4).InfoS("SyncLoop (housekeeping) end")
		}
	}
	return true
}

housekeepingCh is also created by syncLoop(); every two seconds it triggers cleanup work, including terminating pod workers, deleting unwanted pods, and removing volumes and pod directories.

syncLoopIteration() listens on the following channels and handles each kind of event accordingly (a simplified sketch of this select loop follows the list):

  • configCh: watches pod event updates from the file, HTTP and APIServer sources
  • plegCh: the PLEG sub-module calls relist once per second, generates PodLifecycleEvents from the latest PodStatus, and writes them into plegCh
  • syncCh: a ticker channel; every second the most recently saved pod state is synced
  • kl.livenessManager.Updates(): if a liveness probe fails, the pod status needs to be updated
  • kl.readinessManager.Updates(): when the readiness result changes, the container and pod status are updated
  • kl.startupManager.Updates(): when the startup result changes, the container and pod status are updated
  • housekeepingCh: triggers pod cleanup work every two seconds
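The overall shape of syncLoopIteration() is one select over these channels. Below is a heavily simplified, self-contained sketch of that event-loop shape; the channel element types and the handlers are stand-ins, not the real kubelet types, and the probe-manager channels are omitted for brevity.

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for kubetypes.PodUpdate and pleg.PodLifecycleEvent.
type podUpdate struct{ op, pod string }
type lifecycleEvent struct{ pod, kind string }

func syncLoopIteration(configCh <-chan podUpdate, plegCh <-chan lifecycleEvent,
	syncCh, housekeepingCh <-chan time.Time) bool {
	select {
	case u, open := <-configCh:
		if !open {
			return false // config channel closed: exit the sync loop
		}
		fmt.Println("config update:", u.op, u.pod)
	case e := <-plegCh:
		fmt.Println("pleg event:", e.kind, e.pod)
	case <-syncCh:
		fmt.Println("periodic sync of pending pods")
	case <-housekeepingCh:
		fmt.Println("housekeeping: clean up terminated pods")
	}
	return true
}

func main() {
	configCh := make(chan podUpdate, 1)
	plegCh := make(chan lifecycleEvent, 1)
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(2 * time.Second)
	defer housekeepingTicker.Stop()

	configCh <- podUpdate{op: "ADD", pod: "default/foo"}
	plegCh <- lifecycleEvent{pod: "default/foo", kind: "ContainerStarted"}

	for i := 0; i < 4; i++ {
		if !syncLoopIteration(configCh, plegCh, syncTicker.C, housekeepingTicker.C) {
			break
		}
	}
}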
4) Summary

The kubelet startup process is shown in the figure below.

3. The Channels Involved in syncLoopIteration()

1) configCh

The call flow related to configCh is shown in the figure above. The handling of configCh inside syncLoopIteration() was covered earlier; here we look at how the kubelet watches the APIServer and writes pod changes into configCh.

// pkg/kubelet/kubelet.go
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if len(kubeCfg.StaticPodURLHeader) > 0 {
		for k, v := range kubeCfg.StaticPodURLHeader {
			for i := range v {
				manifestURLHeader.Add(k, v[i])
			}
		}
	}

	// source of all configuration
	// Initialize config.PodConfig
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

	// Register the three pod sources: file, http and apiserver
	// define file config source
	if kubeCfg.StaticPodPath != "" {
		klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
		config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
	}

	// define url config source
	if kubeCfg.StaticPodURL != "" {
		klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
		config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
	}

	if kubeDeps.KubeClient != nil {
		klog.InfoS("Adding apiserver pod source")
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource))
	}
	return cfg, nil
}

makePodSourceConfig() first initializes config.PodConfig and then registers three data sources: file, http and APIServer. Calling cfg.Channel() creates the corresponding channel for each source.

1) NewSourceApiserver()

Let's first look at the part that watches the APIServer. The code of NewSourceApiserver() is as follows:

// pkg/kubelet/config/apiserver.go
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
	// Create a ListWatch that watches pod changes for the current node
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))

	// The Reflector responsible for watching pods at the apiserver should be run only after
	// the node sync with the apiserver has completed.
	klog.InfoS("Waiting for node sync before watching apiserver pods")
	go func() {
		for {
			if nodeHasSynced() {
				klog.V(4).InfoS("node sync completed")
				break
			}
			time.Sleep(WaitForAPIServerSyncPeriod)
			klog.V(4).InfoS("node sync has not completed yet")
		}
		klog.InfoS("Watching apiserver")
		// Once the node sync is done, call newSourceApiserverFromLW
		newSourceApiserverFromLW(lw, updates)
	}()
}

func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		// When pod changes for this node are observed from the apiserver they are written into
		// this channel; the listen() method shown later keeps reading values from it
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	// Use the client-go API to create a Reflector
	r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
	go r.Run(wait.NeverStop)
}

newSourceApiserverFromLW() uses the client-go API to create a Reflector; whenever pod changes for the current node are observed from the APIServer, they are written into the channel.
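The same Reflector plus UndeltaStore combination can be used outside the kubelet. Below is a hedged, standalone sketch; the in-cluster config and the node name "node-1" are assumptions for illustration, and error handling is trimmed.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func main() {
	cfg, err := rest.InClusterConfig() // assumption: running inside a cluster
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Watch only the pods scheduled to one node, just like NewSourceApiserver does.
	lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods",
		metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", "node-1"))

	// UndeltaStore calls push with the *entire* current object set on every change,
	// which is why the kubelet sends SET (a full snapshot) rather than deltas.
	push := func(objs []interface{}) {
		pods := make([]*v1.Pod, 0, len(objs))
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		fmt.Println("current pod count on node-1:", len(pods))
	}
	store := cache.NewUndeltaStore(push, cache.MetaNamespaceKeyFunc)

	// The Reflector lists and watches pods and keeps the store up to date.
	r := cache.NewReflector(lw, &v1.Pod{}, store, 0)
	r.Run(wait.NeverStop)
}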

2) cfg.Channel()

makePodSourceConfig() calls cfg.Channel() to create the channel for each source:

// pkg/kubelet/config/config.go
// Register a dedicated channel for each source
func (c *PodConfig) Channel(source string) chan<- interface{} {
	c.sourcesLock.Lock()
	defer c.sourcesLock.Unlock()
	c.sources.Insert(source)
	// Delegate to c.mux.Channel
	return c.mux.Channel(source)
}

// pkg/util/config/config.go
func (m *Mux) Channel(source string) chan interface{} {
	if len(source) == 0 {
		panic("Channel given an empty name")
	}
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()
	channel, exists := m.sources[source]
	if exists {
		return channel
	}
	newChannel := make(chan interface{})
	m.sources[source] = newChannel
	// Also start a goroutine that listens for new data on this channel.
	// The channel created here is eventually passed to the send function defined in
	// newSourceApiserverFromLW: whenever pod changes for this node are observed from the
	// apiserver they are written into the channel, and listen() keeps reading from it
	go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
	return newChannel
}

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	for update := range listenChannel {
		// Call Merge to process the update
		m.merger.Merge(source, update)
	}
}

The channel created in Channel() is eventually passed into the send() function defined in newSourceApiserverFromLW(); when pod changes for the current node are observed from the APIServer, they are written into this channel. The listen() method keeps reading from the channel and calls Merge() to process the data it receives.

// pkg/kubelet/config/config.go
func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	seenBefore := s.sourcesSeen.Has(source)
	// Classify the pod changes by type
	adds, updates, deletes, removes, reconciles := s.merge(source, change)
	firstSet := !seenBefore && s.sourcesSeen.Has(source)

	// deliver update notifications
	switch s.mode {
	case PodConfigNotificationIncremental:
		// Finally push the pod changes into configCh
		if len(removes.Pods) > 0 {
			s.updates <- *removes
		}
		if len(adds.Pods) > 0 {
			s.updates <- *adds
		}
		if len(updates.Pods) > 0 {
			s.updates <- *updates
		}
		if len(deletes.Pods) > 0 {
			s.updates <- *deletes
		}
		if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
			// Send an empty update when first seeing the source and there are
			// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
			// the source is ready.
			s.updates <- *adds
		}
		// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
		if len(reconciles.Pods) > 0 {
			s.updates <- *reconciles
		}
	...
	}
	return nil
}

Merge() classifies the pod changes by type and finally pushes them into configCh. kl.syncLoopIteration() listens on configCh and hands the updates to the handler, which dispatches the work via dispatchWork.
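Stripped of kubelet types, the Mux/Merge machinery is a fan-in pattern: one goroutine per source channel, all forwarding into a single updates channel. A minimal sketch with made-up source names and update types:

package main

import (
	"fmt"
	"sync"
)

type update struct {
	source string
	pods   []string
}

// mux fans several per-source channels into one updates channel,
// which plays the role of configCh in the kubelet.
type mux struct {
	mu      sync.Mutex
	sources map[string]chan update
	updates chan update
}

func newMux() *mux {
	return &mux{sources: map[string]chan update{}, updates: make(chan update)}
}

// Channel registers a dedicated channel for a source and starts a
// goroutine that merges everything sent on it into m.updates.
func (m *mux) Channel(source string) chan<- update {
	m.mu.Lock()
	defer m.mu.Unlock()
	if ch, ok := m.sources[source]; ok {
		return ch
	}
	ch := make(chan update)
	m.sources[source] = ch
	go func() {
		for u := range ch {
			m.updates <- u // a real Merge would classify ADD/UPDATE/DELETE here
		}
	}()
	return ch
}

func main() {
	m := newMux()
	fileCh := m.Channel("file")
	apiCh := m.Channel("api")

	go func() { fileCh <- update{source: "file", pods: []string{"static-pod"}} }()
	go func() { apiCh <- update{source: "api", pods: []string{"default/foo"}} }()

	for i := 0; i < 2; i++ {
		u := <-m.updates
		fmt.Println("from", u.source, "pods", u.pods)
	}
}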

The flow of data into configCh is shown in the figure below:


2) plegCh

The PLEG is initialized and started as follows:

// pkg/kubelet/pleg/generic.go
func (g *GenericPLEG) Start() {
	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}

Start() launches a goroutine that executes relist() once per second.

// pkg/kubelet/pleg/generic.go
func (g *GenericPLEG) relist() {
	klog.V(5).InfoS("GenericPLEG: Relisting")

	if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
		metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
	}

	timestamp := g.clock.Now()
	defer func() {
		metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
	}()

	// Get all the pods.
	// Ask the runtime for all pods and containers on this node (ultimately a CRI call)
	podList, err := g.runtime.GetPods(true)
	if err != nil {
		klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
		return
	}

	g.updateRelistTime(timestamp)

	pods := kubecontainer.Pods(podList)
	// update running pod and container count
	updateRunningPodAndContainerMetrics(pods)
	g.podRecords.setCurrent(pods)

	// Compare the old and the current pods, and generate events.
	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
	for pid := range g.podRecords {
		oldPod := g.podRecords.getOld(pid)
		pod := g.podRecords.getCurrent(pid)
		// Get all containers in the old and the new pod.
		allContainers := getContainersFromPods(oldPod, pod)
		for _, container := range allContainers {
			// Check whether the container changed; if so, generate PodLifecycleEvents
			events := computeEvents(oldPod, pod, &container.ID)
			for _, e := range events {
				updateEvents(eventsByPodID, e)
			}
		}
	}

	var needsReinspection map[types.UID]*kubecontainer.Pod
	if g.cacheEnabled() {
		needsReinspection = make(map[types.UID]*kubecontainer.Pod)
	}

	// If there are events associated with a pod, we should update the
	// podCache.
	// Iterate over every pod that produced events
	for pid, events := range eventsByPodID {
		pod := g.podRecords.getCurrent(pid)
		if g.cacheEnabled() {
			// updateCache() will inspect the pod and update the cache. If an
			// error occurs during the inspection, we want PLEG to retry again
			// in the next relist. To achieve this, we do not update the
			// associated podRecord of the pod, so that the change will be
			// detect again in the next relist.
			// TODO: If many pods changed during the same relist period,
			// inspecting the pod and getting the PodStatus to update the cache
			// serially may take a while. We should be aware of this and
			// parallelize if needed.
			if err := g.updateCache(pod, pid); err != nil {
				// Rely on updateCache calling GetPodStatus to log the actual error.
				klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))

				// make sure we try to reinspect the pod during the next relisting
				needsReinspection[pid] = pod

				continue
			} else {
				// this pod was in the list to reinspect and we did so because it had events, so remove it
				// from the list (we don't want the reinspection code below to inspect it a second time in
				// this relist execution)
				delete(g.podsToReinspect, pid)
			}
		}
		// Update the internal storage and send out the events.
		g.podRecords.update(pid)

		// Map from containerId to exit code; used as a temporary cache for lookup
		containerExitCode := make(map[string]int)

		// Iterate over all events of this pod
		for i := range events {
			// Filter out events that are not reliable and no other components use yet.
			if events[i].Type == ContainerChanged {
				continue
			}
			select {
			// Push the event into kubelet's plegCh
			case g.eventChannel <- events[i]:
			default:
				metrics.PLEGDiscardEvents.Inc()
				klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
			}
			// Log exit code of containers when they finished in a particular event
			if events[i].Type == ContainerDied {
				// Fill up containerExitCode map for ContainerDied event when first time appeared
				if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
					// Get updated podStatus
					status, err := g.cache.Get(pod.ID)
					if err == nil {
						for _, containerStatus := range status.ContainerStatuses {
							containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
						}
					}
				}
				if containerID, ok := events[i].Data.(string); ok {
					if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
						klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
					}
				}
			}
		}
	}

	if g.cacheEnabled() {
		// reinspect any pods that failed inspection during the previous relist
		if len(g.podsToReinspect) > 0 {
			klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
			for pid, pod := range g.podsToReinspect {
				if err := g.updateCache(pod, pid); err != nil {
					// Rely on updateCache calling GetPodStatus to log the actual error.
					klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
					needsReinspection[pid] = pod
				}
			}
		}

		// Update the cache timestamp.  This needs to happen *after*
		// all pods have been properly updated in the cache.
		g.cache.UpdateTime(timestamp)
	}

	// make sure we retain the list of pods that need reinspecting the next time relist is called
	g.podsToReinspect = needsReinspection
}

The main logic of relist() is as follows:

  1. Ask the runtime for all pods and containers on the current node (ultimately through the CRI)
  2. Iterate over all pods and check whether their containers changed; if so, generate PodLifecycleEvents (a simplified sketch of this diff follows the list)
  3. For every pod that produced events, iterate over its events and push them into the kubelet's plegCh
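The heart of relist() is comparing the previous and current container states to generate events. Here is a simplified, self-contained sketch of that diff; the container states and event names are stand-ins for the real kubecontainer/pleg types.

package main

import "fmt"

type containerState string

const (
	stateRunning containerState = "running"
	stateExited  containerState = "exited"
)

type event struct {
	containerID string
	kind        string // e.g. ContainerStarted / ContainerDied
}

// computeEvents compares one container's old and new state, much like PLEG does
// for every container of every pod on each relist.
func computeEvents(old, current map[string]containerState, id string) []event {
	oldState, newState := old[id], current[id]
	if oldState == newState {
		return nil
	}
	switch newState {
	case stateRunning:
		return []event{{id, "ContainerStarted"}}
	case stateExited:
		return []event{{id, "ContainerDied"}}
	default:
		return []event{{id, "ContainerChanged"}}
	}
}

func main() {
	old := map[string]containerState{"a": stateRunning, "b": stateRunning}
	current := map[string]containerState{"a": stateRunning, "b": stateExited, "c": stateRunning}

	plegCh := make(chan event, 10)
	for _, id := range []string{"a", "b", "c"} {
		for _, e := range computeEvents(old, current, id) {
			plegCh <- e // relist pushes these into kubelet's plegCh
		}
	}
	close(plegCh)
	for e := range plegCh {
		fmt.Println(e.containerID, e.kind)
	}
}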
3) syncCh

After a pod goes through syncLoopIteration(), it eventually reaches managePodLoop(), and the pod is re-added to the workQueue:

// pkg/kubelet/pod_workers.go
func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
	// Requeue the last update if the last sync returned error.
	switch {
	case syncErr == nil:
		// No error; requeue at the regular resync interval.
		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
	case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
		// Network is not ready; back off for short period of time and retry as network might be ready soon.
		p.workQueue.Enqueue(pod.UID, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor))
	default:
		// Error occurred during the sync; back off and then retry.
		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
	}
	p.completeWorkQueueNext(pod.UID)
}

syncCh is a ticker created inside syncLoop(); every second, getPodsToSync() is called to fetch the pods waiting in the workQueue and sync them.
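Conceptually the workQueue that getPodsToSync() drains maps each pod UID to the time at which the pod becomes due again. A simplified sketch of that idea follows; it is not the real kubelet queue implementation, just an illustration of Enqueue-with-delay plus GetWork.

package main

import (
	"fmt"
	"sync"
	"time"
)

// workQueue records, per pod UID, when it should be synced again.
type workQueue struct {
	mu    sync.Mutex
	queue map[string]time.Time
}

func newWorkQueue() *workQueue {
	return &workQueue{queue: map[string]time.Time{}}
}

// Enqueue schedules a pod to be synced after the given delay,
// as completeWork does with the resync interval or a backoff period.
func (q *workQueue) Enqueue(uid string, delay time.Duration) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.queue[uid] = time.Now().Add(delay)
}

// GetWork returns (and removes) all pods whose due time has passed;
// getPodsToSync() consumes this on every syncCh tick.
func (q *workQueue) GetWork() []string {
	q.mu.Lock()
	defer q.mu.Unlock()
	now := time.Now()
	var due []string
	for uid, t := range q.queue {
		if !now.Before(t) {
			due = append(due, uid)
			delete(q.queue, uid)
		}
	}
	return due
}

func main() {
	q := newWorkQueue()
	q.Enqueue("pod-a", 0)                    // due immediately
	q.Enqueue("pod-b", 500*time.Millisecond) // due on a later tick

	fmt.Println("first tick:", q.GetWork())
	time.Sleep(time.Second)
	fmt.Println("second tick:", q.GetWork())
}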

4) Summary

The kubelet's core flow is shown in the figure below.


References:

11.深入k8s:kubelet工作原理及其初始化源码分析

45 | 幕后英雄:SIG-Node与CRI

46 | 解读 CRI 与 容器运行时

kubelet启动&&创建Pod源码分析

kubelet源码分析 syncLoopIteration(一) configCh

kubelet源码分析 syncLoopIteration(二) plegCh、syncCh、relist

kubelet源码分析 housekeeping 定时清理
