[Airflow] 入门笔记

news/2025/1/9 4:22:42/文章来源:https://www.cnblogs.com/wangzhi8/p/18641743

前言

Airflow入门教程

正文

简介

任务管理、调度、监控工作流平台。

基于DAG(有向无环图)的任务管理系统。

基本架构

组件

  • scheduler
    以有向无环图(dag)的形式创建任务工作流,根据用户的配置将任务定时/定期进行调度
  • worker
    任务的执行单元,worker会从任务队列当中拉取任务并运行任务
  • webserver
    可视化管理界面,可以配置相关参数和操作dag
  • message queue
    任务的调度队列,一般使用redis或者rabbitMq作为broker
  • metadata center
    所有任务、dag、日志的相关元信息均存储于metadata Center,一般使用MariaDB(MySQL)进行metadata管理

调度单元

  • dag:
    airflow当中work flow的基本单位,通过配置Dag当中的相应参数确定调度时间、调度频率,通过实例化dag当中的task创建实际工作流
  • task:
    task由operator以及task upstream/downstream两部分组成,airflow提供了非常丰富的operator能够对接各类平台与系统实现任务执行,任务之间通过简单的上下游关系建立关系组成工作流

构建任务流

DAG

设置相关参数,定义一个任务流DAG

示例

from airflow import DAG
from datetime import datetime, timedelta
from tasks.alert_lark_message.webhook import task_failure_alert# 统一DAG的初始配置
default_args = {'owner': 'wz','depends_on_past': False,'email': ['wangzhi@abakusglobal.com'],'email_on_failure': False,'email_on_retry': False,'retries': 0,'retry_delay': timedelta(minutes=5),
}# 实例化DAG
with DAG('demo_dag',default_args=default_args,description='',schedule_interval='0 20 * * *',start_date=datetime(2024, 5, 20),catchup=False,tags=['demo'],on_failure_callback=task_failure_alert,
) as dag:# 定义任务# 定义执行规则pass

详细参数

  1. dag_id
  • 类型: str
  • 描述: DAG 的唯一标识符,用于在 Airflow UI 中标识 DAG。
  1. description
  • 类型: str
  • 描述: DAG 的描述,有助于其他用户理解 DAG 的用途。
  1. schedule_interval
  • 类型: datetime.timedelta | str | cron expression
  • 描述: 定义任务执行频率。可以是一个 cron 表达式、timedelta 对象或一个特殊的预设字符串(如 '@daily'、'@hourly')。
  1. start_date
  • 类型: datetime.datetime
  • 描述: DAG 的开始日期,不会在这个日期之前执行任何任务。
  1. end_date
  • 类型: datetime.datetime
  • 描述: DAG 的结束日期,该日期后不再调度任务。
  1. default_args
  • 类型: dict
  • 描述: 设置默认的任务参数,如重试次数、重试延迟、邮件通知等。
  1. tags
  • 类型: List[str]
  • 描述: 标签列表,用于在 UI 中更好地组织和查找 DAG。
  1. catchup
  • 类型: bool
  • 描述: 是否对过去的未执行周期执行补回任务。默认为 True。如果设置为 False,在 start_date 之后启动的 DAG 将不会执行早期周期的任务。
  1. max_active_runs
  • 类型: int
  • 描述: 同时运行的 DAG 实例的最大数量。
  1. dagrun_timeout
  • 类型: datetime.timedelta
  • 描述: 每个 DAG 运行的最大持续时间。超时后,DAG 运行会被标记为失败。
  1. on_failure_callback | on_success_callback | on_retry_callback
  • 类型: Callable
  • 描述: 当 DAG 失败、成功或重试时触发的回调函数。
  1. concurrency
  • 类型: int
  • 描述: 此 DAG 下任务实例可以并行运行的最大数量。
  1. doc_md
  • 类型: str
  • 描述: Markdown 格式的 DAG 文档,通常用于描述 DAG 的目的和操作细节。

TASK

具体要执行的任务内容

示例

  • 装饰器定义
@task()
def demo_task():# 读取配置config = Variable.get("demo_config", deserialize_json=True)print("hello, task!")
  • operation定义
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.python import ShortCircuitOperatordemo_task = PythonOperator(task_id='app_task',python_callable=task_method,dag=dag
)demo_bash_task = BashOperator(task_id='app_bash',bash_command=bash_command,dag=dag
)# 短路操作符
# 如果该函数返回 True,则会阻止其所有下游任务的执行。
# 如果返回 False,则工作流会继续执行其下游任务
demo_short_circuit_task = ShortCircuitOperator(task_id='short_circuit_task',python_callable=task_method,
)
  • TaskGroup
with TaskGroup("group_a") as group_a:@task()def gen_group_a_task():print("task_a")with TaskGroup("group_b") as group_b:@task()def gen_group_b_task():print("task_b")group_a >> group_b
  • Task上下文参数

在方法中使用**kwargs接收上下文参数

@task()
def demo_task(**kwargs):start_time = str(kwargs['data_interval_start'])end_time = str(kwargs['data_interval_end'])print(start_time, end_time)

定义规则

  • 定义task执行顺序
# 定义任务规则
demo_step1_task() >> demo_step2_task() >> end_task()start_task() >> [demo_step1_task(), demo_step2_task()] >> end_task()
  • 对task分组
# 分组
with TaskGroup("group_a") as group_a:@task()def group_a_demo_task():print("this is demo a")
# 分组
with TaskGroup("group_b") as group_b:@task()def group_b_demo_task():print("this is demo b")group_a >> end_task
group_b >> end_task

参考:

  • airflow官方文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/861458.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024下学期加分项

软考中级设计师通过资格证书

直接调用文件设置qt可执行程序的图标,运行时的图标,exe本身的图标,以及固定到任务栏时的图标,窗口坐上角的图标

// 设置应用程序图标(窗口图标和任务栏图标)this->setWindowIcon(QIcon("./Icon/ReadADtool.ico")); // 从资源文件中加载图标 固定到任务栏上时的图标: 在pro文件添加如下指令:设置rc文件内容:IDI_ICON1 ICON DISCARDABLE "ReadADtool.ico…

Ajax入门以及Axios的详细使用(含Promise)

1. 概述 1.1 是什么Ajax = Asynchronous JavaScript and XML(异步的 JavaScript 和 XML)Ajax 不是新的编程语言,而是一种用于创建快速动态网页的技术Ajax 最大的优点是在不重新加载整个页面的情况下,可以与服务器交换数据并更新部分网页内容,使网页实现异步更新传统的网页…

超低功耗段LCD液晶段码显示屏驱动芯片(ic)VKL128 LQFP44 I2C通信接口/可配置4种功耗模式

产品品牌:永嘉微电/VINKA 产品型号:VKL128 封装形式:SSOP44 概述 VKL128是一个点阵式存储映射的LCD驱动器,可支持最大128点(32SEGx4COM)的LCD屏。单片机可通过I2C接口配置显示参数和读写显示数据,可配置4种功耗模式,也可通过关显示和关振荡器进入省电模式。其高抗干扰,…

私有化部署视频平台EasyCVR安防小知识:如何评估一个监控系统的抗干扰性能?

在当今复杂多变的监控环境中,确保监控系统的稳定性和可靠性至关重要。抗干扰性能作为衡量监控系统性能的关键指标之一,直接关系到监控图像的清晰度、数据的完整性以及系统的响应速度。 本文将详细介绍评估监控系统抗干扰性能的多个关键方面,以及如何通过这些评估来优化系统性…

离线安装Kubesphere

1.环境要求 【centos7.X】 1.1依赖项要求 master、node1节点安装 yum install -y socat conntrack ebtables ipset1.2获取镜像列表访问 https://get-images.kubesphere.io/ 选择需要部署的扩展组件。 填入邮箱地址。 点击获取镜像列表。 查看填写的邮箱,获取 KubeSphere 最新的…

@antv/g2 使用小结

版本:5.2.10 一、引入@antv/g2 import { Chart } from "@antv/g2";二、初始化 # container里类似echarts,可以用id也可以用refs let chart = new Chart({ container: "pie-chart", autoFit: true }); # 如果chart已存在,就是重复渲染的情况 if (chart) …

加分博客

恋爱对象陆思梦,高中同学,暗黑女神(爱称,因为我是迪迦)。 恋爱过程高中应该怎么说呢,我跟她是高中同学,那个时候还没有喜欢她,而是班上的另一个女孩,杨sir,跟杨sir表白被拒后,就没然后了。高中的时候,只觉得她是一个开朗活泼的女孩,她还有个妹妹,是双胞胎,两个…

python 资源管理工具V1

python 资源管理工具V1 资源管理工具V1界面python 3 环境安装 python -m pip install configparser==5.3.0 -i https://pypi.tuna.tsinghua.edu.cn/simple/ python -m pip install pymysql==0.9.3 -i https://pypi.tuna.tsinghua.edu.cn/simple/ python -m pip install pyper…

基于时间轴的项目进度管理工具

在项目管理中,时间轴是一种强大的工具,它能够直观地展示项目的进度、任务的先后顺序以及时间的推移。通过时间轴,项目团队可以清晰地把握项目的整体节奏,及时发现潜在问题,做出合理的决策。市面上基于时间轴的项目进度管理工具众多,每一款都有其独特的特点和优势。本文将…