Kubernetes: kubelet source code analysis of the pod creation flow


0. Preface

kubelet is the "node agent" that runs on every Kubernetes node and manages that node.


kubelet is mainly responsible for managing the resource objects on its node, for example creating, deleting, monitoring, and evicting Pod resource objects, and managing their lifecycle.

1. kubelet source code analysis

1.1 kubelet modules

The modules that make up kubelet are shown in the figure below:

[image: kubelet module diagram]

As the figure shows, kubelet has many modules, each responsible for a different function. This article introduces the modules selectively, organized around the pod creation flow.

Before walking through the flow, let's use the kubelet working-principle diagram to tie the modules together; this helps a great deal when reading the source.

[image: kubelet working-principle diagram]

1.2 Starting and debugging kubelet

Download the Kubernetes source code and configure the debug launch parameters:

{"version": "0.2.0","configurations": [{"name": "Kubelet","type": "go","request": "launch","mode": "auto","program": "${fileDirname}","args": ["--container-runtime-endpoint=/run/k3s/containerd/containerd.sock","-v=5","--port=10251","--kubeconfig=/root/.kube/config",]}]
}

Set a breakpoint and step into kubelet:

[image: debugger paused at the kubelet entry point]

kubelet uses Cobra as its command-line application framework. Its initialization is similar to that of kube-scheduler and kube-apiserver, and the flow is as follows:

[image: Cobra-based kubelet startup flow]
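For context, the entry point in kubernetes/cmd/kubelet/kubelet.go is only a thin wrapper around this command. A from-memory sketch; details may differ slightly across releases:

package main

import (
    "os"

    "k8s.io/component-base/cli"
    "k8s.io/kubernetes/cmd/kubelet/app"
)

func main() {
    // NewKubeletCommand builds the cobra.Command analyzed below;
    // cli.Run executes it with logging and flag normalization set up.
    command := app.NewKubeletCommand()
    code := cli.Run(command)
    os.Exit(code)
}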

Here is a brief look at the initialization code:

// kubernetes/cmd/kubelet/app/server.go
func NewKubeletCommand() *cobra.Command {
    // parse flags
    cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
    cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
    kubeletFlags := options.NewKubeletFlags()
    // load the kubelet configuration
    kubeletConfig, err := options.NewKubeletConfiguration()
    cmd := &cobra.Command{
        ...
        RunE: func(cmd *cobra.Command, args []string) error {
            ...
            // build the KubeletServer
            kubeletServer := &options.KubeletServer{
                KubeletFlags:         *kubeletFlags,
                KubeletConfiguration: *kubeletConfig,
            }
            // build kubeletDeps, the dependencies needed to run kubelet
            kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
            ...
            return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
        },
    }
}
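If the Cobra wiring above is hard to follow with the elisions, here is a self-contained miniature of the same pattern: a "clean" pflag.FlagSet owned by the program, Cobra's own flag parsing disabled, and the flags parsed inside RunE. All names here are illustrative, not kubelet's:

package main

import (
    "fmt"
    "os"

    "github.com/spf13/cobra"
    "github.com/spf13/pflag"
)

func main() {
    // a clean flag set, parsed by us rather than by Cobra
    fs := pflag.NewFlagSet("kubelet-demo", pflag.ContinueOnError)
    kubeconfig := fs.String("kubeconfig", "", "path to the kubeconfig file")

    cmd := &cobra.Command{
        Use:                "kubelet-demo",
        DisableFlagParsing: true, // kubelet does this too: RunE receives the raw args
        RunE: func(cmd *cobra.Command, args []string) error {
            if err := fs.Parse(args); err != nil {
                return err
            }
            fmt.Println("kubeconfig:", *kubeconfig)
            return nil // the real kubelet builds kubeletServer/kubeletDeps and calls Run here
        },
    }
    if err := cmd.Execute(); err != nil {
        os.Exit(1)
    }
}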

Enter the Run function, which runs kubelet:

// kubernetes/cmd/kubelet/app/server.go
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
    ...
    if err := run(ctx, s, kubeDeps, featureGate); err != nil {
        return fmt.Errorf("failed to run Kubelet: %w", err)
    }
    return nil
}

func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
    ...
    if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
        return err
    }
    ...
}

The run function does quite a lot; we skip most of it and focus on RunKubelet:

// kubernetes/cmd/kubelet/app/server.go
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
    ...
    k, err := createAndInitKubelet(kubeServer, kubeDeps, hostname, hostnameOverridden, nodeName, nodeIPs)
    if err != nil {
        return fmt.Errorf("failed to create kubelet: %w", err)
    }
    ...
    if runOnce {
        ...
    } else {
        startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
        klog.InfoS("Started kubelet")
    }
    return nil
}

Here createAndInitKubelet creates the kubelet object, which is then started in startKubelet:

// kubernetes/cmd/kubelet/app/server.go
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
    // start the kubelet
    go k.Run(podCfg.Updates())

    // start the kubelet server
    if enableServer {
        go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
    }
    if kubeCfg.ReadOnlyPort > 0 {
        go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
    }
    go k.ListenAndServePodResources()
}

startKubelet calls the kubelet.Run method to run kubelet. Let's go straight into kubelet.Run and see what it does.

// kubernetes/pkg/kubelet/kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    ...
    // initializeModules initializes the modules that do not depend on the container runtime
    if err := kl.initializeModules(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
        klog.ErrorS(err, "Failed to initialize internal modules")
        os.Exit(1)
    }
    ...
    kl.syncLoop(ctx, updates, kl)
}

Kubelet.Run performs quite a few operations; again, we focus on what the main logic in Kubelet.syncLoop does.

// kubernetes/pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    klog.InfoS("Starting kubelet main sync loop")
    // syncTicker checks every second whether any pod workers need to be synced
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    // housekeepingTicker checks every two seconds whether any pods need to be cleaned up
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()
    ...
    for {
        ...
        kl.syncLoopMonitor.Store(kl.clock.Now())
        if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}

func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        ...
        switch u.Op {
        case kubetypes.ADD:
            klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
            handler.HandlePodAdditions(u.Pods)
        ...
        }
    ...
    }
    ...
}

Kubelet.syncLoopIteration multiplexes over several channels. Here we only follow the configCh channel to see how a pod is created.
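The shape of that loop is easy to reproduce. Below is a runnable, stripped-down imitation of syncLoop/syncLoopIteration: one select statement multiplexing a config channel and the two tickers. Everything here (podUpdate, the strings, the timings) is illustrative; only the structure mirrors kubelet:

package main

import (
    "fmt"
    "time"
)

// podUpdate stands in for kubetypes.PodUpdate.
type podUpdate struct {
    Op   string
    Pods []string
}

// iteration mirrors syncLoopIteration: it returns false only when the
// config channel is closed, which terminates the outer loop.
func iteration(configCh <-chan podUpdate, syncCh, housekeepingCh <-chan time.Time) bool {
    select {
    case u, open := <-configCh:
        if !open {
            return false
        }
        if u.Op == "ADD" {
            fmt.Println("SyncLoop ADD:", u.Pods) // kubelet calls handler.HandlePodAdditions here
        }
    case <-syncCh:
        fmt.Println("sync tick: re-sync pod workers that are due")
    case <-housekeepingCh:
        fmt.Println("housekeeping tick: clean up terminated pods")
    }
    return true
}

func main() {
    configCh := make(chan podUpdate, 1)
    syncTicker := time.NewTicker(time.Second) // kubelet: every second
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(2 * time.Second) // kubelet: every two seconds
    defer housekeepingTicker.Stop()

    configCh <- podUpdate{Op: "ADD", Pods: []string{"default/test-6d47479b6b-pphb2"}}
    go func() {
        time.Sleep(3 * time.Second)
        close(configCh)
    }()

    for iteration(configCh, syncTicker.C, housekeepingTicker.C) {
    }
}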

Set a breakpoint at handler.HandlePodAdditions(u.Pods), then create a pod:

# helm install test .
NAME: test
LAST DEPLOYED: Sun May 19 15:34:54 2024
NAMESPACE: default
STATUS: deployed

[image: debugger stopped at the HandlePodAdditions breakpoint]

I0519 15:34:54.577769 1801325 kubelet.go:2410] "SyncLoop ADD" source="api" pods=["default/test-6d47479b6b-pphb2"]

Step into handler.HandlePodAdditions:

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    ...
    for _, pod := range pods {
        // get the pods recorded in the podManager module
        existingPods := kl.podManager.GetPods()
        // record the new pod in podManager
        kl.podManager.AddPod(pod)
        // determine from the pod's properties whether it is a mirror pod;
        // a mirror pod is the apiserver-visible representation of a static pod,
        // which itself is managed only by the kubelet
        pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
        if wasMirror {
            ...
        }
        // check whether the pod has been requested to terminate
        if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
            activePods := kl.filterOutInactivePods(existingPods)
            if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
                ...
            } else {
                // check whether the pod can be admitted to this node
                if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
                    kl.rejectPod(pod, reason, message)
                    continue
                }
            }
        }
        kl.podWorkers.UpdatePod(UpdatePodOptions{
            Pod:        pod,
            MirrorPod:  mirrorPod,
            UpdateType: kubetypes.SyncPodCreate,
            StartTime:  start,
        })
    }
}

Here, the podManager module stores and serves pod information and maintains the relationship between static pods and mirror pods. podManager is called by statusManager, volumeManager, and runtimeManager, and it records every pod the kubelet manages.
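As a mental model, podManager can be reduced to a few indexes. The sketch below is hypothetical and only hints at the bookkeeping; the real implementation (kubernetes/pkg/kubelet/pod/pod_manager.go) stores *v1.Pod values and handles much more:

package main

import "fmt"

// basicPodManager is an illustrative reduction of kubelet's podManager:
// pods indexed by UID, with mirror pods tracked by full name so a static
// pod and its mirror can be translated into one another.
type basicPodManager struct {
    podByUID            map[string]string // pod UID -> full name
    mirrorPodByFullName map[string]string // full name -> mirror pod UID
}

func newBasicPodManager() *basicPodManager {
    return &basicPodManager{
        podByUID:            map[string]string{},
        mirrorPodByFullName: map[string]string{},
    }
}

func (pm *basicPodManager) AddPod(uid, fullName string, isMirror bool) {
    if isMirror {
        pm.mirrorPodByFullName[fullName] = uid
        return
    }
    pm.podByUID[uid] = fullName
}

func (pm *basicPodManager) GetPods() []string {
    pods := make([]string, 0, len(pm.podByUID))
    for _, name := range pm.podByUID {
        pods = append(pods, name)
    }
    return pods
}

func main() {
    pm := newBasicPodManager()
    pm.AddPod("uid-1", "web-0_default", false)
    pm.AddPod("uid-2", "static-web_default", false)
    pm.AddPod("uid-3", "static-web_default", true) // mirror of the static pod
    fmt.Println("managed pods:", pm.GetPods())
}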

Continue down into podWorkers.UpdatePod:

// kubernetes/pkg/kubelet/pod_workers.go
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
    ...
    status, ok := p.podSyncStatuses[uid]
    if !ok {
        klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
        firstTime = true
        status = &podSyncStatus{
            syncedAt: now,
            fullname: kubecontainer.BuildPodFullName(name, ns),
        }
        ...
        p.podSyncStatuses[uid] = status
    }
    ...
    // start a pod worker goroutine if one does not exist yet
    podUpdates, exists := p.podUpdates[uid]
    if !exists {
        podUpdates = make(chan struct{}, 1)
        p.podUpdates[uid] = podUpdates
        ...
        go func() {
            defer runtime.HandleCrash()
            defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)
            p.podWorkerLoop(uid, outCh)
        }()
    }
}

func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
    var lastSyncTime time.Time
    for range podUpdates {
        // startPodSync determines whether the pod can be synced
        ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)
        ...
        err := func() error {
            var status *kubecontainer.PodStatus
            var err error
            switch {
            case update.Options.RunningPod != nil:
            default:
                status, err = p.podCache.GetNewerThan(update.Options.Pod.UID, lastSyncTime)
                ...
            }
            switch {
            case update.WorkType == TerminatedPod:
                ...
            default:
                isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
            }
            lastSyncTime = p.clock.Now()
            return err
        }()
        ...
    }
}

Note that podWorkers.podCache.GetNewerThan returns the most recent pod status. PLEG obtains the pod status from the container runtime and stores it in podCache; the status in podCache is compared with the pod status that kubelet receives from kube-apiserver to determine the latest one.

Next, enter podWorkers.podSyncer.SyncPod to sync the pod:

func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
    ...
    klog.V(4).InfoS("SyncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
    ...
    // generate apiPodStatus to be synced to the statusManager
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus, false)
    ...
    // fetch the pod status currently stored in the statusManager
    existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
    ...
    // have the statusManager sync the pod status
    kl.statusManager.SetPodStatus(pod, apiPodStatus)
    ...
    // ensure the kubelet knows about referenced secrets or configmaps used by the pod
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
        if kl.secretManager != nil {
            kl.secretManager.RegisterPod(pod)
        }
        if kl.configMapManager != nil {
            kl.configMapManager.RegisterPod(pod)
        }
    }
    // create the pod container manager
    pcm := kl.containerManager.NewPodContainerManager()
    ...
    // Make data directories for the pod
    if err := kl.makePodDataDirs(pod); err != nil {
        ...
    }
    // Wait for volumes to attach/mount
    if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
        ...
    }
    // Fetch the pull secrets for the pod
    pullSecrets := kl.getPullSecretsForPod(pod)
    // Ensure the pod is being probed
    kl.probeManager.AddPod(pod)
    ...
    result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.backOff)
    ...
}

Kubelet.SyncPod first updates the pod's status in statusManager and then creates the resources the pod needs, such as data directories, volumes, and secrets. Before calling the container runtime to sync the pod, it adds the pod to the probeManager module so the pod's state can be probed. (The details of the probeManager module are beyond the scope of this article.)

Enter Kubelet.containerRuntime.SyncPod to see how the container runtime syncs the pod.

// kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(ctx, pod, podStatus)
    ...
    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.KillPod {
        ...
    } else {
        // Step 3: kill any running containers in this pod which are not to keep.
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            ...
        }
    }
    ...
    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        ...
        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
        result.AddSyncResult(createSandboxResult)
        ...
        podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
        if err != nil {
            ...
        }
        // query the status of the newly created pod sandbox through the runtime CRI interface
        resp, err := m.runtimeService.PodSandboxStatus(ctx, podSandboxID, false)
        ...
    }
    configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
    result.AddSyncResult(configPodSandboxResult)

    start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {
        ...
        klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
        ...
        if msg, err := m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
            ...
        }
        ...
    }

    // Step 5: start ephemeral containers
    for _, idx := range podContainerChanges.EphemeralContainersToStart {
        start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
    }

    if !utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
        ...
    } else {
        // Step 6: start init containers.
        for _, idx := range podContainerChanges.InitContainersToStart {
            container := &pod.Spec.InitContainers[idx]
            // Start the next init container.
            if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
                ...
            }
            // Successfully started the container; clear the entry in the failure
            klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
        }
    }

    // Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources
    if isInPlacePodVerticalScalingAllowed(pod) {
        if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
            m.doPodResizeAction(pod, podStatus, podContainerChanges, result)
        }
    }

    // Step 8: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
    }
    return
}

In Kubelet.containerRuntime.SyncPod, the pod sandbox and containers are created by calling the runtime CRI interface. Taking the pod sandbox as an example, kubeGenericRuntimeManager.createPodSandbox calls kubeGenericRuntimeManager.instrumentedRuntimeService.RunPodSandbox to create it:

func (m *kubeGenericRuntimeManager) createPodSandbox(ctx context.Context, pod *v1.Pod, attempt uint32) (string, string, error) {
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
    if err != nil {
        ...
    }
    // create the pod's log directory
    err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
    ...
    podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler)
    if err != nil {
        ...
    }
    return podSandBoxID, "", nil
}

func (in instrumentedRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
    ...
    out, err := in.service.RunPodSandbox(ctx, config, runtimeHandler)
    ...
    return out, err
}

func (r *remoteRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
    ...
    klog.V(10).InfoS("[RemoteRuntimeService] RunPodSandbox", "config", config, "runtimeHandler", runtimeHandler, "timeout", timeout)
    ...
    resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
        Config:         config,
        RuntimeHandler: runtimeHandler,
    })
    ...
}

// kubernetes/vendor/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go
func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) {
    out := new(RunPodSandboxResponse)
    err := c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/RunPodSandbox", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

As we can see, the pod sandbox is created by invoking the CRI endpoint /runtime.v1.RuntimeService/RunPodSandbox; creating containers works the same way, through the corresponding runtime CRI calls.
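Because this is plain gRPC, nothing stops us from talking to the same CRI endpoint ourselves. The sketch below dials the containerd socket used earlier in the debug configuration and issues RunPodSandbox directly; the socket path and the minimal sandbox metadata are assumptions, and a real call usually needs a fuller PodSandboxConfig:

package main

import (
    "context"
    "fmt"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

func main() {
    // assumption: the same socket passed via --container-runtime-endpoint
    conn, err := grpc.Dial("unix:///run/k3s/containerd/containerd.sock",
        grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    client := runtimeapi.NewRuntimeServiceClient(conn)
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // this sends the same /runtime.v1.RuntimeService/RunPodSandbox RPC
    // that kubelet's remoteRuntimeService sends
    resp, err := client.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
        Config: &runtimeapi.PodSandboxConfig{
            Metadata: &runtimeapi.PodSandboxMetadata{
                Name:      "demo",
                Namespace: "default",
                Uid:       "demo-uid",
            },
        },
    })
    if err != nil {
        panic(err)
    }
    fmt.Println("sandbox id:", resp.PodSandboxId)
}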

2. Summary

This article walked through the pod creation flow at the kubelet source level. A follow-up will focus on how the runtime itself does the work.

