纯干货 | Dolphinscheduler Master模块源码剖析

此前我们曾用万字长文解释了Apache DolphinScheduler的Worker模块源码,今天,我们再来一起看看Master模块源码的原理。

Master Slot计算

file

核心代码逻辑
org.apache.dolphinscheduler.server.master.registry.MasterSlotManager.SlotChangeListener#notify

public void notify(Map<String, MasterHeartBeat> masterNodeInfo) {List<Server> serverList = masterNodeInfo.values().stream()// TODO 这里其实就是过滤掉buzy的master节点.filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.BUSY)).map(this::convertHeartBeatToServer).collect(Collectors.toList());// TODO 同步master节点syncMasterNodes(serverList);
}

计算 totalSlot和currentSlot

private void syncMasterNodes(List<Server> masterNodes) {slotLock.lock();try {this.masterPriorityQueue.clear();// TODO 这里会把所有的master节点都放入到masterPriorityQueue中,比如说 [192.168.220.1:12345,192.168.220.2:12345]this.masterPriorityQueue.putAll(masterNodes);// TODO 就是获取到本地ip的在队列中的位置int tempCurrentSlot = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());// TODO 所有节点数量int tempTotalSlot = masterNodes.size();// TODO 正常情况下不会小于0if (tempCurrentSlot < 0) {totalSlot = 0;currentSlot = 0;log.warn("Current master is not in active master list");} else if (tempCurrentSlot != currentSlot || tempTotalSlot != totalSlot) {// TODO 这里其实就是记录的是比如说一共有两个slot,我的slot是0或者1totalSlot = tempTotalSlot;currentSlot = tempCurrentSlot;log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);}} finally {slotLock.unlock();}
}

this.masterPriorityQueue.putAll(masterNodes); 会计算索引

public void putAll(Collection<Server> serverList) {for (Server server : serverList) {this.queue.put(server);}// TODO 这里更新了hostIndexMap,存放的是 <host:port> -> 索引refreshMasterList();
}private void refreshMasterList() {hostIndexMap.clear();Iterator<Server> iterator = queue.iterator();int index = 0;while (iterator.hasNext()) {Server server = iterator.next();String addr = NetUtils.getAddr(server.getHost(), server.getPort());hostIndexMap.put(addr, index);index += 1;}}

Master消费Command生成流程实例

file

command最终的获取逻辑:

比如说两个Master节点 : 
masterCount=2 thisMasterSlot=0  master1
masterCount=2 thisMasterSlot=1  master2command中的数据如下 :
1 master2
2 master1
3 master2
4 master1select *from t_ds_commandwhere id % #{masterCount} = #{thisMasterSlot}order by process_instance_priority, id asclimit #{limit}

有没有感到疑惑,就是如果一个master更新到的最新的,一个没有更新到,怎么办?

比如说,master1节点是这样的
1  master2
2  master1
3  master2
4  master1比如说,master2节点是这样的,是不是发现master2节点都是他的,都可以拉取消费?那就导致重复消费,比如说1这个command
1 master1
2 master1
3 master1
4 master1

org.apache.dolphinscheduler.service.process.ProcessServiceImpl#handleCommand

@Transactional
public @Nullable ProcessInstance handleCommand(String host,Command command) throws CronParseException, CodeGenerateException {// TODO 创建流程实例ProcessInstance processInstance = constructProcessInstance(command, host);// cannot construct process instance, return nullif (processInstance == null) {log.error("scan command, command parameter is error: {}", command);commandService.moveToErrorCommand(command, "process instance is null");return null;}processInstance.setCommandType(command.getCommandType());processInstance.addHistoryCmd(command.getCommandType());processInstance.setTestFlag(command.getTestFlag());// if the processDefinition is serialProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(),processInstance.getProcessDefinitionVersion());// TODO 是否是串行执行if (processDefinition.getExecutionType().typeIsSerial()) {saveSerialProcess(processInstance, processDefinition);if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) {setSubProcessParam(processInstance);triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());deleteCommandWithCheck(command.getId());// todo: this is a bad design to return null here, whether trigger the taskreturn null;}} else {// TODO 并行执行processInstanceDao.upsertProcessInstance(processInstance);}// TODO 这里其实还会向triggerRelation表中插入一条数据,是流程实例和triggerCode的关系triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());// TODO 设置子流程参数setSubProcessParam(processInstance);// TODO 删除commanddeleteCommandWithCheck(command.getId());return processInstance;
}

注意:这个方法是加@Transactional的,所以说创建流程实例和删除Command是在一个事物里面的,如果不同的Master消费到同一个Command。肯定会有一个删除Command失败,这时会抛出一个异常,这样就会让数据库进行回滚。

工作流启动流程

file

DAG切分 & 任务提交

file

Master事件状态流转

file
图连接 : Master事件状态流转

TaskEventService组件中的TaskEventDispatchThread(线程)和TaskEventHandlerThread(线程)解析

file

其实就是Master自己状态(DISPATCH)和Worker汇报上来的状态(RUNNING、UPDATE_PID、RESULT)都会放入到eventQueue,TaskEventDispatchThread(线程)会阻塞的方式进行获取,然后放入到对应的TaskExecuteRunnable中(注意 : 不执行的),只有通过TaskEventHandlerThread(线程)才会使用TaskExecuteThreadPool线程进行TaskExecuteRunnable的提交。

转载自Journey
原文链接:https://segmentfault.com/a/1190000044992842

本文由 白鲸开源 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/908629.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ZKmall模版商城前后端分离秒级响应架构深度解析

在当今的电商领域,用户体验和响应速度已成为决定平台竞争力的关键因素。ZKmall模版商城,作为一款高性能的电商平台解决方案,通过采用前后端分离架构,实现了秒级响应,为用户带来了极致的购物体验。本文将深度解析ZKmall模版商城的前后端分离秒级响应架构,探讨其背后的技术…

UESTC 23-th ACM-ICPC 初赛 Q. 校车

这东西其实蛮像最小斯坦纳树,不过我们通过状压DP的思想来讲一讲这个题。 形式化题意:考虑一共有 \(n\) 个点,\(m\) 条无向边,你需要从图中选出至多 \(K\) 条闭合路径,使得所有 \(w\) 个关键点都被覆盖,使得最长路径最小。 容易发现 \(n \le 500\),因此我们可以通过 Floy…

360度全景环视(AVM Around View Monitor)简介

AVM(Around View Monitor),中文:全景环视系统。 在自动驾驶领域,AVM属于自动泊车系统的一部分,是一种实用性极高、可大幅提升用户体验和驾驶安全性的功能。利用车身四周摄像头,拼接出以车身为中心的360环视画面,直接鸟瞰车身周边情况。支持多种车型,具有4路、6路、或前…

htb Code

先快速扫描 rustscan -a 10.10.11.62PORT STATE SERVICE REASON 22/tcp open ssh syn-ack ttl 63 5000/tcp open upnp syn-ack ttl 63 发现有一个5000端口 nmap扫描 nmap -sC -sV -p 5000 -v -Pn -T4 10.10.11.62发现是一个python的命令执行的环境 接下来进行沙…

敏捷开发的终极形态?2025年必试的5款集成混沌工程的项目管理黑科技

随着企业数字化转型的加速,敏捷开发已从单纯的高效迭代演变为融合混沌工程的“韧性敏捷”模式。混沌工程通过主动注入系统故障,验证系统在异常条件下的稳定性,成为保障敏捷开发持续交付的关键技术。据Gartner预测,到2025年,70%的企业将在敏捷流程中集成混沌工程以增强系统…

SQL SERVER日常运维巡检系列之-实例级参数

前言做好日常巡检是数据库管理和维护的重要步骤,而且需要对每次巡检日期、结果进行登记,同时可能需要出一份巡检报告。本系列旨在解决一些常见的困扰: 不知道巡检哪些东西不知道怎么样便捷体检机器太多体检麻烦生成报告困难,无法直观呈现结果 实例的参数对系统性能和稳定…

How Memory is Implemented in LLM-based Agents?

原文链接:https://medium.com/@parklize/how-memory-is-implemented-in-llm-based-agents-f08e7b6662ff在之前的文章中,我们讨论了大语言模型(LLM)的局限性以及LLM与基于LLM的智能体之间的关系。 智能体为LLM带来的关键增强功能之一是记忆能力,这有助于克服LLM的上下文长度…

运行MBConicHulls教程

为了运行MBConicHulls(a Mathematica package to evaluate N-fold MB integrals)需要安装啥 目录环境Mathematica 13.0.1 Linux版 ✅MultivariateResidues.m (计算多变量余数)✅TOPCOM(一个用于计算三角剖分和相关结构的软件包) ✅前置安装需求步骤-已跑通安装完了测试一下…

信创概念股投资回报率最高的五家公司

信创产业作为近年来备受瞩目的领域,其发展态势对投资者具有极大的吸引力。探寻信创概念股中投资回报率最高的五家公司,对于投资者来说至关重要。这不仅能为他们的资金找到更具潜力的投向,也有助于把握行业发展的脉搏,在复杂多变的市场中获取丰厚的回报。随着信息技术的飞速…

信创国产化背景下人才培养的四大策略

信创国产化是当前我国科技发展的重要战略方向,旨在实现信息技术领域的自主可控,减少对国外技术的依赖,保障国家信息安全。在这一背景下,人才的培养显得尤为关键。信创国产化涉及众多领域和技术,从芯片、操作系统、数据库到各类应用软件,每一个环节都需要专业人才的支撑。…

信创行业政策支持与市场机遇探讨

信创,即信息技术应用创新产业,旨在实现信息技术领域的自主可控,保障国家信息安全。近年来,随着全球政治经济形势的变化以及科技竞争的日益激烈,信创行业迎来了前所未有的发展契机。政策的大力支持为信创行业筑牢了坚实的发展根基,而广阔的市场则为其提供了无限的增长可能…