ACK One Argo工作流:实现动态 Fan-out/Fan-in 任务编排

作者:庄宇

什么是 Fan-out Fan-in

在工作流编排过程中,为了加快大任务处理的效率,可以使用 Fan-out Fan-in 任务编排,将大任务分解成小任务,然后并行运行小任务,最后聚合结果。

图片

由上图,可以使用 DAG(有向无环图)编排 Fan-out Fan-in 任务,子任务的拆分方式分为静态和动态,分别对应静态 DAG 和动态 DAG。动态 DAG Fan-out Fan-in 也可以理解为 MapReduce。每个子任务为 Map,最后聚合结果为 Reduce。

静态 DAG: 拆分的子任务分类是固定的,例如:在数据收集场景中,同时收集数据库 1 和数据库 2 中的数据,最后聚合结果。

动态 DAG: 拆分的子任务分类是动态的,取决于前一个任务的输出结果,例如:在数据处理场景中,任务 A 可以扫描待处理的数据集,为每个子数据集(例如:一个子目录)启动子任务 Bn 处理,当所有子任务 Bn 运行结束后,在子任务 C 中聚合结果,具体启动多少个子任务 B 取决由任务 A 的输出结果。根据实际的业务场景,可以在任务 A 中自定义子任务的拆分规则。

ACK One 分布式工作流 Argo 集群

在实际的业务场景中,为了加快大任务的执行,提升效率,往往需要将一个大任务分解成数千个子任务,为了保证数千个子任务的同时运行,需要调度数万核的 CPU 资源,叠加多任务需要竞争资源,一般 IDC 的离线任务集群难以满足需求。例如:自动驾驶仿真任务,修改算法后的回归测试,需要对所有驾驶场景仿真,每个小驾驶场景的仿真可以由一个子任务运行,开发团队为加快迭代速度,要求所有子场景测试并行执行。

如果您在数据处理,仿真计算和科学计算等场景中,需要使用动态 DAG 的方式编排任务,或者同时需要调度数万核的 CPU 资源加快任务运行,您可以使用阿里云 ACK One 分布式工作流 Argo 集群 [ 1]

ACK One 分布式工作流 Argo 集群,产品化托管 Argo Workflow [ 2] ,提供售后支持,支持动态 DAG Fan-out Fan-in 任务编排,支持按需调度云上算力,利用云上弹性,调度数万核 CPU 资源并行运行大规模子任务,减少运行时间,运行完成后及时回收资源节省成本。支持数据处理,机器学习,仿真计算,科学计算,CICD 等业务场景。

Argo Workflow 是开源 CNCF 毕业项目,聚焦云原生领域下的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,并使用 Kubernetes Pod 在集群中调度运行。

本文介绍使用 Argo Workflow 编排动态 DAG Fan-out Fan-in 任务。

Argo Workflow 编排 Fan-out Fan-in 任务

我们将构建一个动态 DAG Fan-out Fan-in 工作流,读取阿里云 OSS 对象存储中的一个大日志文件,并将其拆分为多个小文件(split),启动多个子任务分别计算每个小文件中的关键词数量(count),最后聚合结果(merge)。

  1. 创建分布式工作流 Argo 集群 [ 3]

  2. 挂载阿里云 OSS 存储卷,工作流可以像操作本地文件一样,操作阿里云 OSS 上的文件。参考:工作流使用存储卷 [ 4]

  3. 使用以下工作流 YAML 创建一个工作流,参考:创建工作流 [ 5] 。具体说明参见注释。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:generateName: dynamic-dag-map-reduce-
spec:entrypoint: main# claim a OSS PVC, workflow can read/write file in OSS through PVC. volumes:- name: workdirpersistentVolumeClaim:claimName: pvc-oss# how many tasks to split, default is 5.arguments:parameters:- name: numPartsvalue: "5"templates:- name: main# DAG definition.dag:tasks:# split log files to several small files, based on numParts.- name: splittemplate: splitarguments:parameters:- name: numPartsvalue: "{{workflow.parameters.numParts}}"# multiple map task to count words in each small file.- name: maptemplate: maparguments:parameters:- name: partIdvalue: '{{item}}'depends: "split"# run as a loop, partId from split task json outputs.withParam: '{{tasks.split.outputs.result}}'- name: reducetemplate: reducearguments:parameters:- name: numPartsvalue: "{{workflow.parameters.numParts}}"depends: "map"# The `split` task split the big log file to several small files. Each file has a unique ID (partId).# Finally, it dumps a list of partId to stdout as output parameters- name: splitinputs:parameters:- name: numPartscontainer:image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-countcommand: [python]args: ["split.py"]env:- name: NUM_PARTSvalue: "{{inputs.parameters.numParts}}"volumeMounts:- name: workdirmountPath: /mnt/vol# One `map` per partID is started. Finds its own "part file" and processes it.- name: mapinputs:parameters:- name: partIdcontainer:image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-countcommand: [python]args: ["count.py"]env:- name: PART_IDvalue: "{{inputs.parameters.partId}}"volumeMounts:- name: workdirmountPath: /mnt/vol# The `reduce` task takes the "results directory" and returns a single result.- name: reduceinputs:parameters:- name: numPartscontainer:image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-countcommand: [python]args: ["merge.py"]env:- name: NUM_PARTSvalue: "{{inputs.parameters.numParts}}"volumeMounts:- name: workdirmountPath: /mnt/voloutputs:artifacts:- name: resultpath: /mnt/vol/result.json
  1. 动态 DAG 实现

1)split 任务在拆分大文件后,会在标准输出中输出一个 json 字符串,包含:子任务要处理的 partId,例如:

["0", "1", "2", "3", "4"]

2)map 任务使用 withParam 引用 split 任务的输出,并解析 json 字符串获得所有 {{item}},并使用每个 {{item}} 作为输入参数启动多个 map 任务。

          - name: maptemplate: maparguments:parameters:- name: partIdvalue: '{{item}}'depends: "split"withParam: '{{tasks.split.outputs.result}}'

更多定义方式,请参考开源 Argo Workflow 文档 [ 6]

  1. 工作流运行后,通过分布式工作流 Argo 集群控制台 [ 7] 查看任务 DAG 流程与运行结果。

图片

  1. 阿里云 OSS 文件列表,log-count-data.txt 为输入日志文件,split-output,cout-output 中间结果目录,result.json 为最终结果文件。

图片

  1. 示例中的源代码可以参考:AliyunContainerService GitHub argo-workflow-examples [ 8]

总结

Argo Workflow 是开源 CNCF 毕业项目,聚焦云原生领域下的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,并使用 Kubernetes Pod 在集群中调度运行。

阿里云 ACK One 分布式工作流 Argo 集群,产品化托管 Argo Workflow,提供售后支持,加固控制面实现数万子任务(Pod)稳定高效调度运行,数据面支持无服务器方式调度云上大规模算力,无需运维集群或者节点,支持按需调度云上算力,利用云上弹性,调度数万核 CPU 资源并行运行大规模子任务,减少运行时间,支持数据处理,机器学习,仿真计算,科学计算,CICD 等业务场景。

欢迎加入 ACK One 客户交流钉钉群与我们进行交流。(钉钉群号:35688562

相关链接:

[1] 阿里云 ACK One 分布式工作流 Argo 集群

https://help.aliyun.com/zh/ack/overview-12

[2] Argo Workflow

https://argo-workflows.readthedocs.io/en/latest/

[3] 创建分布式工作流 Argo 集群

https://help.aliyun.com/zh/ack/create-a-workflow-cluster

[4] 工作流使用存储卷

https://help.aliyun.com/zh/ack/use-volumes

[5] 创建工作流

https://help.aliyun.com/zh/ack/create-a-workflow

[6] 开源 Argo Workflow 文档

https://argo-workflows.readthedocs.io/en/latest/walk-through/loops/

[7] 分布式工作流 Argo 集群控制台

https://account.aliyun.com/login/login.htm?oauth_callback=https%3A%2F%2Fcs.console.aliyun.com%2Fone%3Fspm%3Da2c4g.11186623.0.0.7e2f1428OwzMip#/argowf/cluster/detail

[8] AliyunContainerService GitHub argo-workflow-examples

https://github.com/AliyunContainerService/argo-workflow-examples/tree/main/log-count

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/457506.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GeoServer 2.11.1升级解决Eclipse Jetty 的一系列安全漏洞问题

Eclipse Jetty 资源管理错误漏洞(CVE-2021-28165) Eclipse Jetty HTTP请求走私漏洞(CVE-2017-7656) Eclipse Jetty HTTP请求走私漏洞(CVE-2017-7657) Eclipse Jetty HTTP请求走私漏洞(CVE-2017-7658) Jetty 信息泄露漏洞(CVE-2017-9735) Eclipse Jetty 安全漏洞(CVE-2022-20…

C++之std::tuple(一) : 使用精讲(全)

相关系列文章 C之std::tuple(一) : 使用精讲(全) C三剑客之std::variant(一) : 使用 C三剑客之std::variant(二):深入剖析 深入理解可变参数(va_list、std::initializer_list和可变参数模版) std::apply源码分析 目录 1.简介 2.std::ignore介绍 3.创建元组 3.1.…

单片机学习笔记---串口通信(2)

目录 串口内部结构 串口相关寄存器 串口控制寄存器SCON SM0和SM1 SM2 REN TB8和RB8 TI和RI 电源控制寄存器PCON SMOD 串口工作方式 方式0 方式0输出: 方式0输入 方式1 方式1输出。 方式1输入 方式2和方式3 方式2和方式3输出: 方式2和…

【Yi-VL-34B】(5):使用3个3090显卡24G版本,运行Yi-VL-34B模型,支持命令行和web界面方式,理解图片的内容转换成文字

1,视频地址 https://www.bilibili.com/video/BV1BB421z7oA/ 2,关于Yi-VL-34B https://www.modelscope.cn/models/01ai/Yi-VL-34B/summary 易视觉语言(Yi-VL)模型是易大型语言模型(LLM)系列的开源多模态…

【C++】构造函数、初始化列表,析构函数,拷贝构造函数,运算符重载

注:本博客图片来源于学习笔记: 学习笔记https://gitee.com/box-he-he/learning-notes 完整思维导图请前往该博主码云下载。 目录 注:本博客图片来源于学习笔记: 学习笔记https://gitee.com/box-he-he/learning-notes 完整思维导图请前往该博主码云下载…

Maven - 编译报错:程序包 XXX 不存在(多模块项目)

问题描述 编译报错&#xff1a;程序包 XXX 不存在&#xff08;多模块项目&#xff09; 原因分析 检查依赖模块 pom 文件&#xff0c;看是不是引入了如下插件 <plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-pl…

[NOIP2017 提高组] 宝藏

[NOIP2017 提高组] 宝藏 题目背景 NOIP2017 D2T2 题目描述 参与考古挖掘的小明得到了一份藏宝图&#xff0c;藏宝图上标出了 n n n 个深埋在地下的宝藏屋&#xff0c; 也给出了这 n n n 个宝藏屋之间可供开发的 m m m 条道路和它们的长度。 小明决心亲自前往挖掘所有宝…

dolphinscheduler海豚调度(一)简介快速体验

1、简介 Apache DolphinScheduler 是一个分布式易扩展的可视化DAG工作流任务调度开源系统。适用于企业级场景&#xff0c;提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方案。 Apache DolphinScheduler 旨在解决复杂的大数据任务依赖关系&#xff0c;并为应…

“小手艺”有“大情怀”, 《青春手艺人》赋能乡村振兴,传承新时代文化

文化传承发展要坚持“守正创新”&#xff0c;以守正创新的正气和锐气&#xff0c;赓续历史文脉、谱写当代华章。中央广播电视总台农业农村节目中心推出的聚焦年轻手艺人故事的微纪录片《青春手艺人》&#xff0c;为守正创新的文化传承增添了新的鲜活的青春故事。节目积极响应二…

shell脚本基础语法(.sh ./ sh bash source shell)

Linux 之 Shell 脚本基础语法 0. 学习一门语言的顺序 1. Shell 编程概述 1.1 Shell 名词解释 在 Linux 操作系统中&#xff0c;Shell 是一个命令行解释器&#xff0c;它为用户提供了一个与操作系统内核交互的界面。用户可以通过 Shell 输入命令&#xff0c;然后 Shell 将这些…

vue项目开发vscode配置

配置代码片段 步骤如下&#xff1a; 文件->首选项->配置用户代码片段新增全局代码片段起全局代码片段文件名“xxx.code-snippets” 这里以配置vue2初始代码片段为例&#xff0c;配置具体代码片段 {"name": "vue-sph","version": "…

零基础学编程从哪里入手,在学习中可以线上会议答疑解惑

一、前言 零基础学编程可以先从容易学的语言入手&#xff0c;比如中文编程&#xff0c;然后再学其他编程语言则会比较轻松&#xff0c;初步掌握编程思路。很多IT人士一般学2到3种编程语言。 今天给大家分享的中文编程开发语言工具资料如下&#xff1a; 编程入门视频教程链接…