深入解析 DolphinScheduler 任务调度、拆分与执行全流程

Apache DolphinScheduler介绍

Apache DolphinScheduler 是一个分布式易扩展的可视化DAG工作流任务调度开源系统。适用于企业级场景,提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方案。

Dag背景知识

摘录了一下Dag的offical定义

A graph is formed by vertices and by edges connecting pairs of vertices, where the
vertices can be any kind of object that is connected in pairs by edges.
In the case of a directed graph, each edge has an orientation, from one vertex to another vertex. A path in a directed graph is a sequence of edges having the property that the ending vertex of each
edge in the sequence is the same as the starting vertex of the next edge in the sequence; a path forms a cycle if the starting vertex of its first edge equals the ending vertex of its last edge.

A directed acyclic graph is a directed graph that has no cycles.[1][2][3]

A vertex v of a directed graph is said to be reachable from another
vertex u when there exists a path that starts at u and ends at v.
As a special case, every vertex is considered to be reachable from itself (by a path with zero edges). If a vertex can reach itself via a nontrivial path (a path with one or more edges), then that path is a cycle, so another way to define directed acyclic graphs is that they are the graphs in which no vertex can reach itself via a nontrivial path.

在offical的定义中,有两对象的集合,集合中的元素是

  1. vertex
    一个实体或者元素,可以是任何抽象的object
  2. edge
    一条有方向直线,包含两个vertex,分别扮演起点和终点
  • Dag约束
  1. 在Dag中,一个edge(a,b)的终点可以作为另一个edge(b,c)的起点,这个链路中所有的vertex都是可到达的, c是从a可达的。
  2. 在Dag中允许vertex不存在于任何一个edge中,这个节点可以从自己到达自己(一个孤岛,不和其他vertex有任何联系)
  3. 如果一个vertex可以从自己到达自己,但是中间经过了其他的vertex,那么这就存在一个环circle
  4. 在Dag中没有环

在DolphinScheduler中表示Dag的数据结构为

public class DAG<Node, NodeInfo, EdgeInfo> {private final ReadWriteLock lock = new ReentrantReadWriteLock();/*** node map, key is node, value is node information*/private final Map<Node, NodeInfo> nodesMap;/*** edge map. key is node of origin;value is Map with key for destination node and value for edge*/private final Map<Node, Map<Node, EdgeInfo>> edgesMap;/*** reversed edge set,key is node of destination, value is Map with key for origin node and value for edge*/private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
}

其中

  • Node表示任务的id
  • NodeInfo表示任务的详细信息
  • EdgeInfo包含任务id和依赖任务id

数仓建设任务和任务依赖

在企业数仓建设中,普遍的做法是进行数据分层(引用https://juejin.cn/post/6969874734355841031)

file

在生产环境,由于分层的需要,业务逻辑分布广泛,数据存储类型多样,这就造成了数仓建设的任务多,任务之间依赖复杂,dag就成了最佳的任务依赖和调度的存储结构。在Dag结构中每个节点表示一个具体的调度任务,任务之间的连线表示依赖关系,针对Dag结构化数据的遍历过程,就是对数仓任务的执行过程。

一个简单的数仓依赖任务关系(数仓建设中会有很多任务依赖关系和更复杂的任务依赖关系)

file

DolphinScheduler系统角色拆分

Apache DolphinScheduler核心角色包括MasterServer和WorkerServer,这遵循模块化设计,master和worker专注于自己本身的角色和任务,模块遵循高内聚低耦合的设计,大大提高了系统的稳定性和可扩展性,同时也有利于并行开发,缩短系统的研发时间,提高系统的健壮性。

MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。 MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。

WorkerServer主要负责任务的执行和提供日志服务。 WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。

DolphinScheduler任务调度流程

参考官网,DolphinScheduler核心任务任务执行流程如下
file

鉴于任务调度的复杂性,一个大的流程可以划分为小的流程,在主线流程之外还附加了支线流程,下面对执行调度流程拆分进行分析一下,这样更容易理解。

file

Command分发流程

处理方式

异步,分布式master server节点。

生产者

api-server将用户的运行工作流http请求封装成command数据,insert到t_ds_command表中
一个启动工作流实例的command样例

{"commandType": "START_PROCESS","processDefinitionCode": 14285512555584,"executorId": 1,"commandParam": "{}","taskDependType": "TASK_POST","failureStrategy": "CONTINUE","warningType": "NONE","startTime": 1723444881372,"processInstancePriority": "MEDIUM","updateTime": 1723444881372,"workerGroup": "default","tenantCode": "default","environmentCode": -1,"dryRun": 0,"processInstanceId": 0,"processDefinitionVersion": 1,"testFlag": 0
}

消费者

master server中的MasterSchedulerBootstrap loop程序, MasterSchedulerBootstrap使用zk分配到自己的slot,从t_ds_command表中select属于slot的command列表处理
查询语句

<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">select *from t_ds_commandwhere id % #{masterCount} = #{thisMasterSlot}order by process_instance_priority, id asclimit #{limit}
</select>

MasterSchedulerBootstrap loop轮训查到待处理的command任务,将command任务和master host生成ProcessInstance,将ProcessInstance对象插入到t_ds_process_instance表中,
同时生成包含运行所需要的上下文信息的可执行任务workflowExecuteRunnable
workflowExecuteRunnablecache到本地cache processInstanceExecCacheManager,同时生产将ProcessInstanceWorkflowEventType.START_WORKFLOW生产到workflowEventQueue队列中。

Dag遍历执行任务

Master本地cache缓冲

cache实现ProcessInstanceExecCacheManagerImpl,提供如下核心功能

public interface ProcessInstanceExecCacheManager {/*** get WorkflowExecuteThread by process instance id** @param processInstanceId processInstanceId* @return WorkflowExecuteThread*/WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId);/*** judge the process instance does it exist** @param processInstanceId processInstanceId* @return true - if process instance id exists in cache*/boolean contains(int processInstanceId);/*** remove cache by process instance id** @param processInstanceId processInstanceId*/void removeByProcessInstanceId(int processInstanceId);/*** cache** @param processInstanceId     processInstanceId* @param workflowExecuteThread if it is null, will not be cached*/void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread);/*** get all WorkflowExecuteThread from cache** @return all WorkflowExecuteThread in cache*/Collection<WorkflowExecuteRunnable> getAll();void clearCache();
}

生产者

MasterSchedulerBootstrap loop将command transform to可以运行的任务,任务对象中包含了要处理的所有上下文信息

消费者

EventExecuteService根据dag信息,拿到第一批没有任何依赖的TaskInstance添加到待执行任务队列standByTaskInstancePriorityQueue中, standByTaskInstancePriorityQueue按照优先级先后顺序执行,处理任务状态,将待执行任务提交到globalTaskDispatchWaitingQueue队列中。

可执行任务Dispatch

Master进城内优先级队列

到了globalTaskDispatchWaitingQueue中,已经是可执行任务的最小单元了

生产者

EventExecuteService根据parent node,对Dag进行广度优先遍历,提交任务到globalTaskDispatchWaitingQueue队列中。

消费者

消费者为GlobalTaskDispatchWaitingQueueLooperGlobalTaskDispatchWaitingQueueLooper消费待dispatch的任务,根据任务类型执行任务调度,对任务的调度是走的rpc接口,目前来看根据任务类型分为两种:

  1. MasterTaskDispatcher
  2. WorkerTaskDispatcher

对于WorkerTaskDispatcher来说,rpc server收到rpc request之后提交任务到了workerTaskExecutorThreadPool执行。所以这是一个异步处理任务的过程,不至于让master server hang在这个地方。对于任务的执行进度,会在关键节点进行回调通知。

任务执行状态回调通知

Worker被dispatch任务,异步提交到线程池中之行,在任务异步执行的节点,调用rpc接口通知master任务的状态。

生产者

Worker异步执行节点,对于任务执行状态回调包括四个

  1. TaskExecutionStatus.FAILURE 执行抛出异常,运行失败
  2. TaskExecutionStatus.RUNNING_EXECUTION 开始执行
  3. TaskExecutionStatus.KILL 被杀死
  4. TaskExecutionStatus.SUCCESS 执行成功

备注:在官方的事件流程中Ack的方向搞错了,Ack不是worker通知给master,而是master通知workerer,我的这个事件状态的处理结束了。

经过校正一下,比较概括性的总结,整体的流程大致如下图

file

消费者

master节点ITaskInstanceExecutionEventListener服务,服务接受rpc请求,并将任务添加到TaskEventService eventQueue队列中。

任务状态处理

缓冲队列

master节点TaskEventService eventQueue队列。

生产者

这个生产者可能会很多

  1. api-server用户行为
  2. master节点任务调度
  3. work节点任务执行
  4. master任务执行

消费者

为master节点的TaskInstanceListenerImpl服务,TaskInstanceListenerImplTaskEvent transform to TaskExecuteRunnable,并且提交到线程池执行taskExecuteThreadMap待执行,在线程池中修改任务的执行状态。

本文由 白鲸开源 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/811091.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

单片机复位详解

单片机复位详解 单片机复位介绍 单片机复位是确保单片机能够稳定、正确地从头开始执行程序的重要机制。复位电路的作用是使单片机的状态处于初始化状态,包括让时钟处于稳定状态、各种寄存器和端口处于初始化状态等。 单片机复位分为高电平复位和低电平复位两种方式。 基本上所…

Camstar 电子套件基础数据导入导出Export/Import

前提准备:你的共享目录CamstarUploads弄好了,参考https://www.cnblogs.com/CarryYou-lky/p/16133849.html 😘宝子:除非不再醒来,除非太阳不再升起,不然都请你好好生活,挣扎着前进,开心的笑。(●◡●)

LearnFast.ai:用AI高效解决数学和物理难题的体验分享

最近,我在使用一款名叫 LearnFast.ai 的工具,帮助我快速解决数学和物理问题。作为一个基于 GPT-4o API 的解题工具,它的实时响应和多格式支持让我在学习中事半功倍。无论是高中生、大学生还是自学者,这款工具都可以成为解决复杂问题的好帮手。摘要:最近,我在使用一款名叫…

2024-9-28

新闻周刊2024.9.28 导入:建立"定点医药机构相干人员"实行驾照式经分 传统监管机构将从医药机构进一步精确到人的进步,让少部分违规人员收到更加严厉的处罚防止医保滥用,让违规者付出应有代价,确保医保资金真正惠民,让所有人都共同收益. 视点:秋收"惠农"时农…

实验作业2

任务1 源代码#include <stdio.h> #include <stdlib.h> #include <time.h>#define N 5 #define N1 397 #define N2 476 #define N3 21int main() {int cnt;int random_major, random_no;srand(time(NULL)); // 以当前系统时间作为随机种子cnt = 0;while(…

# Cocos 2 使用 webview 嵌入页面,摄像头调用没权限问题

Cocos 2 使用 webview 嵌入页面,摄像头调用没权限问题嗯,这么说呢,这篇博文看自己的实际需求哈,标题写的可能不是很准确。 我这边呢,是遇到这样一个功能,就是有一个服务,他是的页面呢,是打开电脑的摄像头,需要在cocos 程序里面呢,展示摄像头的实时画面。看上去挺简单…

Leetcode 864. 获取所有钥匙的最短路径

1.题目基本信息 1.1.题目描述 给定一个二维网格 grid ,其中:‘.’ 代表一个空房间 ‘#’ 代表一堵墙 ‘@’ 是起点 小写字母代表钥匙 大写字母代表锁我们从起点开始出发,一次移动是指向四个基本方向之一行走一个单位空间。我们不能在网格外面行走,也无法穿过一堵墙。如果途…

Hello-Java-Sec 项目 (代码审计)

一、项目背景: Hello-Java-Sec项目为 Github中 一个面向安全开发的 Java漏洞代码审计靶场。 靶场地址:https://github.com/j3ers3/Hello-Java-Sec 本地使用idea部署即可二、代码审计: 通过阅读代码可知,代码采用 @RequestMapping 注解的方式来处理 HTTP不同方法的请求,故…

【原创】微信自动回复工具(下篇)

全文 离第一篇文章已经不知不觉过去3年多了,这段时间有空重新重构了一套消息回传模式,工具介绍官网: → → http://message.fuyue.xyz/ ← ← 视频演示: 观看视频 功能列表 本微信助手工具目前已经实现如下功能:接收微信好友消息 接收微信群组消息 接收系统消息(添加好友…

基于模糊神经网络的移动机器人路径规划matlab仿真

1.程序功能描述基于模糊神经网络的移动机器人路径规划 1.环境地图中的障碍物为静态、未知障碍物,可以随机设置。(一般设置5~7个,为计算简便设置成规则性状的障碍物) 2.机器人的行进方向为X轴的正方向,X轴逆时针旋转90即为Y轴。两驱动轮之间的距离为50cm,驱动轮的直径为30…

实验二

任务一 源代码1 #include <stdio.h>2 #include <time.h>3 4 #define N 55 #define N1 3976 #define N2 4767 #define N3 218 int main(){9 10 int random_major,random_no; 11 int cnt; 12 srand(time(NULL)); 13 14 cnt=0; 15 whil…

记录一次本地安装AI ollama大模型数据对话 的经历

浏览器打开 Ollama官网 下载对应的版本,我这里下载的 是对应 windows的版本,下载后直接运行安装安装完成后 打开 dos控制台,win+r,cmd那个,输入ollama 如果显示如下截图内容,就说明安装成功了,接下来就是下载 具体的 大数据库了 安装大模型前,建议先修改环境变量,因…