Apache Airflow (十四) :Airflow分布式集群搭建及测试

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. 节点规划

2. airflow集群搭建步骤

3. 初始化Airflow

4. 创建管理员用户信息

​​​​​​​5. 配置Scheduler HA

​​​​​​​6. 启动Airflow集群

​​​​​​​7. 访问Airflow 集群WebUI

8. 测试Airflow HA


1. 节点规划

节点IP

节点名称

节点角色

运行服务

192.168.179.4

node1

Master1

webserver,scheduler

192.168.179.5

node2

Master2

websever,scheduler

192.168.179.6

node3

Worker1

worker

192.168.179.7

node4

Worker2

worker

2. airflow集群搭建步骤

1) 在所有节点安装python3.7

参照单节点安装Airflow中安装anconda及python3.7。

2) 在所有节点上安装airflow

  • 每台节点安装airflow需要的系统依赖
yum -y install mysql-devel gcc gcc-devel python-devel gcc-c++ cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib 
  • 每台节点配置airflow环境变量
vim /etc/profileexport AIRFLOW_HOME=/root/airflow#使配置的环境变量生效source /etc/profile
  • 每台节点切换airflow环境,安装airflow,指定版本为2.1.3
(python37)   conda activate python37(python37) pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

默认Airflow安装在$ANCONDA_HOME/envs/python37/lib/python3.7/site-packages/airflow目录下。配置了AIRFLOW_HOME,Airflow安装后文件存储目录在AIRFLOW_HOME目录下。可以每台节点查看安装Airflow版本信息:

(python37)  airflow version2.1.3
  • 在Mysql中创建对应的库并设置参数

aiflow使用的Metadata database我们这里使用mysql,在node2节点的mysql中创建airflow使用的库及表信息。

CREATE DATABASE airflow CHARACTER SET utf8;create user 'airflow'@'%' identified by '123456';grant all privileges on airflow.* to 'airflow'@'%';flush privileges;

在mysql安装节点node2上修改”/etc/my.cnf”,在[mysqld]下添加如下内容:

[mysqld]explicit_defaults_for_timestamp=1

以上修改完成“my.cnf”值后,重启Mysql即可,重启之后,可以查询对应的参数是否生效:

#重启mysql[root@node2 ~]# service mysqld restart#重新登录mysql查询mysql> show variables like 'explicit_defaults_for_timestamp';

  • 每台节点配置Airflow airflow.cfg文件

修改AIRFLOW_HOME/airflow.cfg文件,确保所有机器使用同一份配置文件,在node1节点上配置airflow.cfg,配置如下:

[core]dags_folder = /root/airflow/dags#修改时区default_timezone = Asia/Shanghai#配置Executor类型,集群建议配置CeleryExecutorexecutor = CeleryExecutor# 配置数据库sql_alchemy_conn=mysql+mysqldb://airflow:123456@node2:3306/airflow?use_unicode=true&charset=utf8[webserver]#设置时区default_ui_timezone = Asia/Shanghai[celery]#配置Celery broker使用的消息队列broker_url = redis://node4:6379/0#配置Celery broker任务完成后状态更新使用库result_backend = db+mysql://root:123456@node2:3306/airflow

将node1节点配置好的airflow.cfg发送到node2、node3、node4节点上

3. 初始化Airflow

1) 每台节点安装需要的python依赖包

初始化Airflow数据库时需要使用到连接mysql的包,执行如下命令来安装mysql对应的python包。

​
(python37) #  pip install mysqlclient -i Simple Index​

2) 在node1上初始化Airflow 数据库

(python37) [root@node1 airflow]# airflow db init

初始化之后在MySQL airflow库下会生成对应的表。

4. 创建管理员用户信息

在node1节点上执行如下命令,创建操作Airflow的用户信息:

airflow users create \--username airflow \--firstname airflow \--lastname airflow \--role Admin \--email xx@qq.com

执行完成之后,设置密码为“123456”并确认,完成Airflow管理员信息创建。

​​​​​​​5. 配置Scheduler HA

1) 下载failover组件

登录https://github.com/teamclairvoyant/airflow-scheduler-failover-controller下载 airflow-scheduler-failover-controller 第三方组件,将下载好的zip包上传到node1 “/software”目录下。

在node1节点安装unzip,并解压failover组件:

(python37) [root@node1 software]# yum -y install unzip(python37) [root@node1 software]# unzip ./airflow-scheduler-failover-controller-master.zip

2) 使用pip进行安装failover需要的依赖包

需要在node1节点上安装failover需要的依赖包。

(python37) [root@node1 software]# cd /software/airflow-scheduler-failover-controller-master(python37) [root@node1 airflow-scheduler-failover-controller-master]# pip install -e .

3) node1节点初始化failover

(python37) [root@node1 ~]# scheduler_failover_controller initAdding Scheduler Failover configs to Airflow config file...Finished adding Scheduler Failover configs to Airflow config file.Finished Initializing Configurations to allow Scheduler Failover Controller to run. Please update the airflow.cfg with your desired configurations.

注意:初始化airflow时,会向airflow.cfg配置中追加配置,因此需要先安装 airflow 并初始化。

4) 修改airflow.cfg

首先修改node1节点的AIRFLOW_HOME/airflow.cfg

[scheduler_failover]# 配置airflow Master节点,这里配置为node1,node2,两节点需要免密scheduler_nodes_in_cluster = node1,node2#在1088行,特别注意,需要去掉一个分号,不然后期自动重启Scheduler不能正常启动airflow_scheduler_start_command = export AIRFLOW_HOME=/root/airflow;nohup airflow scheduler >> ~/airflow/logs/scheduler.logs &

配置完成后,可以通过以下命令进行验证Airflow Master节点:

(python37) [root@node1 airflow]# scheduler_failover_controller test_connectionTesting Connection for host 'node1'(True, ['Connection Succeeded', ''])Testing Connection for host 'node2'(True, ['Connection Succeeded\n'])

将node1节点配置好的airflow.cfg同步发送到node2、node3、node4节点上:

(python37) [root@node1 ~]# cd /root/airflow/(python37) [root@node1 airflow]# scp airflow.cfg node2:`pwd`(python37) [root@node1 airflow]# scp airflow.cfg node3:`pwd`(python37) [root@node1 airflow]# scp airflow.cfg node4:`pwd`

​​​​​​​6. 启动Airflow集群

1) 在所有节点安装启动Airflow依赖的python包

(python37) [root@node1 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3(python37) [root@node2 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3(python37) [root@node3 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3(python37) [root@node4 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3

2) 在Master1节点(node1)启动相应进程

#默认后台启动可以使用-D ,这里使用-D有时不能正常启动Airflow对应进程airflow webserverairflow scheduler

3) 在Master2节点(node2)启动相应进程

airflow webserver

4) 在Worker1(node3)、Worker2(node4)节点启动Worker

在node3、node4节点启动Worker:

(python37) [root@node3 ~]# airflow celery worker(python37) [root@node4 ~]# airflow celery worker

5) 在node1启动Scheduler HA

(python37) [root@node1 airflow]# nohup scheduler_failover_controller start > /root/airflow/logs/scheduler_failover/scheduler_failover_run.log &

​​​​​​​

至此,Airflow高可用集群搭建完成。

​​​​​​​7. 访问Airflow 集群WebUI

浏览器输入node1:8080,查看Airflow WebUI:

8. 测试Airflow HA

1) 准备shell脚本

Airflow集群所有节点{AIRFLOW_HOME}目录下创建dags目录,准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

#!/bin/bashdt=$1echo "==== execute first shell ===="echo "---- first : time is ${dt}"

second_shell.sh

#!/bin/bashdt=$1echo "==== execute second shell ===="echo "---- second : time is ${dt}"

2) 编写airflow python 配置

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperatordefault_args = {'owner':'zhangsan','start_date':datetime(2021, 9, 23),'retries': 1,  # 失败重试次数'retry_delay': timedelta(minutes=5) # 失败重试间隔
}dag = DAG(dag_id = 'execute_shell_sh',default_args=default_args,schedule_interval=timedelta(minutes=1)
)first=BashOperator(task_id='first',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag = dag
)second=BashOperator(task_id='second',#脚本路径建议写绝对路径bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),dag=dag
)first >> second

将以上内容写入execute_shell.py文件,上传到所有Airflow节点{AIRFLOW_HOME}/dags目录下。

3) 重启Airflow,进入Airflow WebUI查看对应的调度

重启Airflow之前首先在node1节点关闭webserver ,Scheduler进程,在node2节点关闭webserver ,Scheduler进程,在node3,node4节点上关闭worker进程。

如果各个进程是后台启动,查看后台进程方式:

(python37) [root@node1 dags]# ps aux |grep webserver(python37) [root@node1 dags]# ps aux |grep scheduler(python37) [root@node2 dags]# ps aux |grep webserver(python37) [root@node2 dags]# ps aux |grep scheduler(python37) [root@node3 ~]# ps aux|grep "celery worker"(python37) [root@node4 ~]# ps aux|grep "celery worker"找到对应的启动命令对应的进程号,进行kill。

重启后进入Airflow WebUI查看任务:

点击“success”任务后,可以看到脚本执行成功日志:

​​​​​​​4) 测试Airflow HA

当我们把node1节点的websever关闭后,可以直接通过node2节点访问airflow webui:

在node1节点上,查找“scheduler”进程并kill,测试scheduler HA 是否生效:

(python37) [root@node1 ~]# ps aux|grep schedulerroot      23744  0.9  3.3 326940 63028 pts/2    S    00:08   0:02 airflow scheduler -- DagFileProcessorManager#kill 掉scheduler进程(python37) [root@node1 ~]# kill -9 23744

访问webserver webui

在node1节点查看scheduler_failover_controller进程日志中有启动schudler动作,注意:这里是先从node1启动,启动不起来再从其他Master 节点启动Schduler。


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/228450.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode算法题解(动态规划)|LeetCode198. 打家劫舍、LeetCode213. 打家劫舍 II、LeetCode337. 打家劫舍 III

一、LeetCode198. 打家劫舍 题目链接:198. 打家劫舍 题目描述: 你是一个专业的小偷,计划偷窃沿街的房屋。每间房内都藏有一定的现金,影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统,如果两间相邻的…

java多线程-扩展知识三:乐观锁与悲观锁

1、悲观锁 悲观锁有点像是一位比较悲观(也可以说是未雨绸缪)的人,总是会假设最坏的情况,避免出现问题。 悲观锁总是假设最坏的情况,认为共享资源每次被访问的时候就会出现问题(比如共享数据被修改),所以每次…

Ansys Lumerical|带 1D-2D 光栅的出瞳扩展器

附件下载 联系工作人员获取附件 此示例显示了设置和模拟出瞳扩展器 (EPE) 的工作流程,EPE 是波导型增强现实 (AR) 设备的重要组成部分。该工作流程将利用 Lumerical 和 Zemax OpticStudio 之间的动态链接功能 。为了…

使用 watch+$nextTick 解决Vue引入组件无法使用问题

问题描述: 很多时候我们都需要使用第三方组件库,比如Element-UI,Swiper 等等。 如果我们想要在这些结构中传入自己从服务器请求中获取的数据就会出现无法显示的问题。 比如我们在下面的Swiper例子中,我们需要new Swiper 才能让…

让@RefreshScope注解来帮助我们实现动态刷新

文章目录 前言举例作用参考文章总结 前言 在实际开发当中我们常常会看到有些类上会加一个注解:RefreshScope,有没有对应的小伙伴去思考过这个东西,这个注解有什么作用?为什么要加?下面我们就来看看这个 RefreshScope …

前端入门(四)Ajax、Promise异步、Axios通信、vue-router路由

文章目录 AjaxAjax特点 Promise 异步编程(缺)Promise基本使用状态 - PromiseState结果 - PromiseResult Axios基本使用 Vue路由 - vue-router单页面Web应用(single page web application,SPA)vue-router基本使用路由使…

Jmeter--如何监控服务器资源

在我们做项目的性能测试时,需要查看相关服务器的资源使用情况;本文以apache-Jmeter-5.5版本为例,使用PerfMon进行服务器资源监控的方案由两部分来实现:ServerAgent部署在被测服务器,负责资源耗用数据的采集&#xff0c…

《深入理解计算机系统》学习笔记 - 第三课 - 位,字节和整型

Lecture 03 Bits,Bytes, and Integer count 位,字节,整型 文章目录 Lecture 03 Bits,Bytes, and Integer count 位,字节,整型运算:加,减,乘,除加法乘法取值范围乘法结果 使用无符号注…

苍穹外卖项目笔记(6)— Redis操作营业状态设置

1 在 Java 中操作 Redis 1.1 Redis 的 Java 客户端 Jedis(官方推荐,且命令语句同 redis 命令)Lettuce(底层基于 Netty 多线程框架实现,性能高效)Spring Data Redis(对 Jedis 和 Lettuce 进行了…

读像火箭科学家一样思考笔记12_实践与测试(下)

1. 舆论的火箭科学 1.1. 如果苹果违反了“即飞即测”原则,那苹果的iPhone就不会问世了 1.1.1. iPhone在其上市前的民意调查中相当失败 1.1.1.1. iPhone不可能获得太大市场份额,不可能。 1.1.1.1.1. 微软前CEO史蒂夫鲍尔默(Steve Ballmer&…

SpringBoot RestTemplate 的使用

一、简介 RestTemplate 在JDK HttpURLConnection、Apache HttpComponents、OkHttp等基础上&#xff0c;封装了更高级别的API&#xff0c;默认依赖JDK HttpURLConnection&#xff0c;连接方式默认长连接。 二、使用 2.1、引入依赖 <dependency><groupId>org.spri…

Python财经股票数据保存表格文件 <雪球网>

嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 环境使用: Python 3.10 解释器 Pycharm 编辑器 &#x1f447; &#x1f447; &#x1f447; 更多精彩机密、教程&#xff0c;尽在下方&#xff0c;赶紧点击了解吧~ python源码、视频教程、插件安装教程、资料我都准备好了&…