Volcano Scheduler调度器源码解析

Volcano Scheduler调度器源码解析

本文从源码的角度分析Volcano Scheduler相关功能的实现。

本篇Volcano版本为v1.8.0。

Volcano项目地址: https://github.com/volcano-sh/volcano

controller命令main入口: cmd/scheduler/main.go

controller相关代码目录: pkg/scheduler
关联文章:

  • 《Volcano Controller控制器源码解析》

更多文章访问: https://www.cyisme.top

看过我之前文章《从源码解析KubeScheduler调度过程》的朋友应该已经大致了解了k8s原生调度器的运行逻辑了。

k8s scheduler大致运行流程:

listewatch
schedulOne
Informer
SchedulingQueue
schedulingCycle
bindingCycle

k8s scheduler的调度大概是从infomer监听事件到之后,就会对这个事件进行调度,期间会运行framework插件,最终绑定到目标node上。

volcano scheduler的调度和k8s scheduler的流程略有不同, 但是思想差不多。

listewatch
resources snapshot
open ssn
Informer
Cache
Session
Plugin
bind

volcano scheduler调度不是监听到事件后就执行调度, 而是周期性的(open ssn动作是周期性执行的)。 每过一段时间(默认1s)就会执行一次调度,期间会运行plugin插件,最终绑定到目标node上。

在这里插入图片描述

Cache

cache组件会list/watch, 维护最新的资源信息。 注册informer事件的代码这里不做展示。

type SchedulerCache struct {// ...// 一些informer...podInformer                infov1.PodInformernodeInformer               infov1.NodeInformerpodGroupInformerV1beta1    vcinformerv1.PodGroupInformerqueueInformerV1beta1       vcinformerv1.QueueInformerpvInformer                 infov1.PersistentVolumeInformerpvcInformer                infov1.PersistentVolumeClaimInformerscInformer                 storagev1.StorageClassInformerpcInformer                 schedv1.PriorityClassInformerquotaInformer              infov1.ResourceQuotaInformercsiNodeInformer            storagev1.CSINodeInformercsiDriverInformer          storagev1.CSIDriverInformercsiStorageCapacityInformer storagev1beta1.CSIStorageCapacityInformercpuInformer                cpuinformerv1.NumatopologyInformer// 用于绑定nodeBinder         Binder// 存储list/watch到的资源Jobs                 map[schedulingapi.JobID]*schedulingapi.JobInfoNodes                map[string]*schedulingapi.NodeInfoQueues               map[schedulingapi.QueueID]*schedulingapi.QueueInfoPriorityClasses      map[string]*schedulingv1.PriorityClassNodeList             []stringdefaultPriorityClass *schedulingv1.PriorityClassdefaultPriority      int32CSINodesStatus       map[string]*schedulingapi.CSINodeStatusInfo// ...
}

cache启动时,我们关注三个goroutine的工作职责。

func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {sc.informerFactory.Start(stopCh)sc.vcInformerFactory.Start(stopCh)// 处理错误的任务go wait.Until(sc.processResyncTask, 0, stopCh)// 清理jobgo wait.Until(sc.processCleanupJob, 0, stopCh)// 执行绑定操作go wait.Until(sc.processBindTask, time.Millisecond*20, stopCh)// ...
}

processResyncTask会从errTasks队列中, 取出task, 并重新从apiserver中获取资源信息更新它。

func (sc *SchedulerCache) processResyncTask() {obj, shutdown := sc.errTasks.Get()// 省略一些代码// syncTask 会用从apiserver中获取到资源信息更新taskif err := sc.syncTask(task); err != nil {sc.resyncTask(task)}
}

processCleanupJob会从DeletedJobs队列中, 取出job, 如果job已经完成,则删除cache中的job信息。

func (sc *SchedulerCache) processCleanupJob() {obj, shutdown := sc.DeletedJobs.Get()if shutdown {return}// 省略一些代码// // 判断job是否已经完成。如果pg和task都为空,则认为job已经完成if schedulingapi.JobTerminated(job) {// 从cache中删除jobdelete(sc.Jobs, job.UID)} else {// Retrysc.deleteJob(job)}
}

processBindTask会从BindFlowChannelchannel中, 取出数个task, 执行绑定操作。

func (sc *SchedulerCache) processBindTask() {for {select {case taskInfo, ok := <-sc.BindFlowChannel:if !ok {return}sc.bindCache = append(sc.bindCache, taskInfo)// 如果绑定任务达到batchNum,则执行绑定操作// batchNum默认为1if len(sc.bindCache) == sc.batchNum {sc.BindTask()}default:}if len(sc.BindFlowChannel) == 0 {break}}if len(sc.bindCache) == 0 {return}sc.BindTask()
}
func (sc *SchedulerCache) BindTask() {// 拷贝一份绑定任务var tmpBindCache []*schedulingapi.TaskInfo = make([]*schedulingapi.TaskInfo, len(sc.bindCache))copy(tmpBindCache, sc.bindCache)// 异步执行绑定操作go func(tasks []*schedulingapi.TaskInfo) {successfulTasks := make([]*schedulingapi.TaskInfo, 0)for _, task := range tasks {// 检查voluem是否已经准备好, 过滤出可以绑定的taskif err := sc.VolumeBinder.BindVolumes(task, task.PodVolumes); err != nil {sc.VolumeBinder.RevertVolumes(task, task.PodVolumes)// 如果volume没有准备好,放到errTasks队列中,等待更新信息sc.resyncTask(task)} else {successfulTasks = append(successfulTasks, task)}}bindTasks := make([]*schedulingapi.TaskInfo, len(successfulTasks))copy(bindTasks, successfulTasks)// 执行绑定操作, 调用k8s apiif err := sc.Bind(bindTasks); err != nil {return}}(tmpBindCache)// 清空列表sc.bindCache = sc.bindCache[0:0]
}

Framework

framework中的主要对象其实是session, 一个调度周期会开启一个session, 调度工作将会在这个session中进行。

如果参照k8s schedulersession其实对应的就是k8s framework

session是插件的载体与管理者, 由Action触发。

session会存储当前的资源信息, 以及插件的注册信息。

type Session struct {// 省略一些代码// 用于存储cache中的资源信息, 这些信息是深拷贝的Jobs           map[api.JobID]*api.JobInfoNodes          map[string]*api.NodeInfoCSINodesStatus map[string]*api.CSINodeStatusInfoRevocableNodes map[string]*api.NodeInfoQueues         map[api.QueueID]*api.QueueInfoNamespaceInfo  map[api.NamespaceName]*api.NamespaceInfo// 存储启用的插件名称Tiers          []conf.Tier// 存储插件对象plugins           map[string]Plugin// 存储事件处理函数eventHandlers     []*EventHandler// 这些func用于判断资源需要调用哪些插件// map[插件名称]具体方法// 插件中会根据自己的需求, 将自己的名称以及对应的方法注册到session中// 这些方法将会在不同的Action中调用jobOrderFns       map[string]api.CompareFnclusterOrderFns   map[string]api.CompareFnjobStarvingFns    map[string]api.ValidateFn// 省略了一些*Fns // ...
}

session的生命周期

OpenSission方法会返回一个新的session对象, 对应一个新的调度周期。

func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Configuration) *Session {ssn := openSession(cache)// 省略一些代码// for _, tier := range tiers {for _, plugin := range tier.Plugins {// scheduler启动时会将每个插件的名称以及对应的构建方法注册到一个pluginBuilders map中// 这里会根据这个map, 获取到插件的构建方法, 并执行构建方法, 返回一个插件对象if pb, found := GetPluginBuilder(plugin.Name); !found {klog.Errorf("Failed to get plugin %s.", plugin.Name)} else {plugin := pb(plugin.Arguments)ssn.plugins[plugin.Name()] = pluginonSessionOpenStart := time.Now()// OnSessionOpen 中会注册上述的 *Fns plugin.OnSessionOpen(ssn)}}}return ssn
}
func openSession(cache cache.Cache) *Session {ssn := &Session{// 省略一些代码}// 从cache中获取资源信息, 并深拷贝到session中snapshot := cache.Snapshot()ssn.Jobs = snapshot.Jobsfor _, job := range ssn.Jobs {// only conditions will be updated periodicallyif job.PodGroup != nil && job.PodGroup.Status.Conditions != nil {ssn.podGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy()}// 检查job是否可以调度, jobValid目前只有gang插件注册了if vjr := ssn.JobValid(job); vjr != nil {if !vjr.Pass {jc := &scheduling.PodGroupCondition{Type:               scheduling.PodGroupUnschedulableType,Status:             v1.ConditionTrue,LastTransitionTime: metav1.Now(),TransitionID:       string(ssn.UID),Reason:             vjr.Reason,Message:            vjr.Message,}if err := ssn.UpdatePodGroupCondition(job, jc); err != nil {klog.Errorf("Failed to update job condition: %v", err)}}delete(ssn.Jobs, job.UID)}}// 深拷贝其他资源信息ssn.Nodes = snapshot.Nodes// 省略...return ssn
}

对应的CloseSession方法会在调度周期结束时调用, 执行收尾工作,并清理session中数据。

func CloseSession(ssn *Session) {for _, plugin := range ssn.plugins {onSessionCloseStart := time.Now()// 调用插件的OnSessionClose方法, 方法中会有如设置pg状态等收尾工作plugin.OnSessionClose(ssn)metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionClose, metrics.Duration(onSessionCloseStart))}closeSession(ssn)
}
func closeSession(ssn *Session) {// 更新cache中的数据状态ju := newJobUpdater(ssn)ju.UpdateAll()updateQueueStatus(ssn)// 清理session中的数据ssn.Jobs = nil// 省略...
}

statement

statement用于存储这一次“打包”调度的信息, 最终统一提交或取消。

// 存储task的操作信息
type operation struct {// 操作名称, Evict/Pipeline/Allocatename   Operation// 操作的tasktask   *api.TaskInfo// 操作的原因reason string
}
type Statement struct {operations []operationssn        *Session
}

对应三个操作名称, 有三个方法,供plugin调用。以Evict举例:

func (s *Statement) Evict(reclaimee *api.TaskInfo, reason string) error {// 更新session中的job状态if job, found := s.ssn.Jobs[reclaimee.Job]; found {if err := job.UpdateTaskStatus(reclaimee, api.Releasing); err != nil {}} else {}// 更新node中task的状态if node, found := s.ssn.Nodes[reclaimee.NodeName]; found {err := node.UpdateTask(reclaimee)if err != nil {return err}}// 触发session中的事件处理函数for _, eh := range s.ssn.eventHandlers {if eh.DeallocateFunc != nil {eh.DeallocateFunc(&Event{Task: reclaimee,})}}// 加入到待处理列表s.operations = append(s.operations, operation{name:   Evict,task:   reclaimee,reason: reason,})return nil
}
// evict用于提交操作, cache组件会调用api更新。
func (s *Statement) evict(reclaimee *api.TaskInfo, reason string) error {if err := s.ssn.cache.Evict(reclaimee, reason); err != nil {if e := s.unevict(reclaimee); e != nil {klog.Errorf("Faled to unevict task <%v/%v>: %v.", reclaimee.Namespace, reclaimee.Name, e)}return err}return nil
}
// unevict用于撤销操作恢复task状态, 是上面的逆操作
func (s *Statement) unevict(reclaimee *api.TaskInfo) error {// Update status in sessionjob, found := s.ssn.Jobs[reclaimee.Job]if found {if err := job.UpdateTaskStatus(reclaimee, api.Running); err != nil {}} else {}// Update task in node.if node, found := s.ssn.Nodes[reclaimee.NodeName]; found {err := node.UpdateTask(reclaimee)if err != nil {return err}}for _, eh := range s.ssn.eventHandlers {if eh.AllocateFunc != nil {eh.AllocateFunc(&Event{Task: reclaimee,})}}return nil
}

plugin会在最后根据情况判断是否提交或者取消。

// 撤销
func (s *Statement) Discard() {for i := len(s.operations) - 1; i >= 0; i-- {op := s.operations[i]op.task.GenerateLastTxContext()switch op.name {case Evict:err := s.unevict(op.task)case Pipeline:err := s.unpipeline(op.task)case Allocate:err := s.unallocate(op.task)}}
}
// 提交
func (s *Statement) Commit() {klog.V(3).Info("Committing operations ...")for _, op := range s.operations {op.task.ClearLastTxContext()switch op.name {case Evict:err := s.evict(op.task, op.reason)case Pipeline:s.pipeline(op.task)case Allocate:err := s.allocate(op.task)}}
}

Actions

action是触发执行plugin的动作。

如果参照k8s scheduleraction相当于preFilter、Fileter、PostFilter、PreBind、Bind、PostBind等这些动作。

action有6种:

  • Enqueue 调度器的准备阶段, 判断资源是否满足调度条件。一般作为调度的前置条件。 Enqueue 能够防止集群下有大量不能调度的pod,提高了调度器的性能。
  • Allocate 执行调度操作(分配node), 是必不可缺的一步
  • Preempt 抢占资源, 用于处理高优先级调度问题。 可以在同queue或同job中抢占资源。
  • Backfill 回填步骤,处理待调度Pod列表中没有指明资源申请量的Pod调度。 Backfill能够提高集群吞吐量,提高资源利用率。
  • Reclaim 根据队列权重回收队列的资源。
  • Shuffle 根据资源状况重新分配节点

可以在volcano的配置中,指定使用哪些action以及plugin

# kubectl get configmap volcano-scheduler-configmap -nvolcano-system -oyaml
apiVersion: v1
data:
volcano-scheduler.conf: |
actions: "enqueue, allocate, backfill"
tiers:
- plugins:- name: priority- name: gang- name: conformance
- plugins:- name: drf- name: predicates- name: proportion- name: nodeorder- name: binpack
kind: ConfigMap
metadata:
annotations:
creationTimestamp: "2020-08-15T04:01:02Z"
name: volcano-scheduler-configmap
namespace: volcano-system

配置文件中声明的actions顺序即为实际执行的顺序, 在上面的配置文件的声明中,将依次执行enqueue, allocate, backfill三个action

volcano并不会检查action顺序的合理性,action可以任意顺序。

action需要实现framework中定义的Action interface

type Action interface {// 唯一名称Name() string// 初始化动作Initialize()// 执行动作Execute(ssn *Session)// 取消初始化UnInitialize()
}

Execute中会调用session中由plugins注册的方法, 以enqueue动作为例:

func (enqueue *Action) Execute(ssn *framework.Session) {for _, job := range ssn.Jobs {//...}for {// ...// ssn.JobEnqueueable调用plugin方法if job.PodGroup.Spec.MinResources == nil || ssn.JobEnqueueable(job) {ssn.JobEnqueued(job)job.PodGroup.Status.Phase = scheduling.PodGroupInqueuessn.Jobs[job.UID] = job}// Added Queue back until no job in Queue.queues.Push(queue)}
}
// ssn.JobEnqueueable
func (ssn *Session) JobEnqueueable(obj interface{}) bool {var hasFound boolfor _, tier := range ssn.Tiers {for _, plugin := range tier.Plugins {// 未启用enqueue动作则跳过if !isEnabled(plugin.EnabledJobEnqueued) {continue}// 未注册方法则跳过fn, found := ssn.jobEnqueueableFns[plugin.Name]if !found {continue}// 执行res := fn(obj)if res < 0 {return false}if res > 0 {hasFound = true}}// 如果存在enqueue插件, 则代表允许排队, 这个函数中,不再执行下一Tiers的插件if hasFound {return true}}return true
}

Plugins

pluginsvolcano实现调度逻辑的核心, 也是volcano的核心竞争力。

pluginsinit方法中注册自己的初始化函数到pluginBuilders中, 供sessionOpenSession时调用。

func init() {framework.RegisterPluginBuilder(drf.PluginName, drf.New)framework.RegisterPluginBuilder(gang.PluginName, gang.New)// 省略...
}

plugin需要实现framework中定义的Plugin interface

type Plugin interface {// 插件唯一名称Name() string// 在OpenSession时调用OnSessionOpen(ssn *Session)// 在CloseSession时调用OnSessionClose(ssn *Session)
}

OpenSession时会调用pluginOnSessionOpen方法, plugin会在这时向session注册自己的方法。

gang插件中会注册数个方法:

func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {validJobFn := func(obj interface{}) *api.ValidateResult {//...}ssn.AddJobValidFn(gp.Name(), validJobFn)preemptableFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) ([]*api.TaskInfo, int){//...}ssn.AddReclaimableFn(gp.Name(), preemptableFn)ssn.AddPreemptableFn(gp.Name(), preemptableFn)//....
}

CloseSession时会调用pluginOnSessionClose方法, plugin会在这时会执行清理、检查等收尾工作。

func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {var unreadyTaskCount int32var unScheduleJobCount intfor _, job := range ssn.Jobs {if !job.Ready() {// ...// 检查未就绪的task数量unreadyTaskCount = job.MinAvailable - schedulableTaskNum()// 更新pg状态jc := &scheduling.PodGroupCondition{Type:               scheduling.PodGroupUnschedulableType,Status:             v1.ConditionTrue,LastTransitionTime: metav1.Now(),TransitionID:       string(ssn.UID),Reason:             v1beta1.NotEnoughResourcesReason,Message:            msg,}if err := ssn.UpdatePodGroupCondition(job, jc); err != nil {klog.Errorf("Failed to update job <%s/%s> condition: %v",job.Namespace, job.Name, err)}// 省略...}
}

运行流程

scheduler.Run会启动整个调度器。

func (pc *Scheduler) Run(stopCh <-chan struct{}) {// 监听配置文件变化pc.loadSchedulerConf()go pc.watchSchedulerConf(stopCh)// 启动cachepc.cache.SetMetricsConf(pc.metricsConf)pc.cache.Run(stopCh)pc.cache.WaitForCacheSync(stopCh)// 启动调度, 每隔一段时间执行一次go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh)if options.ServerOpts.EnableCacheDumper {pc.dumper.ListenForSignal(stopCh)}
}

runOnce会每隔一段时间执行一次, 创建一个新的session执行动作。

func (pc *Scheduler) runOnce() {// 周期性调度pc.mutex.Lock()actions := pc.actionsplugins := pc.pluginsconfigurations := pc.configurationspc.mutex.Unlock()// 动态watch配置, 所以每次调度都会重新加载配置conf.EnabledActionMap = make(map[string]bool)for _, action := range actions {conf.EnabledActionMap[action.Name()] = true}// 打开一个sessionssn := framework.OpenSession(pc.cache, plugins, configurations)defer func() {// 关闭sessionframework.CloseSession(ssn)metrics.UpdateE2eDuration(metrics.Duration(scheduleStartTime))}()// 执行actionsfor _, action := range actions {actionStartTime := time.Now()// 执行pluginsaction.Execute(ssn)metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime))}
}

会按照配置文件中的顺序执行action, action.Execute会调用plugin注册的方法, 对task、job进行处理, 最终会由backfillalloc或者preempt动作中调用对应的方法添加到cacheBindFlowChannel中, 等待绑定。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/334339.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HarmonyOS 整体容器组件(Navigation)

今晚 我们一起来看看 Navigation 我们可以编写代码如下 Entry Component struct Index {build() {Row() {Column() {Navigation() {}.width(100%).height(100%).backgroundColor("#F1F1F1")}.width(100%)}.height(100%)} }Navigation 通常是作为容器被使用 这里 我…

什么是全链路压测?

随着互联网技术的发展和普及&#xff0c;越来越多的互联网公司开始重视性能压测&#xff0c;并将其纳入软件开发和测试的流程中。 阿里巴巴在2014 年双11 大促活动保障背景下提出了全链路压测技术&#xff0c;能更好的保障系统可用性和稳定性。 什么是全链路压测&#xff1f;…

工业异常检测AnomalyGPT-Demo试跑

写在前面&#xff1a;如果你有大的cpu和gpu可以使用&#xff0c;直接根据官方的安装说明就可以&#xff0c;如果没有&#xff0c;可以点进来试着看一下我个人的安装经验。 一、试跑环境 NVIDIA4090显卡24g,cpu内存33G&#xff0c;交换空间8g,操作系统ubuntu22.04(试跑过程cpu…

Uibot (RPA设计软件)培训前期准备指南————课前材料三

(本博客中会有部分课程ppt截屏,如有侵权请及请及时与小北我取得联系~&#xff09; 紧接着小北的前两篇博客&#xff0c;友友们我们即将开展新课的学习~RPA 培训前期准备指南——安装Uibot(RPA设计软件&#xff09;-CSDN博客https://blog.csdn.net/Zhiyilang/article/details/1…

互联网上门洗衣洗鞋小程序开发搭建;

互联网搭建的洗衣洗鞋小程序&#xff0c;具备多重功能。首先&#xff0c;用户轻松注册与登录&#xff0c;获取一站式洗涤服务体验。接着&#xff0c;用户可在线提交洗衣、洗鞋订单&#xff0c;并随时查看订单状态和历史记录&#xff0c;全程跟踪无忧。再有&#xff0c;您可便捷…

【Flutter 开发实战】Dart 基础篇:从了解背景开始

想要学会用 Flutter 开发 App&#xff0c;就不可避免的要学习另一门很有意思的编程语言 —— Dart。很多小伙伴可能在学习 Flutter 之前可能都没听说过这门编程语言&#xff0c;我也是一样&#xff0c;还以为 Dart 是为了 Flutter 而诞生的&#xff1b;然而&#xff0c;当我们去…

大创项目推荐 深度学习图像风格迁移

文章目录 0 前言1 VGG网络2 风格迁移3 内容损失4 风格损失5 主代码实现6 迁移模型实现7 效果展示8 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习图像风格迁移 - opencv python 该项目较为新颖&#xff0c;适合作为竞赛课题…

系统概要设计说明书

系统概要设计说明书 1.整体架构 2.功能架构 3.技术架构 4.运行环境设计 5.设计目标 6.接口设计 7.性能设计 8.运行设计 9.出错设计 全文档获取进主页

Linux———head,tail命令详解(狠狠爱住)

目录 head 命令&#xff1a; head 命令基本语法&#xff1a; 常用选项 示例 显示文件的前 10 行&#xff1a; 显示文件的前 5 行&#xff1a; 显示文件的前 100 个字节&#xff1a; 不显示文件名的标题信息&#xff1a; 显示文件名的标题信息&#xff1a; tail 命令&…

vscode使用npm安装element-UI并添加router路由

npm安装vue&#xff0c;添加淘宝镜像-CSDN博客 elementUI安装与配置 安装可以看我上一篇文章 vscode控制台输入指令 npm i element-ui -S 安装完成后在目录结构打开下图文件 可以看到多了一行elementui就代表安装成功了 下面是项目常用的结构 安装完成后需要启用elementU…

什么是API网关代理?

带有API网关的代理服务显着增强了用户体验和性能。特别是对于那些使用需要频繁创建和轮换代理的工具的人来说&#xff0c;使用 API 可以节省大量时间并提高效率。 了解API API&#xff08;即应用程序编程接口&#xff09;充当服务提供商和用户之间的连接网关。通过 API 连接&a…

R语言频率分布直方图绘制教程

本篇笔记分享R语言绘制直方图的方法&#xff0c;通过多种展示风格对数据进行可视化&#xff0c;主要用到ggplot、ggpubr等包。 什么是直方图&#xff1f; 直方图(Histogram)&#xff0c;又称质量分布图&#xff0c;是一种统计报告图&#xff0c;由一系列高度不等的柱子表示数…