Apache Airflow (九) :Airflow Operators及案例之BashOperator及调度Shell命令及脚本

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. BashOperator 调度Shell命令案例

2. BashOperator 调度Shell脚本案例


Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DAG中的任务节点,所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。关于BaseOperator的参数可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator

BaseOperator中常用参数如下:

task_id(str) : 唯一task_id标记owner(str):任务的所有者,建议使用linux用户名email(str or list[str]):出问题时,发送报警Email的地址,可以填写多个,用逗号隔开。email_on_retry(bool):当任务重试时是否发送电子邮件email_on_failure(bool):当任务执行失败时是否发送电子邮件retries(int):在任务失败之前应该重试的次数retry_delay(datetime.timedelta):重试间隔,必须是timedelta对象start_date(datetime.datetime):DAG开始执行时间,这个参数必须是datetime对象,不可以使用字符串。end_date(datetime.datetime):DAG运行结束时间,任务启动后一般都会一直执行下去,一般不设置此参数。depends_on_past(bool,默认False):是否依赖于过去,如果为True,那么必须之前的DAG调度成功了,现在的DAG调度才能执行。dag(airflow.models.DAG):指定的dag。execution_timeout(datetime.timedelta):执行此任务实例允许的最长时间,超过最长时间则任务失败。trigger_rule(str):定义依赖的触发规则,包括选项如下:{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy(无条件执行)} default is all_success。

BashOperator主要执行bash脚本或命令,BashOperator参数如下:

bash_command(str):要执行的命令或脚本(脚本必须是.sh结尾)

1. BashOperator 调度Shell命令案例

from datetime import datetime, timedeltafrom airflow import DAG
from airflow.operators.bash import BashOperatordefault_args = {'owner':'zhangsan','start_date':datetime(2021, 9, 23),'email':'kettle_test1@163.com', #pwd:kettle123456'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_shell_cmd',default_args=default_args,schedule_interval=timedelta(minutes=1)
)t1=BashOperator(task_id='print_date',bash_command='date',dag = dag
)t2=BashOperator(task_id='print_helloworld',bash_command='echo "hello world!"',dag=dag
)t3=BashOperator(task_id='tempplated',bash_command="""{% for i in range(5) %}echo "{{ ds }}"echo "{{ params.name}}"echo "{{ params.age}}"{% endfor %}""",params={'name':'wangwu','age':10},dag=dag
)t1 >> t2 >> t3

注意在t3中使用了Jinja模板,“{% %}”内部是for标签,用于循环操作,但是必须以{% endfor %}结束。“{{}}”内部是变量,其中ds是执行日期,是airflow的宏变量,params.name和params.age是自定义变量。

在default_args中的email是指当DAG执行失败时,发送邮件到指定邮箱,想要使用airflow发送邮件,需要在$AIRFLOW_HOME/airflow.cfg中配置如下内容:

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user =kettle_test2
# Example: smtp_password = airflow
smtp_password =VIOFSYMFDIKKIUEA
smtp_port = 25
smtp_mail_from =kettle_test2@163.com
smtp_timeout = 30
smtp_retry_limit = 5

此外,配置163邮箱时需要开启“POP3/SMTP/IMAP服务”服务,设置如下:

2. BashOperator 调度Shell脚本案例

准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

#!/bin/bashdt=$1echo "==== execute first shell ===="echo "---- first : time is ${dt}"

second_shell.sh

#!/bin/bashdt=$1echo "==== execute second shell ===="echo "---- second : time is ${dt}"

编写airflow python 配置:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatordefault_args = {'owner':'zhangsan','start_date':datetime(2021, 9, 23),'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_shell_sh',default_args=default_args,schedule_interval=timedelta(minutes=1)
)first=BashOperator(task_id='first',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag = dag
)second=BashOperator(task_id='second',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag=dag
)first >> second

执行结果:

特别注意:在“bash_command”中写执行脚本时,一定要在脚本后跟上空格,有没有参数都要跟上空格,否则会找不到对应的脚本。如下:


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/186123.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2.c++基础语法

文章目录 1.c 程序结构关键字标识符、操作符、标点预处理指令注释main 主函数命名空间 2.c 变量和常量变量 3.c 数组和容器4.c 程序流程5.c字符和字符串 1.c 程序结构 关键字 关键字事程序保留的,程序员不能使用,c的常见关键字如下图: 标识…

拷贝文件到u盘提示文件过大

在u盘空间足够的情况下,如果提示这个,可以手动修改一下ntfs格式 点击 WinR 键, 输入cmd回车 使用convert f:/fs:ntfs命令,此处的f为盘符,根据u盘所在的盘符设置。 如果u盘有隐藏的文件在自动运行,可能导致…

Axure基础详解二十二:随机点名效果

效果演示 组件 建立一个【中继器】,内部插入一个“文本框”。【中继器】每页项目数为1,开始页为1。 设置交互 页面载入时交互 给【中继器】新曾行,“name”数据列添加10行数据,填入相应的名字;“shunxu”数据列全部…

Python基础入门----如何通过conda搭建Python开发环境

文章目录 使用 conda 搭建Python开发环境是非常方便的,它可以帮助你管理Python版本、依赖库、虚拟环境等。以下是一个简单的步骤,演示如何通过 conda 搭建Python开发环境: 安装conda: 如果你还没有安装 conda,首先需要安装Anaconda或Miniconda。Anaconda是一个包含很多数据…

Linux Control Cgroups

无论 Docker 如何进行隔离,无法否认的是我们在当前宿主机中运行的所有容器,它依赖的硬件资源都只是当前机器。 其实启动的每一个容器进程,它本身其实就是当前宿主机的进程之一,那么本质上来说,它也会和宿主机中的其他…

PatchMatchNet笔记

PatchMatchNet笔记 1 概述2 PatchmatchNet网络结构图2.1 多尺度特征提取2.2 基于学习的补丁匹配 3 性能评价 PatchmatchNet: Learned Multi-View Patchmatch Stereo:基于学习的多视角补丁匹配立体算法 1 概述 特点   高速,低内存,可以处理…

浙大恩特CRM文件上传0day

目录 简介 资产收集 漏洞复现 nuclei验证 简介 浙大恩特客户资源管理系统是一款针对企业客户资源管理的软件产品。该系统旨在帮助企业高效地管理和利用客户资源,提升销售和市场营销的效果。 资产收集 fofa:title”欢迎使用浙大恩特客户资源管理系统…

【Mysql系列】Mysql基础篇

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

【STM32外设系列】NRF24L01无线收发模块

🎀 文章作者:二土电子 🌸 关注公众号获取更多资料! 🐸 期待大家一起学习交流! 文章目录 一、NRF24L01简介1.1 什么是NRF24L011.2 NRF24L01引脚介绍1.3 NRF24L01工作模式1.4 NRF24L01的SPI时序1.5 Enhanc…

uniapp form表单提交事件手动调用

背景&#xff1a; UI把提交的按钮弄成了图片&#xff0c;之前的button不能用了。 <button form-type"submit">搜索</button> 实现&#xff1a; html&#xff1a; 通过 this.$refs.fd 获取到form的vue对象。手动调用里面的_onSubmit()方法。 methods:…

MIB 6.1810实验Xv6 and Unix utilities(3)pingpong

Mit6.S081-实验1-Xv6 and Unix utilities-pingpong问题_Isana_Yashiro的博客-CSDN博客 Write a user-level program that uses xv6 system calls to ping-pong a byte between two processes over a pair of pipes, one for each direction. The parent should send a byte to…

来文心中国行厦门站,感受大模型落地生花的进展!

11月22日&#xff0c;文心中国行将走进厦门。届时&#xff0c;政府、高校及企业的相关专家将现场分享AI和大模型最新进展&#xff0c;从人工智能政策解读&#xff0c;到大模型底层技术&#xff0c;再到产教融合下的空间感知与计算&#xff0c;产业创新应用洞察及实践案例等等&a…