【Spark系列6】如何做SQL查询优化和执行计划分析

Apache Spark SQL 使用 Catalyst 优化器来生成逻辑执行计划和物理执行计划。逻辑执行计划描述了逻辑上如何执行查询,而物理执行计划则是 Spark 实际执行的步骤。

一、查询优化

示例 1:过滤提前

未优化的查询

val salesData = spark.read.parquet("hdfs://sales_data.parquet")
val result = salesData.groupBy("product_id").agg(sum("amount").alias("total_sales")).filter($"total_sales" > 1000)

优化后的查询

val salesData = spark.read.parquet("hdfs://sales_data.parquet")
val filteredData = salesData.filter($"amount" > 1000)
val result = filteredData.groupBy("product_id").agg(sum("amount").alias("total_sales"))

优化解释:通过在聚合之前应用过滤,减少了聚合操作处理的数据量,从而减少了执行时间和资源消耗。

示例 2:使用广播连接

未优化的查询

val largeTable = spark.read.parquet("hdfs://large_table.parquet")
val smallTable = spark.read.parquet("hdfs://small_table.parquet")
val result = largeTable.join(smallTable, Seq("key"))

优化后的查询

import org.apache.spark.sql.functions.broadcastval largeTable = spark.read.parquet("hdfs://large_table.parquet")
val smallTable = spark.read.parquet("hdfs://small_table.parquet")
val result = largeTable.join(broadcast(smallTable), Seq("key"))

优化解释:如果有一个小表和一个大表需要连接,使用广播连接可以将小表的数据发送到每个节点,减少数据传输和shuffle操作,提高查询效率。

示例 3:避免不必要的Shuffle操作

未优化的查询

val transactions = spark.read.parquet("hdfs://transactions.parquet")
val result = transactions.repartition(100, $"country").groupBy("country").agg(sum("amount").alias("total_amount"))

优化后的查询

val transactions = spark.read.parquet("hdfs://transactions.parquet")
val result = transactions.groupBy("country").agg(sum("amount").alias("total_amount"))

优化解释:repartition会导致全局shuffle,而如果后续的操作是按照同一个键进行聚合,这个操作可能是不必要的,因为groupBy操作本身会引入shuffle。

示例 4:处理数据倾斜

未优化的查询

val skewedData = spark.read.parquet("hdfs://skewed_data.parquet")
val referenceData = spark.read.parquet("hdfs://reference_data.parquet")
val result = skewedData.join(referenceData, "key")

优化后的查询

val skewedData = spark.read.parquet("hdfs://skewed_data.parquet")
val referenceData = spark.read.parquet("hdfs://reference_data.parquet")
val saltedSkewedData = skewedData.withColumn("salted_key", concat($"key", lit("_"), (rand() * 10).cast("int")))
val saltedReferenceData = referenceData.withColumn("salted_key", explode(array((0 to 9).map(lit(_)): _*))).withColumn("salted_key", concat($"key", lit("_"), $"salted_key"))
val result = saltedSkewedData.join(saltedReferenceData, "salted_key").drop("salted_key")

优化解释:当存在数据倾斜时,可以通过给键添加随机后缀(称为salting)来分散倾斜的键,然后在连接后去除这个后缀。

示例 5:缓存重用的DataFrame

未优化的查询

val dataset = spark.read.parquet("hdfs://dataset.parquet")
val result1 = dataset.filter($"date" === "2024-01-01").agg(sum("amount"))
val result2 = dataset.filter($"date" === "2024-01-02").agg(sum("amount"))

优化后的查询

val dataset = spark.read.parquet("hdfs://dataset.parquet").cache()
val result1 = dataset.filter($"date" === "2024-01-01").agg(sum("amount"))
val result2 = dataset.filter($"date" === "2024-01-02").agg(sum("amount"))

优化解释:如果同一个数据集被多次读取,可以使用cache()persist()方法将数据集缓存起来,避免重复的读取和计算。

在实际应用中,优化Spark SQL查询通常需要结合数据的具体情况和资源的可用性。通过观察Spark UI上的执行计划和各个stage的详情,可以进一步诊断和优化查询性能。

二、执行计划分析

逻辑执行计划

逻辑执行计划是对 SQL 查询语句的逻辑解释,它描述了执行查询所需执行的操作,但不涉及具体如何在集群上执行这些操作。逻辑执行计划有两个版本:未解析的逻辑计划(unresolved logical plan)和解析的逻辑计划(resolved logical plan)。

举例说明

假设我们有一个简单的查询:

SELECT name, age FROM people WHERE age > 20

在 Spark SQL 中,这个查询的逻辑执行计划可能如下所示:

== Analyzed Logical Plan ==
name: string, age: int
Filter (age#0 > 20)
+- Project [name#1, age#0]+- Relation[age#0,name#1] parquet

这个逻辑计划的组成部分包括:

  • Relation: 表示数据来源,这里是一个 Parquet 文件。
  • Project: 表示选择的字段,这里是nameage
  • Filter: 表示过滤条件,这里是age > 20

物理执行计划

物理执行计划是 Spark 根据逻辑执行计划生成的,它包含了如何在集群上执行这些操作的具体细节。物理执行计划会考虑数据的分区、缓存、硬件资源等因素。

举例说明

对于上面的逻辑执行计划,Spark Catalyst 优化器可能生成以下物理执行计划:

== Physical Plan ==
*(1) Project [name#1, age#0]
+- *(1) Filter (age#0 > 20)+- *(1) ColumnarToRow+- FileScan parquet [age#0,name#1] Batched: true, DataFilters: [(age#0 > 20)], Format: Parquet, Location: InMemoryFileIndex[file:/path/to/people.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(age), GreaterThan(age,20)], ReadSchema: struct<age:int,name:string>

这个物理执行计划的组成部分包括:

  • FileScan: 表示数据的读取操作,这里是从 Parquet 文件读取。
  • ColumnarToRow: 表示数据格式的转换,因为 Parquet 是列式存储,需要转换为行式以供后续操作。
  • Filter: 表示过滤操作,这里是执行age > 20的过滤条件。
  • Project: 表示字段选择操作,这里是选择nameage字段。

物理执行计划还包含了一些优化信息,例如:

  • Batched: 表示是否批量处理数据,这里是true
  • DataFilters: 实际应用于数据的过滤器。
  • PushedFilters: 表示已推送到数据源的过滤器,这可以减少从数据源读取的数据量。

要查看 Spark SQL 查询的逻辑和物理执行计划,可以在 Spark 代码中使用.explain(true)方法:

val df = spark.sql("SELECT name, age FROM people WHERE age > 20")
df.explain(true)

这将输出上述的逻辑和物理执行计划信息,帮助开发者理解和优化查询。

给一个实际业务执行过程中的SQL计划参考:

执行计划:

== Parsed Logical Plan ==
Aggregate [count(1) AS count#93L]
+- Aggregate [pos_id#0, tag_id#1, pos_tag_id#2L], [pos_id#0, tag_id#1, pos_tag_id#2L]+- Filter (isnotnull(pos_id#0) && isnotnull(tag_id#1))+- SubqueryAlias `pos_tag_dim_row`+- Relation[pos_id#0,tag_id#1,pos_tag_id#2L,click_date#3,user_type#4,ad_division_id_new#5,delivery_system_type#6,campaign_type#7,ad_sku_first_cate_cd#8,automated_bidding_type#9,ad_type_cd#10,period#11,medium_company_id#12L,medium_traffic_id#13L,media_tag_created_date#14,os_type#15,bundle_name#16,jd_request_cnt#17L,filter_requests#18L,filter_requests_after#19L,valid_requests#20L,media_return_cnt#21L,impressions#22L,queries#23L,... 19 more fields] parquet== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#93L]
+- Aggregate [pos_id#0, tag_id#1, pos_tag_id#2L], [pos_id#0, tag_id#1, pos_tag_id#2L]+- Filter (isnotnull(pos_id#0) && isnotnull(tag_id#1))+- SubqueryAlias `pos_tag_dim_row`+- Relation[pos_id#0,tag_id#1,pos_tag_id#2L,click_date#3,user_type#4,ad_division_id_new#5,delivery_system_type#6,campaign_type#7,ad_sku_first_cate_cd#8,automated_bidding_type#9,ad_type_cd#10,period#11,medium_company_id#12L,medium_traffic_id#13L,media_tag_created_date#14,os_type#15,bundle_name#16,jd_request_cnt#17L,filter_requests#18L,filter_requests_after#19L,valid_requests#20L,media_return_cnt#21L,impressions#22L,queries#23L,... 19 more fields] parquet== Optimized Logical Plan ==
Aggregate [count(1) AS count#93L]
+- Aggregate [pos_id#0, tag_id#1, pos_tag_id#2L]+- Project [pos_id#0, tag_id#1, pos_tag_id#2L]+- Filter (isnotnull(pos_id#0) && isnotnull(tag_id#1))+- Relation[pos_id#0,tag_id#1,pos_tag_id#2L,click_date#3,user_type#4,ad_division_id_new#5,delivery_system_type#6,campaign_type#7,ad_sku_first_cate_cd#8,automated_bidding_type#9,ad_type_cd#10,period#11,medium_company_id#12L,medium_traffic_id#13L,media_tag_created_date#14,os_type#15,bundle_name#16,jd_request_cnt#17L,filter_requests#18L,filter_requests_after#19L,valid_requests#20L,media_return_cnt#21L,impressions#22L,queries#23L,... 19 more fields] parquet== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)], output=[count#93L])
+- Exchange SinglePartition+- *(2) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#96L])+- *(2) HashAggregate(keys=[pos_id#0, tag_id#1, pos_tag_id#2L], functions=[], output=[])+- Exchange hashpartitioning(pos_id#0, tag_id#1, pos_tag_id#2L, 100)+- *(1) HashAggregate(keys=[pos_id#0, tag_id#1, pos_tag_id#2L], functions=[], output=[pos_id#0, tag_id#1, pos_tag_id#2L])+- *(1) Project [pos_id#0, tag_id#1, pos_tag_id#2L]+- *(1) Filter (isnotnull(pos_id#0) && isnotnull(tag_id#1))+- *(1) FileScan parquet [pos_id#0,tag_id#1,pos_tag_id#2L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ns1017/user/jd_ad/ads_report/etl/offline.spark/tmc_base_report_daily/out..., PartitionFilters: [], PushedFilters: [IsNotNull(pos_id), IsNotNull(tag_id)], ReadSchema: struct<pos_id:string,tag_id:string,pos_tag_id:bigint>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/449745.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TCP TIME_WAIT 过多怎么处理

文章目录 1.什么是 TCP TIME_WAIT&#xff1f;2.为什么要 TIME_WAIT?3.TIME_WAIT 过多的影响4.解决办法4.1 调整短连接为长连接4.2 调整系统内核参数 5.小结参考文献 1.什么是 TCP TIME_WAIT&#xff1f; TCP 断开连接四次挥手过程中&#xff0c;主动断开连接的一方&#xff…

Windows Server 2019 Web服务器搭建

系列文章目录 目录 系列文章目录 文章目录 前言 二、配置服务器 1.实验环境搭建 1)实验服务器配置和客户端 2)实验环境 3)配置服务器IP 2.搭建服务器 在网站上右击新建网站搭建成功 文章目录 Windows Server 2003 Web服务器搭建Windows Server 2003 FTP服务器搭建Wi…

Linux下tar命令详解

tar #归档命令 格式 • Tar -参数 [args]..... 参数&#xff1a; 必选参数&#xff1a; 辅助参数&#xff1a; 额外参数&#xff1a; # 打包时排除某个文件 tar cf 文件名.tar --exclude路径/文件 路径 注&#xff1a;此处的路径前后需要保持保持一致&#xff0c;统一…

在本地电脑上打开服务器里面的localhost网址

远程连接服务器&#xff0c;启动了一个服务 显示访问地址为&#xff1a;http://127.0.0.1:7860 在本地浏览器将127.0.0.1改成服务器ip但是无法访问 解决办法&#xff1a; 1. ssh新建一个远程连接&#xff0c;将服务器的7860端口重定向到本机 ssh -L 18097:127.0.0.1:7860 us…

【Java 数据结构】排序

排序算法 1. 排序的概念及引用1.1 排序的概念1.2 常见的排序算法 2. 常见排序算法的实现2.1 插入排序2.1.1 直接插入排序2.1.2 希尔排序( 缩小增量排序 ) 2.2 选择排序2.2.1 直接选择排序2.2.2 堆排序 2.3 交换排序2.3.1冒泡排序2.3.2 快速排序2.3.3 快速排序非递归 2.4 归并排…

cmd常用命令

一、启动cmd的方式 用户启动&#xff0c;Win r 输入cmd&#xff0c;Enter管理员启动&#xff0c;Win r 输入cmd&#xff0c;Ctrl Shift Enter 二、修改窗口背景色和文字颜色 在打开的cmd窗口顶部空白区右击点击属性&#xff0c;进行设置即可 设置成这样不是美汁汁嘛 三、…

idea 中 tomcat 乱码问题修复

之前是修改 Tomcat 目录下 conf/logging.properties 的配置&#xff0c;将 UTF-8 修改为 GBK&#xff0c;现在发现不用这样修改了。只需要修改 IDEA 中 Tomcat 的配置就可以了。 修改IDEA中Tomcat的配置&#xff1a;添加-Dfile.encodingUTF-8 本文结束

【机器学习】科学库使用手册第2篇:机器学习任务和工作流程(已分享,附代码)

本系列文章md笔记&#xff08;已分享&#xff09;主要讨论人工智能相关知识。主要内容包括&#xff0c;了解机器学习定义以及应用场景&#xff0c;掌握机器学习基础环境的安装和使用&#xff0c;掌握利用常用的科学计算库对数据进行展示、分析&#xff0c;学会使用jupyter note…

【Apollo】阿波罗自动驾驶系统:驶向未来的智能出行(含源码安装)

前言 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家&#xff1a;https://www.captainbed.cn/z ChatGPT体验地址 文章目录 前言驶向未来的智能出行 步骤一&#xff1a;安装 Linux 系统&#xff08;可选&a…

三款免费的.NET混淆工具推荐

ConfuserEx ConfuserEx是一个功能强大且广泛使用的.NET代码混淆工具。它支持多种混淆技术&#xff0c;包括控制流混淆、字符串加密、资源加密等。它具有灵活的配置选项&#xff0c;可以根据不同的需求进行定制&#xff08;不足的是目前只支持.NET Framework 2.0/3.0/3.5/4.0/4.…

freeswitch的主被叫号码

概述 freeswitch是一款简单好用的VOIP开源软交换平台。 sip信令中对于主被叫号码有多个头域显示&#xff0c;不同的配置参数又有多种头域组合&#xff0c;当我们在使用fs处理信令时&#xff0c;应该如何设置和获取主被叫号码在恰当的头域中。 环境 centos&#xff1a;CentO…

新型生成式 AI 助手 Amazon Q(预览版)上线

今天&#xff0c;我们宣布推出 Amazon Q&#xff0c;这是一种新型的生成式人工智能助手&#xff0c;专门用于满足办公场景需要&#xff0c;可以根据客户业务进行定制。客户可以使用 Amazon Q 进行对话、解决问题、生成内容、获取见解并采取行动&#xff0c;所有这些都基于客户自…