解决方案:如何在 Amazon EMR Serverless 上执行纯 SQL 文件?

长久已来,SQL以其简单易用、开发效率高等优势一直是ETL的首选编程语言,在构建数据仓库和数据湖的过程中发挥着不可替代的作用。Hive和Spark SQL也正是立足于这一点,才在今天的大数据生态中牢牢占据着主力位置。在常规的Spark环境中,开发者可以使用spark-sql命令直接执行SQL文件,这是一项看似平平无奇实则非常重要的功能:一方面,这一方式极大地降低了Spark的使用门槛,用户只要会写SQL就可以使用Spark;另一方面,通过命令行驱动SQL文件的执行可以极大简化SQL作业的提交工作,使得作业提交本身被“代码化”,为大规模工程开发和自动化部署提供了便利。

但遗憾的是,Amazon EMR Serverless 未能针对执行SQL文件提供原生支持,用户只能在Scala/Python代码中嵌入SQL语句,这对于倚重纯SQL开发数仓或数据湖的用户来说并不友好。为此,我们专门开发了一组用于读取、解析和执行SQL文件的工具类,借助这组工具类,用户可以在 Amazon EMR Serverless 上直接执行SQL文件,本文将详细介绍一下这一方案。

1. 方案设计

鉴于在Spark编程环境中执行SQL语句的方法是:spark.sql("..."),我们可以设计一个通用的作业类,该类在启动时会根据传入的参数读取指定位置上的SQL文件,然后拆分成单条SQL并调用spark.sql("...")执行。为了让作业类更加灵活和通用,还可以引入通配符一次加载并执行多个SQL文件。此外,ETL作业经常需要根据作业调度工具生成的时间参数去执行相应的批次,这些参数同样会作用到SQL中,所以,作业类还应允许用户在SQL文件中嵌入自定义变量,并在提交作业时以参数形式为自定义变量赋值。基于这种设计思路,我们开发了一个项目,实现了上述功能,项目地址为:

项目名称项目地址
Amazon EMR Serverless Utilitieshttps://github.com/bluishglc/emr-serverless-utils

项目中的com.github.emr.serverless.SparkSqlJob类即为通用的SQL作业类,该类接受两个可选参数,分别是:

参数说明取值示例
–sql-files指定要执行的SQL文件路径,支持Java文件系统通配符,可指定多个文件一起执行s3://my-spark-sql-job/sqls/insert-into-*.sql
–sql-paramsK1=V1,K2=V2,...形式为SQL文件中定义的${K1},${K2},…形式的变量设值CUST_CITY=NEW YORK,ORD_DATE=2008-07-15

该方案具备如下特性:

① 允许单一SQL文件包含多条SQL语句
② 允许在SQL文件中使用${K1},${K2},…的形式定义变量,并在执行作业时使用K1=V1,K2=V2,...形式的参数进行变量赋值
③ 支持Java文件系统通配符,可一次执行多个SQL文件

下面,我们将分别在AWS控制台和命令行两种环境下介绍并演示如何使用该项目的工具类提交纯SQL作业。

2. 实操演示

2.1. 环境准备

在EMR Serverless上提交作业时需要准备一个“EMR Serverless Application”和一个“EMR Serverless Job Execution Role”,其中后者应具有S3和Glue Data Catalog的读写权限。Application可以在EMR Serverless控制台(EMR Studio)上通过向导轻松创建(全默认配置即可),Execution Role可以使用 《CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark》 一文第5节提供的脚本快速创建。

接下来要准备提交作业所需的Jar包和SQL文件。首先在S3上创建一个存储桶,本文使用的桶取名:my-spark-sql-job(当您在自己的环境中操作时请注意替换桶名),然后从 [ 此处 ] 下载编译好的 emr-serverless-utils.jar包并上传至s3://my-spark-sql-job/jars/目录下:

请添加图片描述

在演示过程中还将使用到5个SQL示例文件,从 [ 此处 ] 下载解压后上传至s3://my-spark-sql-job/sqls/目录下:

请添加图片描述

2.2. 在控制台上提交纯SQL文件作业

2.2.1. 执行单一SQL文件

打开EMR Serverless的控制台(EMR Studio),在选定的EMR Serverless Application下提交一个如下的Job:

请添加图片描述

① Script location:设定为此前上传的Jar包路径 s3://my-spark-sql-job/jars/emr-serverless-utils-1.0.jar
② Main class:设定为 com.github.emr.serverless.SparkSqlJob
③ Script arguments:设定为 ["--sql-files","s3://my-spark-sql-job/sqls/drop-tables.sql"]

至于其他选项,无需特别设定,保持默认配置即可,对于在生产环境中部署的作业,您可以结合自身作业的需要灵活配置,例如Spark Driver/Executor的资源分配等。需要提醒的是:通过控制台创建的作业默认会启用Glue Data Catalog(即:Additional settings -> Metastore configuration -> Use AWS Glue Data Catalog 默认是勾选的),为了方便在Glue和Athena中检查SQL脚本的执行结果,建议您不要修改此项默认配置。

上述配置描述了这样一项工作:以s3://my-spark-sql-job/jars/emr-serverless-utils-1.0.jar中的com.github.emr.serverless.SparkSqlJob作为主类,提起一个Spark作业。其中["--sql-files","s3://my-spark-sql-job/sqls/drop-tables.sql"]是传递给SparkSqlJob的参数,用于告知作业所要执行的SQL文件位置。本次作业执行的SQL文件只有三条简单的DROP TABLE语句,是一个基础示例,用以展示工具类执行单一文件内多条SQL语句的能力。

2.2.2. 执行带自定义参数的SQL文件

接下来要演示的是工具类的第二项功能:执行带自定义参数的SQL文件。新建或直接复制上一个作业(在控制台上选定上一个作业,依次点击 Actions -> Clone job),然后将“Script arguments”的值设定为:

["--sql-files","s3://my-spark-sql-job/sqls/create-tables.sql","--sql-params","APP_S3_HOME=s3://my-spark-sql-job"]

如下图所示:

请添加图片描述

这次的作业设定除了使用--sql-files参数指定了SQL文件外,还通过--sql-params参数为SQL中出现的用户自定义变量进行了赋值。根据此前的介绍,APP_S3_HOME=s3://my-spark-sql-job是一个“Key=Value”字符串,其含义是将值s3://my-spark-sql-job赋予了变量APP_S3_HOME,SQL中所有出现${APP_S3_HOME}的地方都将被s3://my-spark-sql-job所替代。查看create-tables.sql文件,在建表语句的LOCATION部分可以发现自定义变量${APP_S3_HOME}

CREATE EXTERNAL TABLE IF NOT EXISTS ORDERS (... ...
)
... ...
LOCATION '${APP_S3_HOME}/data/orders/';

SparkSqlJob读取该SQL文件时,会根据键值对字符串APP_S3_HOME=s3://my-spark-sql-job将SQL文件中所有的${APP_S3_HOME}替换为s3://my-spark-sql-job,实际执行的SQL将变为:

CREATE EXTERNAL TABLE IF NOT EXISTS ORDERS (... ...
)
... ...
LOCATION 's3://my-spark-sql-job/data/orders/';

提交作业并执行完毕后,可登录Athena控制台,查看数据表是否创建成功。

2.2.3. 使用通配符执行多个文件

有时候,我们需要批量执行一个文件夹下的所有SQL文件,或者使用通配符选择性的执行部分SQL文件,SparkSqlJob使用了Java文件系统通配符来支持这类需求。下面的作业就演示了通配符的使用方法,同样是新建或直接复制上一个作业,然后将“Script arguments”的值设定为:

["--sql-files","s3://my-spark-sql-job/sqls/insert-into-*.sql"]

如下图所示:

请添加图片描述

这次作业的--sql-files参数使用了路径通配符,insert-into-*.sql将同时匹配insert-into-orders.sqlinsert-into-customers.sql两个SQL文件,它们将分别向ORDERSCUSTOMERS两张表插入多条记录。执行完毕后,可以可登录Athena控制台,查看数据表中是否有数据产生。

2.2.4. 一个复合示例

最后,我们来提交一个更有代表性的复合示例:文件通配符 + 用户自定义参数。再次新建或直接复制上一个作业,然后将“Script arguments”的值设定为:

["--sql-files","s3://my-spark-sql-job/sqls/select-*.sql","--sql-params","APP_S3_HOME=s3://my-spark-sql-job,CUST_CITY=NEW YORK,ORD_DATE=2008-07-15"]

如下图所示:

![emr-serverless-snapshot-4.jpg-150.8kB][6]

本次作业的--sql-files参数使用路径通配符select-*.sql匹配select-tables.sql文件,该文件中存在三个用户自定义变量,分别是${APP_S3_HOME}${CUST_CITY}${ORD_DATE}

CREATE EXTERNAL TABLE ORDERS_CUSTOMERS... ...LOCATION '${APP_S3_HOME}/data/orders_customers/'
AS SELECT... ...
WHEREC.CUST_CITY = '${CUST_CITY}' ANDO.ORD_DATE = CAST('${ORD_DATE}' AS DATE);

--sql-params参数为这三个自定义变量设置了取值,分别是:APP_S3_HOME=s3://my-spark-sql-jobCUST_CITY=NEW YORKORD_DATE=2008-07-15,于是上述SQL将被转化为如下内容去执行:

CREATE EXTERNAL TABLE ORDERS_CUSTOMERS... ...LOCATION 's3://my-spark-sql-job/data/orders_customers/'
AS SELECT... ...
WHEREC.CUST_CITY = 'NEW YORK' ANDO.ORD_DATE = CAST('2008-07-15' AS DATE);

至此,通过控制台提交纯SQL文件作业的所有功能演示完毕。

2.3. 通过命令行提交纯SQL文件作业

实际上,很多EMR Serverless用户并不在控制台上提交自己的作业,而是通过AWS CLI提交,这种方式方式多见于工程代码或作业调度中。所以,我们再来介绍一下如何通过命令行提交纯SQL文件作业。

本文使用命令行提交EMR Serverless作业的方式遵循了《最佳实践:如何优雅地提交一个 Amazon EMR Serverless 作业?》一文给出的最佳实践。首先,登录一个安装了AWS CLI并配置有用户凭证的Linux环境(建议使用Amazon Linux2),先使用命令sudo yum -y install jq安装操作json文件的命令行工具:jq(后续脚本会使用到它),然后完成如下前期准备工作:

① 创建或选择一个作业专属工作目录和S3存储桶
② 创建或选择一个EMR Serverless Execution Role
③ 创建或选择一个EMR Serverless Application

接下来将所有环境相关变量悉数导出(请根据您的AWS账号和本地环境替换命令行中的相应值):

export APP_NAME='change-to-your-app-name'
export APP_S3_HOME='change-to-your-app-s3-home'
export APP_LOCAL_HOME='change-to-your-app-local-home'
export EMR_SERVERLESS_APP_ID='change-to-your-application-id'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='change-to-your-execution-role-arn'

以下是一份示例:

export APP_NAME='my-spark-sql-job'
export APP_S3_HOME='s3://my-spark-sql-job'
export APP_LOCAL_HOME='/home/ec2-user/my-spark-sql-job'
export EMR_SERVERLESS_APP_ID='00fbfel40ee59k09'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::123456789000:role/EMR_SERVERLESS_ADMIN'

《最佳实践:如何优雅地提交一个 Amazon EMR Serverless 作业?》一文提供了多个操作Job的通用脚本,都非常实用,本文也会直接复用这些脚本,但是由于我们需要多次提交且每次的参数又有所不同,为了便于使用和简化行文,我们将原文中的部分脚本封装为一个Shell函数,取名为submit-spark-sql-job

submit-spark-sql-job() {sqlFiles="$1"sqlParams="$2"cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{"name":"my-spark-sql-job","applicationId":"$EMR_SERVERLESS_APP_ID","executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN","jobDriver":{"sparkSubmit":{"entryPoint":"$APP_S3_HOME/jars/emr-serverless-utils-1.0.jar","entryPointArguments":[$([[ -n "$sqlFiles" ]] && echo "\"--sql-files\", \"$sqlFiles\"")$([[ -n "$sqlParams" ]] && echo ",\"--sql-params\", \"$sqlParams\"")],"sparkSubmitParameters":"--class com.github.emr.serverless.SparkSqlJob --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}},"configurationOverrides":{"monitoringConfiguration":{"s3MonitoringConfiguration":{"logUri":"$APP_S3_HOME/logs"}}}
}
EOFjq . $APP_LOCAL_HOME/start-job-run.jsonexport EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \--no-paginate --no-cli-pager --output text \--name my-spark-sql-job \--application-id $EMR_SERVERLESS_APP_ID \--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \--execution-timeout-minutes 0 \--cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \--query jobRunId)now=$(date +%s)secwhile true; dojobStatus=$(aws emr-serverless get-job-run \--no-paginate --no-cli-pager --output text \--application-id $EMR_SERVERLESS_APP_ID \--job-run-id $EMR_SERVERLESS_JOB_RUN_ID \--query jobRun.state)if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; thenfor i in {0..5}; doecho -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"sleep 1doneelseprintf "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]%50s\n\n"breakfidone
}

该函数接受两个位置参数:

① 第一位置上的参数用于指定SQL文件路径,其值会传递给SparkSqlJob--sql-files
② 第二位置上的参数用于指定SQL文件中的用户自定义变量,其值会传递给SparkSqlJob--sql-params

函数中使用的Jar包和SQL文件与《2.1. 环境准备》一节准备的Jar包和SQL文件一致,所以使用脚本提交作业前同样需要完成2.1节的环境准备工作。接下来,我们就使用该函数完成与2.2节一样的操作。

2.3.1. 执行单一SQL文件

本节操作与2.2.1节完全一致,只是改用了命令行方式实现,命令如下:

submit-spark-sql-job "$APP_S3_HOME/sqls/drop-tables.sql"

2.3.2. 执行带自定义参数的SQL文件

本节操作与2.2.2节完全一致,只是改用了命令行方式实现,命令如下:

submit-spark-sql-job "$APP_S3_HOME/sqls/create-tables.sql" "APP_S3_HOME=$APP_S3_HOME"

2.3.3. 使用通配符执行多个文件

本节操作与2.2.3节完全一致,只是改用了命令行方式实现,命令如下:

submit-spark-sql-job "$APP_S3_HOME/sqls/insert-into-*.sql"

2.3.4. 一个复合示例

本节操作与2.2.4节完全一致,只是改用了命令行方式实现,命令如下:

submit-spark-sql-job "$APP_S3_HOME/sqls/select-tables.sql" "APP_S3_HOME=$APP_S3_HOME,CUST_CITY=NEW YORK,ORD_DATE=2008-07-15"

3. 在源代码中调用工具类

尽管在Spark编程环境中可以使用spark.sql(...)形式直接执行SQL语句,但是,从前文示例中可以看出 emr-serverless-utils 提供的SQL文件执行能力更便捷也更强大一些,所以,最后我们简单介绍一下如何在源代码中调用相关的工具类获得上述SQL文件的处理能力。具体做法非常简单,你只需要:

① 将emr-serverless-utils-1.0.jar加载到你的类路径中
② 声明隐式类型转换
③ 在spark上直接调用execSqlFile()


# 初始化SparkSession及其他操作
...# 声明隐式类型转换
import com.github.emr.serverless.SparkSqlSupport._# 在spark上直接调用execSqlFile()
spark.execSqlFile("s3://YOUR/XXX.sql")# 在spark上直接调用execSqlFile()
spark.execSqlFile("s3://YOUR/XXX.sql", "K1=V1,K2=V2,...")# 其他操作
...

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/70902.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[oneAPI] Neural Style Transfer

[oneAPI] Neural Style Transfer oneAPINeural Style Transfer特殊环境定义使用包加载数据Neural Style Transfer模型与介绍训练过程结果 比赛&#xff1a;https://marketing.csdn.net/p/f3e44fbfe46c465f4d9d6c23e38e0517 Intel DevCloud for oneAPI&#xff1a;https://devcl…

Kafka 入门到起飞 - 什么是 HW 和 LEO?何时更新HW和LEO呢?

上文我们已经学到&#xff0c; 一个Topic&#xff08;主题&#xff09;会有多个Partition&#xff08;分区&#xff09;为了保证高可用&#xff0c;每个分区有多个Replication&#xff08;副本&#xff09;副本分为Leader 和 Follower 两个角色&#xff0c;Leader副本对外提供读…

做海外游戏推广有哪些条件?

做海外游戏推广需要充分准备和一系列条件的支持。以下是一些关键条件&#xff1a; 市场调研和策略制定&#xff1a;了解目标市场的文化、玩家偏好、竞争格局等是必要的。根据调研结果制定适合的推广策略。 本地化&#xff1a;将游戏内容、界面、语言、货币等进行本地化&#…

SpringBoot复习:(42)WebServerCustomizer的customize方法是在哪里被调用的?

ServletWebServletAutoConfiguration类定义如下&#xff1a; 可以看到其中通过Import注解导入了其内部类BeanPostProcessorRegister。 BeanPostProcessor中定义的registerBeanDefinition方法会被Spring容器调用。 registerBeanDefinitions方法调用了RegistrySyntheticBeanIf…

【C++】面向对象编程引入 ③ ( 面向过程编程的结构化程序设计方法 | 结构化程序设计方法概念 / 特点 / 优缺点 | 面向对象编程引入 )

文章目录 一、面向过程编程的结构化程序设计方法1、结构化程序设计方法概念2、结构化程序设计方法特点3、结构化程序设计方法优缺点 二、面向对象编程引入 一、面向过程编程的结构化程序设计方法 如果使用 面向过程语言 ( 如 : C 语言 ) , 开发 大型 项目 , 一般使用 结构化程序…

什么是字体堆栈(font stack)?如何设置字体堆栈?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 什么是字体堆栈&#xff08;Font Stack&#xff09;&#xff1f;⭐ 如何设置字体堆栈&#xff1f;⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 …

Android Framework 全局替换系统字体

基于Android 11 Android Framework 全局替换系统字体 第一种通过替换系统默认字体 将需要替换的字体资源放置frameworks/base/data/fonts/目录下。 将系统默认的Roboto字体替换为HarmonyOs字体。 frameworks/base/data/fonts/fonts.xml frameworks/base/data/fonts/Android.…

Vue.js 生命周期详解

Vue.js 是一款流行的 JavaScript 框架&#xff0c;它采用了组件化的开发方式&#xff0c;使得前端开发更加简单和高效。在 Vue.js 的开发过程中&#xff0c;了解和理解 Vue 的生命周期非常重要。本文将详细介绍 Vue 生命周期的四个阶段&#xff1a;创建、挂载、更新和销毁。 …

缓存平均的两种算法

引言 线边库存物料的合理性问题是物流仿真中研究的重要问题之一,如果线边库存量过多,则会对生产现场的布局产生负面影响,增加成本,降低效益。 写在前面 仿真分析后对线边Buffer的使用情况进行合理的评估就是一个非常重要的事情。比较关心的参数包括:缓存位最大值…

【NAS群晖drive异地访问】使用cpolar远程访问内网Synology Drive「内网穿透」

文章目录 前言1.群晖Synology Drive套件的安装1.1 安装Synology Drive套件1.2 设置Synology Drive套件1.3 局域网内电脑测试和使用 2.使用cpolar远程访问内网Synology Drive2.1 Cpolar云端设置2.2 Cpolar本地设置2.3 测试和使用 3. 结语 前言 群晖作为专业的数据存储中心&…

C语言刷题训练【第11天】

大家好&#xff0c;我是纪宁。 今天是C语言笔试刷题训练的第11天&#xff0c;加油&#xff01; 文章目录 1、声明以下变量&#xff0c;则表达式: ch/i (f*d – i) 的结果类型为&#xff08; &#xff09;2、关于代码的说法正确的是&#xff08; &#xff09;3、已知有如下各变…

Python Web框架:Django、Flask和FastAPI巅峰对决

今天&#xff0c;我们将深入探讨Python Web框架的三巨头&#xff1a;Django、Flask和FastAPI。无论你是Python小白还是老司机&#xff0c;本文都会为你解惑&#xff0c;带你领略这三者的魅力。废话不多说&#xff0c;让我们开始这场终极对比&#xff01; Django&#xff1a;百…