工作流管理框架airflow-安装部署教程

1 概述

Airflow是一个以编程方式编写,用于管理和调度工作流的平台。可以帮助你定义复杂的工作流程,然后在集群上执行和监控这些工作流。

Airflow计划程序在遵循指定的依赖项,同时在一组工作线程上执行任务。丰富的命令实用程序使在DAG上执行复杂的调度变的轻而易举。Airflow的可扩展Python框架可以让你构建连接几乎任何技术的工作流程。丰富的用户界面可以随时查看生产中正在运行的管道,帮助你管理工作流程的状态,监视进度以及需要时对问题进行故障排除。

Airflow的主要组件有:

DAG(有向无环图):使用Airflow将工作流编写任务的有向无环图(DAG)。一个DAG定义了一个工作流,它包含所有任务、任务的依赖关系和时间表。

任务(Task):一个任务定义了一个单独的单元工作,有一个确定的开始和结束。一个任务可以依赖于其他任务。

运算符(Operator):一个运算符封装了一个任务,并定义了它的执行逻辑。Airflow内置了许多运算符,如BashOperator、PythonOperator、EmailOperator等。你也可以自定义运算符。

时间轴(Timeline):时间轴让你以图形方式查看 DAG 的运行情况和状态。

调度器(Scheduler):调度器监视时间轴并触发需要运行的任务。

执行器(Executor):executor负责实际运行任务。Airflow支持多种executor,如LocalExecutor, CeleryExecutor, KubernetesExecutor 等。

2 名词

(1)Dynamic:Airflow管道是用Python代码配置的,允许动态生成管道。Airflow配置需要使用Python,这允许编写可动态实例化管道的代码。

(2)Extensible:Airflow框架包含许多运算符来连接各种技术。Airflow的所有组件都是可扩展的。轻松定义自己的运算符,执行程序并扩展库,使其适合于您的环境。

(3)Elegant:Airlfow是精简灵活的,使用功能强大的Jinja模板引擎,将脚本参数化内置于Airflow的核心中。

(4)Scalable:Airflow具有模板块架构,并使用消息队列来安排任意数量的工作任务。

3 airflow优缺点

优点:

Python脚本实现DAG,非常容易扩展;

可实现复杂的依赖规则;

外部依赖较少,搭建容易,仅依赖DB和rabbitmq;

工作流依赖可视化。有一套完整的UI,可视化展现所有任务的状态及历史信息;(本人刚开始主要看重这点)

完全支持crontab定时任务格式,可以通过crontab格式指定任务何时进行;

业务代码和调度系统解耦,每个业务的流程代码以独立的Python脚本描述,里面定义了流程化的节点来执行业务逻辑,支持任务的热加载.

缺点:

Airflow是为有限的批处理工作流构建的。虽然CLI和REST API确实允许触发工作流,但Airflow不是为无限运行的基于事件的工作流构建的。Airflow不是流解决方案。然而,像Apache Kafka这样的流系统通常与Apache Airflow一起使用。Kafka可以用于实时接收和处理事件数据,事件数据被写入存储位置,Airflow定期启动处理一批数据的工作流。

如果你更喜欢点击而不是编码,Airflow可能不是正确的解决方案。Web界面旨在最大限度地简化工作流的管理,Airflow框架不断改进以最大限度地简化开发人员体验。然而,Airflow的理念是将工作流定义为代码,所以代码始终是必需的。

4 Airflow安装

airflow官网地址:https://airflow.apache.org。

1)先安装并配置好python环境(可以参考Anaconda安装即可,如果项目不需要依赖太多工具包,可选择更简洁的MiniConda)并激活。

2)安装airflow

pip install apache-airflow

3)初始化airflow

airflow db init

4)查看版本

airflow version

5)启动airflow web服务,启动后浏览器访问http://ip_address:12025(如果不知道ip地址的就用ifconfig命令去linux下获取)

airflow webserver -p 12025 -D

6)启动airflow调度

airflow scheduler -D

7)创建账号(斜杠别忘记了)

airflow users create \

  --username admin \

  --firstname trisyp \

  --lastname trisyp \

  --role Admin \

  --email trisyp@email.com

回车之后会让你输入两次password,我们就用123456

8)启动停止脚本

vim af.sh

#!/bin/bash

case $1 in

"start"){

    echo " --------启动 airflow-------"

    ssh ip_address "conda activate airflow;airflow webserver -p 12025 -D;airflow scheduler -D; conda deactivate"

};;

"stop"){

    echo " --------关闭 airflow-------"

    ps -ef|egrep 'scheduler|airflow-webserver'|grep -v grep|awk '{print $2}'|xargs kill -15

};;

esac

添加权限即可使用。

trisyp@ip_address bin]$ chmod +x af.sh

5 修改数据库为MySQL

1)先在MySQL中建库

mysql> CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

2)如果报错Linux error:1425F102:SSL routines:ssl_choose_client_version:unsupported protocol,可以关闭MySQLSSL证书

查看SSL是否开启  YES为开启

mysql> SHOW VARIABLES LIKE '%ssl%';

+---------------+-----------------+

| Variable_name | Value           |

+---------------+-----------------+

| have_openssl  | YES             |

| have_ssl      | YES             |

| ssl_ca        | ca.pem          |

| ssl_capath    |                 |

| ssl_cert      | server-cert.pem |

| ssl_cipher    |                 |

| ssl_crl       |                 |

| ssl_crlpath   |                 |

| ssl_key       | server-key.pem  |

+---------------+-----------------+

3)修改配置文件my.cnf(注意:直接数据库修改值不起作用),加入以下内容:

# disable_ssl

skip_ssl

4)添加python连接的依赖,官网介绍的方法有两种:

这里我们选择mysql+mysqlconnector。

pip install mysql-connector-python

5)修改airflow的配置文件(vim ~/airflow/airflow.cfg):

[database]

# The SqlAlchemy connection string to the metadata database.

# SqlAlchemy supports many different database engines.

# More information here:

# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri

#sql_alchemy_conn = sqlite:home/trisyp/airflow/airflow.db

sql_alchemy_conn = mysql+mysqlconnector://root:123456@ip_address:3306/airflow_db

6)关闭airflow,初始化后重启:

af.sh stop

airflow db init

af.sh start

7)若初始化报错1067 - Invalid default value for ‘update_at’:

原因:字段 'update_at' timestamp类型,取值范围是:1970-01-01 00:00:00 2037-12-31 23:59:59UTC +8 北京时间从1970-01-01 08:00:00 开始),而这里默认给了空值,所以导致失败。

推荐修改mysql存储时间戳格式:

mysql> set GLOBAL sql_mode ='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'

重启MySQL会造成参数失效(注意:这样就需要重新创建账号),推荐将参数写入到配置文件my.cnf中。

sql_mode = STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

6 修改执行器

官网不推荐在开发中使用顺序执行器,会造成任务调度阻塞。

1)修改airflow的配置文件(vim ~/airflow/airflow.cfg)

[core]

# The executor class that airflow should use. Choices include

# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,

# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the

# full import path to the class when using a custom executor.

executor = LocalExecutor

可以使用官方推荐的几种执行器,也可以自定义。这里我们选择本地执行器即可。

7 部署使用

1)测试环境启动

本次测试使用的是spark的官方案例,所有需要启动hadoop和spark的历史服务器。

myhadoop.sh start

cd /opt/module/spark-yarn/sbin/start-history-server.sh

2)查看Airflow配置文件

vim ~/airflow/airflow.cfg

3)编写.py脚本,创建work-py目录用于存放python调度脚本

mkdir ~/airflow/dags

cd dags/

然后把脚本文件放到dags文件夹,代码如下:

from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta

default_args = {  # 设置默认参数。

    # 用户

    'owner': 'test_owner',

    # 是否开启任务依赖

    'depends_on_past': True,

    # 邮箱

    'email': ['trisyp@email.com'],

    # 启动时间

    'start_date':datetime(2022,11,28),

    # 出错是否发邮件报警

    'email_on_failure': False,

    # 重试是否发邮件报警

    'email_on_retry': False,

    # 重试次数

    'retries': 3,

    # 重试时间间隔

    'retry_delay': timedelta(minutes=5),

}

# 声明任务图,schedule_interval:调度频率。

dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))

 

# 创建单个任务

t1 = BashOperator(  # BashOperator:具体执行任务,如果为true前置任务必须成功完成才会走下一个依赖任务,如果为false则忽略是否成功完成。

    # 任务id:任务唯一标识(必填)。

    task_id='dwd',

    # 具体任务执行命令。

    bash_command='ssh ip_address "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',

    # 重试次数

    retries=3,

    # 把任务添加进图中

    dag=dag)

t2 = BashOperator(

    task_id='dws',

    bash_command='ssh ip_address "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',

    retries=3,

    dag=dag)

t3 = BashOperator(

    task_id='ads',

    bash_command='ssh ip_address "/opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 "',

    retries=3,

    dag=dag)

# 设置任务依赖:ads任务依赖dws任务依赖dwd任务。

t2.set_upstream(t1)

t3.set_upstream(t2)

4)等待一段时间,刷新任务列表

airflow dags list

5)已出现myairflow_execute_bash任务(刷新页面)

6)点击运行

7)查看dag图、甘特图,点击成功任务,查看日志

8)查看脚本代码

9)Dag任务操作

9.1 删除Dag任务

主要删除DAG任务不会删除底层文件,过一会还会自动加载回来。

9.2 查看当前所有dag任务

# 查看所有任务

airflow dags list

# 查看单个任务

airflow tasks list test --tree

8 配置邮件服务器

1)保证邮箱已开SMTP服务

2)修改airflow配置文件,用stmps服务对应587端口

vim ~/airflow/airflow.cfg 

smtp_host = smtp.qq.com

smtp_starttls = True

smtp_ssl = False

smtp_user = trisyp@email.com

# smtp_user =

smtp_password = qluxdbuhgrhgbigi

# smtp_password =

smtp_port = 587

smtp_mail_from = trisyp@email.com

3)重启airflow

af.sh stop

af.sh start

4)编辑test.py脚本,加入emailOperator

from airflow.operators.email_operator import EmailOperator

email=EmailOperator(

    task_id="email",

    to="yaohm163@163.com ",

    subject="test-subject",

    html_content="<h1>test-content</h1>",

    cc="trisyp@email.com ",

    dag=dag)

t2.set_upstream(t1)

t3.set_upstream(t2)

email.set_upstream(t3)

5)查看页面是否生效

6)运行测试

9 避坑指南

1)Exception rendering Jinja template for task

2)Intel MKL FATAL ERROR: Cannot load ../numexpr/../../../libmkl_rt.so.1.

强制更新airflow到最新版

3)error: subprocess-exited-with-error

解决方案:

错误有明确的提示,缺少pkg-config,所以就先安装这个包,然后在安装mysqlclient。

sudo apt-get install pkg-config

4)Can't connect to local MySQL server through socket '/tmp/mysql.sock' (2)

解决方案:

先用命令“find / -name ‘mysql.sock”来查看下这个文件所在目录,如果有就建立软连接(不要想着拷贝复制,无效的),命令是“ln -s /tmp/mysql.sock”。如果没有就找my.cnf文件,一般文件地址为/etc/mysql/my.cnf,然后通过vim加上socket路径信息,一定要加mysqld这个分组,不然会报找不到分组这个错;Found option without preceding group

5)Segmentation fault (core dumped)

解决方案:

在配置mysql存储的时候要加上mysqlconnector就解决了。这个坑非常恶心,你参照某些教程直接只配mysql,忽视了connector,碰到了还找不到解决方案,因为核心存储转移你不知道怎么搞。

cd /etc

vim profile

加入:

export AIRFLOW_HOME=/root/airflow

sudo mysql

create database airflow_db;

create user 'airflow'@'%' identified by '123456';

grant all on airflow_db .* to 'airflow'@'%';

sql_alchemy_conn = mysql://airflow:123456@10.0.0.22:3306/airflow_db

10 参考链接

https://yuchaoshui.com/1bd10cc/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/413081.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LabVIEW精确测量产品中按键力和行程

项目背景 传统的按键测试方法涉及手工操作&#xff0c;导致不一致和效率低下。在汽车行业中&#xff0c;带有实体按键的控制面板非常常见&#xff0c;确保一致的按键质量至关重要。制造商经常在这些组件的大规模、准确测试中遇到困难。显然&#xff0c;需要一个更自动化、精确…

2.【Linux】(进程的状态||深入理解fork||底层剖析||task_struct||进程优先级||并行和并发||详解环境变量)

一.进程 1.进程调度 Linux把所有进程通过双向链表的方式连接起来组成任务队列&#xff0c;操作系统和cpu通过选择一个task_struct执行其代码来调度进程。 2.进程的状态 1.运行态&#xff1a;pcb结构体在运行或在运行队列中排队。 2.阻塞态&#xff1a;等待非cpu资源就绪&am…

算法练习-A+B/财务管理/实现四舍五入/牛牛的菱形字符(题目链接+题解打卡)

难度参考 难度&#xff1a;简单 分类&#xff1a;熟悉OJ与IDE的操作 难度与分类由我所参与的培训课程提供&#xff0c;但需要注意的是&#xff0c;难度与分类仅供参考。以下内容均为个人笔记&#xff0c;旨在督促自己认真学习。 题目 A B1. A B - AcWing题库财务管理1004:财…

【C语言基础考研向】05 scanf读取标准输入超详解

文章目录 一.scanf函数的原理 样例问题原因解决方法 二.多种数据类型混合输入 错误样例正确样例 一.scanf函数的原理 C语言未提供输入/输出关键字&#xff0c;其输入和输出是通过标准函数库来实现的。C语言通过scanf函数读取键盘输入&#xff0c;键盘输入又被称为标准输入。…

FairyGUI Day 1 导入FairyGUI

FairyGUI Unity3d引擎版本&#xff1a;Uinty3d 20233.2.3f1 1、从资产商店中将FairyGUI购入我的资产中&#xff0c;目前是免费的。 2、从我的资产中将FairyGUI导入到当前项目中。 3、我遇到的问题&#xff0c;我的Assets下有两个文件夹分别是Resources和Scenes&#xff0c;导…

postman案例

一、表单接口 基本正向 有效反向 无效反向 JSON接口 基本正向 有效反向 无效反向 文件上传接口 token 获取token值 一&#xff1a; 二&#xff1a; Bearer 获取的token的值&#xff0c;至于鉴权方式要根据swagger接口文档要求

DNS分离解析

一、介绍 分离解析的域名服务器实际也是主域名服务器&#xff0c;这里主要是指根据不同的客户端提供不同的域名解析记录。比如来自内网和外网的不同网段地址区域的客户机请求解析同一域名时&#xff0c;为其提供不同的解析结果&#xff0c;得到不同的IP地址。 DNS的分离…

冻结Prompt微调LM: T5 PET (a)

T5 paper: 2019.10 Exploring the Limits of Transfer Learning with a Unified Text-to-Text Transformer Task: Everything Prompt: 前缀式人工prompt Model: Encoder-Decoder Take Away: 加入前缀Prompt&#xff0c;所有NLP任务都可以转化为文本生成任务 T5论文的初衷如…

tcpdump常用命令

tcp首部解析&#xff1a; tcp-首部_tcp首部-CSDN博客 ref&#xff1a; Home | TCPDUMP & LIBPCAP https://www.cnblogs.com/onlyforcloud/p/4396126.html tcpdump 详细使用指南&#xff08;请尽情食用&#xff09;_tcpdump指定ip和端口-CSDN博客 【博客192】抓取报文查…

输入框输入关键字 下拉框的关键字高亮

直接上代码 //搜索框部分 <div><input v-modelkeyWord /><button clickseachFn>搜索</button> </div> //下拉框部分 <div><div v-html"item.name" v-foritem in droplist :keyitem.id></div> </div> <sc…

SpringBoot 入门教程

1.复习SSM项目中&#xff0c;用spring&#xff0c;mybatis,springmvc这三个框架整合的项目。 SSM项目的所有类&#xff0c;这是用SSM整合一个搜索书籍种类和呈现的前端和后端的ssm的小项目。 2.springboot如何去开发这个页面&#xff1a; 新建springboot项目&#xff0c;勾选对…

FTP文件传输与vsftpd配置

一 存储类型 直连式存储DAS 适用于那些数据量不大&#xff0c;对磁盘访问速度要求较高的中小企业 存储区域网络SAN 用来存储非结构化数据&#xff0c;虽然受限于以太网的速度&#xff0c;但是部署灵活&#xff0c;成本低 网络附加存储NAS 适用于大型应用或数据库系统&…