巧用输出变量,提升Dolphinscheduler工作流灵活性和可维护性

输出变量是 DolphinScheduler 任务调度中实现数据流动与任务协作的核心机制,通过显式定义和传递参数,解决了跨节点数据共享、优先级冲突等问题,同时支持复杂流程编排(如子流程、条件分支)。合理使用输出变量能显著提升工作流的灵活性和可维护性。本文将介绍 DolphinScheduler 中重要的输出变量及其使用方法,

1、Shell 脚本中,单引号 (')、双引号 (") 和反引号 (`)使用

在 Shell 脚本中,单引号 (')、双引号 (") 和反引号 (`) 各自有不同的作用和用法。理解它们的区别和用法对于编写和调试 Shell 脚本非常重要

1.1、单引号 (')

  • 用途:完全引用,用于保护字符串中的所有字符,不进行变量替换或命令替换
  • 特性:引号内的任何字符都将原样输出,不会进行任何解释
VAR="world"
echo 'Hello, $VAR'  # 输出:Hello, $VAR

1.2、双引号 (")

  • 用途:部分引用,用于保护字符串中的大部分字符,但允许变量替换和命令替换
  • 特性:引号内的变量和命令会被解释,其余字符原样输出
VAR="world"
echo "Hello, $VAR"  # 输出:Hello, world

1.3、反引号 (`)

  • 用途:命令替换,用于执行引号内的命令,并将命令的输出作为结果返回
  • 特性:引号内的命令会被执行,结果会替换反引号中的内容
  • 注意:反引号是旧的命令替换方式,更推荐使用 $()
DATE=`date`
echo "Current date and time: $DATE"  # 输出当前日期和时间

1.4、推荐使用 $() 进行命令替换

  • 更现代和可读性更好
  • 可以嵌套使用,而反引号嵌套使用比较困难
DATE=$(date)
echo "Current date and time: $DATE"  # 输出当前日期和时间# 嵌套命令替换示例
OUTER=$(echo "Outer $(echo "Inner")")
echo $OUTER  # 输出:Outer Inner

1.5、使用场景

  • 单引号:当你不希望字符串中的任何内容被解释时使用,例如正则表达式、特殊字符等
  • 双引号:当你希望字符串中包含变量或命令替换时使用
  • 反引号/$():当你需要执行命令并使用其输出时使用

2、Dolphinscheduler中的输出变量和文件传递

2.1、Shell输出变量

  • 2.1.1、流程如下

  • 2.1.2、任务配置
    taskA

echo 'taskA'
echo "#{setValue(linesNum=${lines_num})}"
echo '${setValue(words=20)}'

注意 : 这里${lines_num}其实直接就是通过Worker进行变量进行替换的

taskB

echo 'taskB'
echo ${linesNum}
echo ${words}
  • 2.1.3、结果输出
    主要看taskB的输出
}
[INFO] 2024-07-05 10:09:54.539 +0800 - Success initialized task plugin instance successfully
[INFO] 2024-07-05 10:09:54.539 +0800 - Set taskVarPool: [{"prop":"linesNum","direct":"IN","type":"VARCHAR","value":"100"},{"prop":"words","direct":"IN","type":"VARCHAR","value":"20"}] successfully
[INFO] 2024-07-05 10:09:54.539 +0800 - ***********************************************************************************************
[INFO] 2024-07-05 10:09:54.539 +0800 - *********************************  Execute task instance  *************************************
[INFO] 2024-07-05 10:09:54.539 +0800 - ***********************************************************************************************
[INFO] 2024-07-05 10:09:54.540 +0800 - Final Shell file is: 
[INFO] 2024-07-05 10:09:54.540 +0800 - ****************************** Script Content *****************************************************************
[INFO] 2024-07-05 10:09:54.540 +0800 - #!/bin/bash
BASEDIR=$(cd `dirname $0`; pwd)
cd $BASEDIR
source /etc/profile
export HADOOP_HOME=${HADOOP_HOME:-/home/hadoop-3.3.1}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME=${SPARK_HOME:-/home/spark-3.2.1-bin-hadoop3.2}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/home/hive-3.1.2}
export FLINK_HOME=/home/flink-1.18.1
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export SEATUNNEL_HOME=/opt/software/seatunnel
export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH
echo 'taskB'
echo 100
echo 20
[INFO] 2024-07-05 10:09:54.540 +0800 - ****************************** Script Content *****************************************************************
[INFO] 2024-07-05 10:09:54.540 +0800 - Executing shell command : sudo -u root -i /tmp/dolphinscheduler/exec/process/root/13850571680800/14171397340576_3/1961/1454/1961_1454.sh
[INFO] 2024-07-05 10:09:54.544 +0800 - process start, process id is: 588336
[INFO] 2024-07-05 10:09:56.544 +0800 -  -> taskB10020
[INFO] 2024-07-05 10:09:56.546 +0800 - process has exited. execute path:/tmp/dolphinscheduler/exec/process/root/13850571680800/14171397340576_3/1961/1454, processId:588336 ,exitStatusCode:0 ,processWaitForStatus:true ,processExitValue:0

2.2、Shell文件传递

  • 2.2.1、流程如下

  • 2.2.2、任务配置
    fileUploadTask

echo 'fileUploadTask'mkdir -p data/test1 data/test2
echo "test1 message" >> data/test1/text.txt
echo "test2 message" >> data/test2/text.txttree.

fileDownloadTask

echo 'fileDownloadTask'cat input_dir/test1/text.txt
cat input_dir/test2/text.txt
  • 2.2.3、结果输出
    fileDownloadTask
[INFO] 2024-07-05 11:11:08.160 +0800 - Success initialized task plugin instance successfully
[INFO] 2024-07-05 11:11:08.160 +0800 - Set taskVarPool: [{"prop":"fileUploadTask.file-text","direct":"IN","type":"FILE","value":"DATA_TRANSFER/20240705/14171986797856/2_1962/fileUploadTask_1455_text.txt"},{"prop":"fileUploadTask.dir-data","direct":"IN","type":"FILE","value":"DATA_TRANSFER/20240705/14171986797856/2_1962/fileUploadTask_1455_data_ds_pack.zip"}] successfully
[INFO] 2024-07-05 11:11:08.160 +0800 - ***********************************************************************************************
[INFO] 2024-07-05 11:11:08.160 +0800 - *********************************  Execute task instance  *************************************
[INFO] 2024-07-05 11:11:08.160 +0800 - ***********************************************************************************************
[INFO] 2024-07-05 11:11:08.160 +0800 - Final Shell file is: 
[INFO] 2024-07-05 11:11:08.160 +0800 - ****************************** Script Content *****************************************************************
[INFO] 2024-07-05 11:11:08.160 +0800 - #!/bin/bash
BASEDIR=$(cd `dirname $0`; pwd)
cd $BASEDIR
source /etc/profile
export HADOOP_HOME=${HADOOP_HOME:-/home/hadoop-3.3.1}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME=${SPARK_HOME:-/home/spark-3.2.1-bin-hadoop3.2}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/home/hive-3.1.2}
export FLINK_HOME=/home/flink-1.18.1
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export SEATUNNEL_HOME=/opt/software/seatunnel
export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH
echo 'fileDownloadTask'cat input_dir/test1/text.txt
cat input_dir/test2/text.txt
[INFO] 2024-07-05 11:11:08.161 +0800 - ****************************** Script Content *****************************************************************
[INFO] 2024-07-05 11:11:08.161 +0800 - Executing shell command : sudo -u root -i /tmp/dolphinscheduler/exec/process/root/13850571680800/14171986797856_2/1962/1456/1962_1456.sh
[INFO] 2024-07-05 11:11:08.164 +0800 - process start, process id is: 590323
[INFO] 2024-07-05 11:11:10.164 +0800 -  -> fileDownloadTasktest1 messagetest2 message
[INFO] 2024-07-05 11:11:10.166 +0800 - process has exited. execute path:/tmp/dolphinscheduler/exec/process/root/13850571680800/14171986797856_2/1962/1456, processId:590323 ,exitStatusCode:0 ,processWaitForStatus:true ,processExitValue:0

2.3、SQL任务输出变量

模式从SqlTask任务进行结果的输出,ShellTask对结果使用

  • 2.3.1、流程如下

  • 2.3.2、任务配置
    sqlOutVarTask

select user_name as userNameList from t_ds_user

readOutVarTask

echo 'readOutVarTask'
echo ${userNameList}
  • 2.3.3、结果输出
    readOutVarTask
[INFO] 2024-07-05 11:19:00.294 +0800 - Success initialized task plugin instance successfully
[INFO] 2024-07-05 11:19:00.294 +0800 - Set taskVarPool: [{"prop":"userNameList","direct":"IN","type":"LIST","value":"[\"admin\",\"qiaozhanwei\",\"test\"]"}] successfully
[INFO] 2024-07-05 11:19:00.294 +0800 - ***********************************************************************************************
[INFO] 2024-07-05 11:19:00.294 +0800 - *********************************  Execute task instance  *************************************
[INFO] 2024-07-05 11:19:00.294 +0800 - ***********************************************************************************************
[INFO] 2024-07-05 11:19:00.295 +0800 - Final Shell file is: 
[INFO] 2024-07-05 11:19:00.295 +0800 - ****************************** Script Content *****************************************************************
[INFO] 2024-07-05 11:19:00.295 +0800 - #!/bin/bash
BASEDIR=$(cd `dirname $0`; pwd)
cd $BASEDIR
source /etc/profile
export HADOOP_HOME=${HADOOP_HOME:-/home/hadoop-3.3.1}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME=${SPARK_HOME:-/home/spark-3.2.1-bin-hadoop3.2}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/home/hive-3.1.2}
export FLINK_HOME=/home/flink-1.18.1
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export SEATUNNEL_HOME=/opt/software/seatunnel
export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH
echo 'readOutVarTask'
echo ["admin","qiaozhanwei","test"]
[INFO] 2024-07-05 11:19:00.295 +0800 - ****************************** Script Content *****************************************************************
[INFO] 2024-07-05 11:19:00.295 +0800 - Executing shell command : sudo -u root -i /tmp/dolphinscheduler/exec/process/root/13850571680800/14172048617888_1/1963/1458/1963_1458.sh
[INFO] 2024-07-05 11:19:00.299 +0800 - process start, process id is: 590781
[INFO] 2024-07-05 11:19:02.299 +0800 -  -> readOutVarTask[admin,qiaozhanwei,test]
[INFO] 2024-07-05 11:19:02.301 +0800 - process has exited. execute path:/tmp/dolphinscheduler/exec/process/root/13850571680800/14172048617888_1/1963/1458, processId:590781 ,exitStatusCode:0 ,processWaitForStatus:true ,processExitValue:0

以上就是今天的全部内容了,希望对大家更好地使用DolphinScheduler有所帮助。

转载自Journey
原文链接:https://segmentfault.com/a/1190000045035406

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/901031.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

3.18 关系

1.1 笛卡尔积 序偶/元组 是有顺序的1.2 笛卡尔积与关系关系就两种:集合A上的二元关系/集合A到集合B的二元关系关系是笛卡尔积的子集 1.3 特殊关系

Spring AI Alibaba 应用框架挑战赛圆满落幕,恭喜获奖选手

Spring AI Alibaba 是一款 Java 语言实现的 AI 应用开发框架,用于加速和简化 Java 开发者的 AI 应用开发,定义 Spring 框架下的 AI 应用开发模式。本项目基于 Pivotal 公司开源的 Spring AI 开源项目构建,突出 Spring AI 与阿里云开源/商业生态的集成与最佳实践,集成范围涉…

干锅菜单

100001、 100002、 100003、 100004、 100005、 100006、 100007、 100008、本人前端水平有限,写的知识点可能有谬误,欢迎留言指正,如果看到,我将第一时间回复。感谢支持!

汤锅菜单

110001、 110002、 110003、 110004、本人前端水平有限,写的知识点可能有谬误,欢迎留言指正,如果看到,我将第一时间回复。感谢支持!

web153笔记(后端不能单⼀校验,后端校验要严密+过滤php+.user.ini文件包含)

这⼀次再传php就拦截下来了,这⾥开始可以⽤ .user.ini 来构造后⻔php.ini是php的⼀个全局配置⽂件,对整个web服务起作⽤;⽽.user.ini和.htaccess⼀样是⽬录的配置⽂件,.user.ini就是⽤户⾃定义的⼀个php.ini,我们可以利⽤这个⽂件来构造后⻔和隐藏后⻔。.htaccess是Apache…

VISIO-visio2013激活工具亲测有效

如果觉得对您有帮助 还请点赞收藏,谢谢~ 通过网盘分享的文件:激活工具 链接: https://pan.baidu.com/s/1y4FDjp59CoXtcB70_q1slA?pwd=sky1 提取码: sky1

安装ubantu报错VMware Workstation 与 Device/Credential Guard 不兼容

报错问题解决 1、按下WIN+R打开运行,然后输入services.msc回车; 2、在服务中找到 HV主机服务,双击打开设置为禁用3、win+x,然后打开powershell(管理员)运行命令:bcdedit /set hypervisorlaunchtype off 4、重启电脑

如何在 Github 上获得 1000 star?

作为程序员,Github 是第一个绕不开的网站。我们每天都在上面享受着开源带来的便利,我相信很多同学也想自己做一个开源项目,从而获得大家的关注。然而,理想很丰满,现实却是开发了很久的项目仍然无人问津。 最近,我的一个项目获得了超过 1000 star 的成绩,我认为这是一个重…

生活-家电: 论“内卷文化” + 品牌 VS 品质:记一起“知名品牌” 因 “商品设计缺陷”的退货经历

论“内卷文化” 如有不对欢迎指正。 起初,我也不知道"什么是内卷",面对现象级的"内卷文化"兴起, 各地"卷王不断", 实则是"阶层固化", "富贵阶级 压榨 普通民众的"另一"代称"? "富商贵族"多了第一批…

菜鸡的LLM algorithm学习笔记(I)-transformer篇

Transformer 结构分为几个部分embedding,encoder,decoder以及output 1.embedding block作为对于模型输入的处理,首先将模型的输入进行向量化;如输入为“我想要吃一个苹果。”-->X: [[0,0,0,1,2,222,....], [2,2,3,4,5,...], [3,4,5,6,....], []....];接着经过positional…