Apache Zeppelin结合Apache Airflow使用1

Apache Zeppelin结合Apache Airflow使用1

文章目录

  • Apache Zeppelin结合Apache Airflow使用1
  • 前言
  • 一、安装Airflow
  • 二、使用步骤
    • 1.目标
    • 2.编写DAG
    • 2.加载、执行DAG
  • 总结


前言

之前学了Zeppelin的使用,今天开始结合Airflow串任务。

Apache Airflow和Apache Zeppelin是两个不同的工具,各自用于不同的目的。Airflow用于编排和调度工作流,而Zeppelin是一个交互式数据分析和可视化的笔记本工具。虽然它们有不同的主要用途,但可以结合使用以满足一些复杂的数据处理和分析需求。

下面是一些结合使用Airflow和Zeppelin的方式:

  1. Airflow调度Zeppelin Notebooks:

    • 使用Airflow编写调度任务,以便在特定时间或事件触发时运行Zeppelin笔记本。
    • 在Airflow中使用Zeppelin的REST API或CLI命令来触发Zeppelin笔记本的执行。
  2. 数据流管道:

    • 使用Airflow编排数据处理和转换任务,例如从数据源提取数据、清理和转换数据。
    • 在Zeppelin中创建笔记本,用于进一步的数据分析、可视化和报告生成。
    • Airflow任务完成后,触发Zeppelin笔记本执行以基于最新数据执行分析。
  3. 参数传递:

    • 通过Airflow参数传递,将一些参数值传递给Zeppelin笔记本,以便在不同任务之间共享信息。
    • Zeppelin笔记本可以从Airflow任务中获取参数值,以适应特定的数据分析需求。
  4. 日志和监控:

    • 使用Airflow监控工作流的运行情况,查看任务的日志和执行状态。
    • 在Zeppelin中记录和可视化Airflow工作流的关键指标,以获得更全面的工作流性能洞察。
  5. 整合数据存储:

    • Airflow可以用于从不同数据源中提取数据,然后将数据传递给Zeppelin进行进一步的分析。
    • Zeppelin可以使用Airflow任务生成的数据,进行更深入的数据挖掘和分析。

结合使用Airflow和Zeppelin能够充分发挥它们各自的优势,实现更全面、可控和可视化的数据处理和分析工作流。


一、安装Airflow

安装参考:
https://airflow.apache.org/docs/apache-airflow/stable/start.html

CentOS 7.9安装后启动会报错,还需要配置下sqlite,参考:https://airflow.apache.org/docs/apache-airflow/2.8.0/howto/set-up-database.html#setting-up-a-sqlite-database

[root@slas bin]# airflow standalone
Traceback (most recent call last):File "/root/.pyenv/versions/3.9.10/bin/airflow", line 5, in <module>from airflow.__main__ import mainFile "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/__init__.py", line 52, in <module>from airflow import configuration, settingsFile "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 2326, in <module>conf.validate()File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 718, in validateself._validate_sqlite3_version()File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 824, in _validate_sqlite3_versionraise AirflowConfigException(
airflow.exceptions.AirflowConfigException: error: SQLite C library too old (< 3.15.0). See https://airflow.apache.org/docs/apache-airflow/2.8.0/howto/set-up-database.html#setting-up-a-sqlite-database

二、使用步骤

1.目标

我想做个简单的demo,包括两个节点,实现如图所示功能,读取csv,去重:
在这里插入图片描述
csv文件输入在airflow上实现,去重在zeppelin上实现。

2.编写DAG

先实现extract_data_script.py,做个简单的读取csv指定列数据写入新的csv文件。

import argparse
import pandas as pddef extract_and_write_data(date, input_csv, output_csv, columns_to_extract):# 读取指定列的数据csv_file_path = f"/home/works/datasets/data_{date}.csv"df = pd.read_csv(csv_file_path, usecols=columns_to_extract)# 将数据写入新的 CSV 文件df.to_csv(output_csv, index=False)if __name__ == "__main__":parser = argparse.ArgumentParser()parser.add_argument("--date", type=str, required=True, help="Date parameter passed by Airflow")args = parser.parse_args()# 输出 CSV 文件路径(替换为实际的路径)output_csv_path = "/home/works/output/extracted_data.csv"# 指定要提取的列columns_to_extract = ['column1', 'column2', 'column3']# 调用函数进行数据提取和写入extract_and_write_data(args.date, input_csv_path, output_csv_path, columns_to_extract)

然后在 Zeppelin 中创建一个 Python 笔记本(Notebook),其中包含被 Airflow DAG 调用的代码。加载先前从 output/extracted_data.csv 文件中提取的数据:

%python# 导入必要的库
import pandas as pd# 加载先前从 CSV 文件中提取的数据
csv_file_path = "/home/works/output/extracted_data.csv"
# 读取 CSV 文件
df = pd.read_csv(csv_file_path)# 过滤掉 column1 为空的行
df = df[df['column1'].notnull()]# 去重,以 column2、column3 字段为联合去重依据
deduplicated_df = df.drop_duplicates(subset=["column2", "column3"])# 保存去重后的结果到新的 CSV 文件
deduplicated_df.to_csv("/home/works/output/dd_data.csv", index=False)

将这个 Zeppelin 笔记本保存,并记住笔记本的 ID, Airflow DAG 需要使用这个 ID 来调用 Zeppelin 笔记本。

接下来,用VSCode编写zeppelin_integration.py代码如下,上传到$AIRFLOW_HOME/dags目录下:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedeltadefault_args = {'owner': 'airflow','depends_on_past': False,'start_date': datetime(2024, 1, 1),'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5),
}dag = DAG('zeppelin_integration',default_args=default_args,schedule_interval=timedelta(days=1),
)extract_data_task = BashOperator(task_id='extract_data',bash_command='python /home/works/z/extract_data_script.py --date {{ ds }}',dag=dag,
)run_zeppelin_notebook_task = BashOperator(task_id='run_zeppelin_notebook',bash_command='curl -X POST http://zeppelin-server:zeppelin-port/api/notebook/job/notebook-id',dag=dag,
)# 设置关联度
extract_data_task >> run_zeppelin_notebook_task

2.加载、执行DAG

如下命令:

[root@slas dags]# airflow dags trigger zeppelin_integration
[2024-01-19T11:23:54.331+0800] {__init__.py:42} INFO - Loaded API auth backend: airflow.api.auth.backend.session
conf | dag_id               | dag_run_id                        | data_interval_start       | data_interval_end         | end_date | external_trigger | last_scheduling_decision | logical_date              | run_type | start_date | state 
=====+======================+===================================+===========================+===========================+==========+==================+==========================+===========================+==========+============+=======
{}   | zeppelin_integration | manual__2024-01-19T03:23:54+00:00 | 2024-01-18T03:23:54+00:00 | 2024-01-19T03:23:54+00:00 | None     | True             | None                     | 2024-01-19T03:23:54+00:00 | manual   | None       | queued

页面上会增加一个DAG,如图:
在这里插入图片描述
在Actions里点击执行。


总结

以上就是今天要讲的内容,总体来说集成两个工具还是很方便的,期待后面更多的应用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/414628.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux修行路】基本指令

目录 推荐 前言 1、重新认识操作系统 1.1 操作系统是什么? 1.2操作系统的作用 1.3 我们在计算机上的所有操作 1.4 Linux操作的特点 2、Linux基本指令 2.1 ls 指令 2.2 pwd 命令 2.3 cd 指令 2.3.1 Linux中的目录结构 2.3.2 绝对路径和相对路径 2.3.3 cd 指令 …

通过manifest清单导入项目到gitlab中

文章目录 说明使用manifest得要求Manifest 格式演示示例 说明 从gitlab 11.2引入此功能。 GitLab 允许根据manifest清单文件&#xff08;如 Android 存储库使用的清单文件&#xff09;导入所需的 Git 存储库。 使用manifest得要求 GitLab 必须对其数据库使用 PostgreSQL。至少…

SpiderFlow爬虫平台漏洞利用分析(CVE-2024-0195)

1. 漏洞介绍 SpiderFlow爬虫平台项目中spider-flow-web\src\main\java\org\spiderflow\controller\FunctionController.java文件的FunctionService.saveFunction函数调用了saveFunction函数&#xff0c;该调用了自定义函数validScript&#xff0c;该函数中用户能够控制 functi…

vue 解决el-table 表体数据发生变化时,未重新渲染问题

效果图父组件中数量改变后总数重新计算 子组件完整代码 <template><el-tableshow-summaryref"multipleTable"v-bind"$props"selection-change"handleSelectionChange"row-click"handleRowClick":summary-method"getSum…

【问题记录】使用命令语句从kaggle中下载数据集

从Kaggle中下载Tusimple数据集 1.服务器环境中安装kaggle 使用命令&#xff1a;pip install kaggle 2.复制下载API 具体命令如下&#xff1a; kaggle datasets download -d manideep1108/tusimple3.配置kaggle.json文件 如果直接使用命令会报错&#xff1a; root:~# kagg…

微电网优化MATLAB:火鹰优化算法(Fire Hawk Optimizer,FHO)求解微电网优化(提供MATLAB代码)

一、火鹰优化算法FHO 火鹰优化算法&#xff08;Fire Hawk Optimizer&#xff0c;FHO&#xff09;由Mahdi Azizi等人于2022年提出&#xff0c;该算法性能高效&#xff0c;思路新颖。 单目标优化&#xff1a;火鹰优化算法&#xff08;Fire Hawk Optimizer&#xff0c;FHO&#…

Python数据分析案例36——基于神经网络的AQI多步预测(空气质量预测)

案例背景 不知道大家发现了没&#xff0c;现在的神经网络做时间序列的预测都是单步预测&#xff0c;即(需要使用X的t-n期到X的t-1期的数据去预测X的t期的数据)&#xff0c;这种预测只能预测一个点&#xff0c;我需要预测X的t1期的数据就没办法了&#xff0c;有的同学说可以把预…

vue3-表单输入绑定

表单输入绑定 获取表单输入的值方式&#xff1a; 手动连接值绑定和更改事件监听器 v-model 指令 &#xff08;常用&#xff09; <script lang"ts" setup> import { ref } from "vue" // 定义个变量接收输入的内容&#xff1a; const text ref(&…

制造业工厂为什么要实施MES系统呢?

MES是生产管理系统&#xff0c;生产管理是通过对生产系统的战略计划、组织、指挥、实施、协调、控制等活动&#xff0c;实现系统的物质变换、产品生产、价值提升的过程。在企业的价值链中&#xff0c;生产经营是企业核心能力的重要组成部分。 实施MES系统的原因 MES系统是中国比…

ONLYOFFICE服务器无法连接,请联系管理员问题解决

1、现象 部署好了nextcloud和onlyoffice后&#xff0c;新建文本文档报错ONLYOFFICE服务器无法连接&#xff0c;请联系管理员。 用快捷键“F12”进入控制台&#xff0c;点开错误提示栏&#xff0c;找到有“api.js“文件&#xff0c;“https://ONLYOFFICED的地址/web-apps/apps/…

【生态适配】亚信安慧AntDB数据库与契约锁完成兼容互认

日前&#xff0c;亚信安慧AntDB数据库与上海亘岩网络科技有限公司&#xff08;简称:契约锁&#xff09;研发的契约锁电子签章产品完成兼容互认。经过双方团队的严格测试&#xff0c;亚信安慧AntDB数据库与契约锁&#xff08;V4&#xff09;完全兼容&#xff0c;整体运行稳定高效…

UE5 播放rtsp监控视频

1. 插件下载 https://github.com/inveta/InVideo https://github.com/inveta/InVideo/releases https://download.csdn.net/download/qq_17523181/88760489?spm1001.2014.3001.5501 插件目前支持5.1 / 5.0 2. 建立C UE5项目 重要&#xff1a;此插件支持C项目&#xff0c;不然不…