Spark 应用程序优化和调优总结

文章目录

    • 前言
    • 调整 Spark 默认配置
      • 查看和设置 Spark 配置信息
      • 动态扩展集群负载
    • 数据的缓存和持久化
      • DataFrame.cache()
      • DataFrame.persist()
      • 何时缓存和持久化
      • 何时不缓存和持久化
    • Spark 中的 JOINs
      • 广播连接
      • 排序合并连接
    • 总结

前言

本文总结了 Spark 中比较重要和常用的调优手段,包括设置并优化 Spark 程序的默认配置,来改进大型任务的工作负载和并行度,从而减少 Spark executor 内存不足的问题。以及如何使用适当的缓存和持久化策略来增加对常用数据集的访问速度。还有说明了在操作复杂聚合时常用的两种连接方式,以及如何设置合理的排序键来进行分桶,尽量减少 shuffle 操作等优化手段。

调整 Spark 默认配置

Spark 的官方的配置官方的配置内容很多,以及对应的官方配置调优建议也很多,这里只说明部分常见和重要的调优配置策略。

查看和设置 Spark 配置信息

有三种获取当前 Spark 集群的配置信息,第一种方式是在$SPARK_HOME目录下查看对应配置文件:
_conf/spark-defaults.conf.template_、_conf/log4j.properties.template_和_conf/spark-env.sh.template_

注意:这将修改整个集群的配置,需要小心

第二种方式是在通过 spark-submit 提交 Spark 应用程序本身时指定配置参数,该方法不会影响整个集群。

spark-submit --conf spark.sql.shuffle.partitions=5 --conf
"spark.executor.memory=2g" --class main.scala.chapter7.SparkConfig_7_1 jars/main-
scala-chapter7_2.12-1.0.jar

或者是在程序中指定配置:

// In Scala
import org.apache.spark.sql.SparkSessiondef printConfigs(session: SparkSession) = {// Get confval mconf = session.conf.getAll// Print themfor (k <- mconf.keySet) { println(s"${k} -> ${mconf(k)}\n") }
}def main(args: Array[String]) {// Create a sessionval spark = SparkSession.builder.config("spark.sql.shuffle.partitions", 5).config("spark.executor.memory", "2g").master("local[*]").appName("SparkConfig").getOrCreate()printConfigs(spark)spark.conf.set("spark.sql.shuffle.partitions",spark.sparkContext.defaultParallelism)println(" ****** Setting Shuffle Partitions to Default Parallelism")printConfigs(spark)
}spark.driver.host -> 10.8.154.34
spark.driver.port -> 55243
spark.app.name -> SparkConfig
spark.executor.id -> driver
spark.master -> local[*]
spark.executor.memory -> 2g
spark.app.id -> local-1580162894307
spark.sql.shuffle.partitions -> 5

第三种是在 shell 交互中查看并设置配置信息:

// In Scala
// mconf is a Map[String, String] 
scala> val mconf = spark.conf.getAll
...
scala> for (k <- mconf.keySet) { println(s"${k} -> ${mconf(k)}\n") }spark.driver.host -> 10.13.200.101
spark.driver.port -> 65204
spark.repl.class.uri -> spark://10.13.200.101:65204/classes
spark.jars ->
spark.repl.class.outputDir -> /private/var/folders/jz/qg062ynx5v39wwmfxmph5nn...
spark.app.name -> Spark shell
spark.submit.pyFiles ->
spark.ui.showConsoleProgress -> true
spark.executor.id -> driver
spark.submit.deployMode -> client
spark.master -> local[*]
spark.home -> /Users/julesdamji/spark/spark-3.0.0-preview2-bin-hadoop2.7
spark.sql.catalogImplementation -> hive
spark.app.id -> local-1580144503745

还可以通过 Spark SQL 查询:

# In Python
spark.sql("SET -v").select("key", "value").show(n=5, truncate=False)+------------------------------------------------------------+-----------+
|key                                                         |value      |
+------------------------------------------------------------+-----------+
|spark.sql.adaptive.enabled                                  |false      |
|spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin   |0.2        |
|spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled|true       |
|spark.sql.adaptive.shuffle.localShuffleReader.enabled       |true       |
|spark.sql.adaptive.shuffle.maxNumPostShufflePartitions      |<undefined>|
+------------------------------------------------------------+-----------+
only showing top 5 rows

或者在 web 界面查看:
image.png

动态扩展集群负载

静态与动态资源分配
如果在提交任务的时候通过配置限定了使用的资源,集群有时候会因为任务所需资源大于预期限定,导致任务在队列中排队,会导致任务挤压,导致后续需要更多的资源运行任务。
如果说在配置中指定了动态资源分配配置,那么Spark 会根据任务来计算所需资源,动态分配,这有利于大量任务峰值的时候。
动态分配资源配置:

# 开启动态分配
spark.dynamicAllocation.enabled true
# driver 会请求集群最少创建两个 executor
spark.dynamicAllocation.minExecutors 2
# 任务队列中的任务积压超过 1m driver 就会请求 executor 执行该任务
spark.dynamicAllocation.schedulerBacklogTimeout 1m
# driver 最多请求 20 个 executor 来执行积压的任务
spark.dynamicAllocation.maxExecutors 20
# 如果 executor 执行完积压任务并且 2m 内没有新的积压任务就终止该 executor
spark.dynamicAllocation.executorIdleTimeout 2min

配置 Spark executor 的内存和 shuffle 服务
仅仅动态分配资源是不够的,我们还需要知道 Spark 是如何分配和使用内存的,以便程序不收 JVM 垃圾回收的影响。
每个 executor 的内存分为三部分:

  • 执行内存:去除保留内存后,默认分配剩下的60%,执行内存用于 shuffles, joins, sorts, 和 aggregations 操作。
  • 存储内存:去除保留内存后,默认分配剩下的40%,存储内存主要保存 DataFrame 生成的数据结构和 partitions。
  • 预留内存:默认保留 300M,防止 OOM

image.png
Spark 的默认内存分配适用于大部分情况,一般无需修改,但是如果作业存在大量 map 和 shuffle,Spark 会读取本地磁盘的 shuffle 文件 ,如果内存不足,会存在大量的 I/O 操作,造成瓶颈。这个时候默认配置就不再是最佳的配置了,需要根据具体的情况调整。
下边是建议调整的一些配置参数,但是要根据实际环境不断调整至最佳:

配置默认值、建议和说明
spark.driver.memory默认值为1g(1 GB)。这是分配给 Spark driver 用于从executor 接收数据的内存量。可以在提交任务时通过–driver-memory 指定。
仅当希望executor 从该操作接收大量数据collect(),或者内executor 内存不足时,才更改此设置。
spark.shuffle.file.buffer默认值为 32 KB。建议为 1 MB。这允许 Spark 在将最终映射结果写入磁盘之前进行更多缓冲。
spark.file.transferTo默认为true.设置为false将强制 Spark 在最终写入磁盘之前使用文件缓冲区来传输文件;这将减少 I/O 活动。
spark.shuffle.unsafe.file.output.buffer默认值为 32 KB。指定shuffle 期间合并文件用到的内存大小。一般来说,较大的值(例如 1 MB)更适合较大的工作负载,而默认值则适用于较小的工作负载。
spark.io.compression.lz4.blockSize默认值为 32 KB。增加到 512 KB。可以通过增加块的压缩大小来减小shuffle 文件的大小。
spark.shuffle.service.index.cache.size默认值为 100m。指定shuffle 的最大内存。
spark.shuffle.registration.timeout默认值为 5000 毫秒。增加到 120000 毫秒。
spark.shuffle.registration.maxAttempts默认值为 3。如果需要,可增加到 5。

最大化 Spark 并行度
对于大的任务,Spark 会将任务拆分为多个 stage,每个 stage 内都会有多个任务。Spark 最多会为每个任务分配一个线程,去处理不同分区的数据。
为了充分利用资源,就最好保证分区数量最少和 executor 上的 core 数量一致,理想情况是一样多,这样既不会保证资源浪费,有保证每个任务都会执行。
image.png

分区是如何创建的
如前所述,Spark 的任务将数据处理为分区从磁盘读入内存。磁盘上的数据以块或连续文件块的形式排列,具体取决于存储。默认情况下,数据存储上的文件块的大小范围为 64 MB 到 128 MB。例如,在 HDFS 和 S3 上,默认大小为 128 MB(这是可配置的)。这些块的连续集合构成一个分区。
可以通过配置spark.sql.files.maxPartitionBytes来减小分区大小,但是可能会随着分区大小的减小,导致过多小文件生成的问题——许多小分区文件,由于打开、关闭和列出等文件系统操作而引入过多的磁盘 I/O 和性能下降目录,在分布式文件系统上可能会很慢。
程序中指定分区数量:

// In Scala
val ds = spark.read.textFile("../README.md").repartition(16)
ds: org.apache.spark.sql.Dataset[String] = [value: string]ds.rdd.getNumPartitions
res5: Int = 16val numDF = spark.range(1000L * 1000 * 1000).repartition(16)
numDF.rdd.getNumPartitionsnumDF: org.apache.spark.sql.Dataset[Long] = [id: bigint]
res12: Int = 16

最后通过配置spark.sql.shuffle.partitions指定 shuffle 分区的数量,默认情况下是200。

说明:当任务数据量下,或者 executor 上的 core 数量少,默认值可能太大,可以适当调小

shuffle 是在 groupBy()或者 join()宽转换操时发生的,shuffle 会将内存中的数据持久化至本地磁盘,它会消耗网络和磁盘 I/O 资源。

数据的缓存和持久化

对于使用频率高的 DataFrame 和表,将其缓存,有利于提高任务运行效率。

DataFrame.cache()

// In Scala
// Create a DataFrame with 10M records
val df = spark.range(1 * 10000000).toDF("id").withColumn("square", $"id" * $"id")
df.cache() // Cache the data
df.count() // Materialize the cacheres3: Long = 10000000
Command took 5.11 secondsdf.count() // Now get it from the cache
res4: Long = 10000000
Command took 0.44 seconds

第一个count()实现缓存,而第二个访问缓存,导致该数据集的访问时间快了近 12 倍。

注意:
在使用cache() 或者 persist()时,不会完全缓存整个 DataFrame,只会缓存使用到的记录,比如 take(1), 则会缓存一个分区。

DataFrame.persist()

// In Scala
import org.apache.spark.storage.StorageLevel// Create a DataFrame with 10M records
val df = spark.range(1 * 10000000).toDF("id").withColumn("square", $"id" * $"id")
df.persist(StorageLevel.DISK_ONLY) // Serialize the data and cache it on disk
df.count() // Materialize the cacheres2: Long = 10000000
Command took 2.08 secondsdf.count() // Now get it from the cache 
res3: Long = 10000000
Command took 0.38 seconds

取消持久化只需调用Dataframe.unpersist()即可。
我们可以从缓存的 Dataframe 创建缓存表:

// In Scala
df.createOrReplaceTempView("dfTable")
spark.sql("CACHE TABLE dfTable")
spark.sql("SELECT count(*) FROM dfTable").show()+--------+
|count(1)|
+--------+
|10000000|
+--------+Command took 0.56 seconds

何时缓存和持久化

缓存的常见用例是需要重复访问大型数据集以进行查询或转换的场景,比如:

  • 迭代机器学习训练期间常用的 DataFrame
  • 在 ETL 或构建数据管道期间进行频繁转换时访问的 DataFrame

何时不缓存和持久化

并非所有用例都规定需要缓存:

  • DataFrame 太大而无法放入内存
  • 转换开销小,不关心大小,不会频繁使用的 DataFrame

Spark 中的 JOINs

与关系数据库类似,Spark DataFrame 和 Dataset API 以及 Spark SQL 提供了一系列连接转换:内连接、外连接、左连接、右连接等。所有这些操作都会触发 Spark executor 之间的大量数据移动。
这些转换(也叫 shuffle) 的核心是 Spark 需要根据 groupBy()、join()、agg()、sortBy() 和 reduceByKey() 等操作计算要生成哪些数据、要写入磁盘的键和关联的数据,以及如何将这些键和数据传输到对应的节点。
Spark 有五种不同的连接策略,通过它们在 executor 之间交换_、_移动、排序、分组和合并数据:广播哈希连接 (BHJ)、随机哈希连接 (SHJ)、随机排序合并连接 (SMJ)、广播嵌套循环连接(BNLJ),以及随机和复制嵌套循环连接(又名笛卡尔积连接)。我们在这里只关注其中的两个(BHJ 和 SMJ),因为它们是遇到的最常见的。

广播连接

当有两个数据集需要连接时,采用广播连接,会将较小的一个广播至所有 executor,然后与较大的数据集连接,这种策略避免了大规模的交换。
image.png
默认情况下,如果较小的数据集小于 10 MB,Spark 将使用广播连接。此配置可以在spark.sql.autoBroadcastJoinThreshold设置,可以根据每个executor 和 driver 中的内存量来减少或增加大小。如果确信有足够的内存,则可以对大于 10 MB(甚至高达 100 MB)的 DataFrame 使用广播连接。
BHJ 是 Spark 提供的最简单、最快的连接,因为它不涉及数据集的任何shuffle;广播后,executor 可以在本地获取所有数据。只需确保 Spark driver 和 executor 端都有足够的内存来在内存中保存较小的数据集。

何时使用广播连接

  • 当较小和较大数据集中的每个键被 Spark 散列到同一分区时
  • 当一个数据集比另一个数据集小得多时(并且在 10 MB 的默认配置内,如果您有足够的内存,则可以更大)
  • 在执行 eq-join 时,根据匹配的未排序键组合两个数据集
  • 不担心网络带宽使用过多或OOM错误时,因为较小的数据集将广播到所有executor

排序合并连接

此连接方案有两个阶段:排序阶段,然后是合并阶段。
排序阶段根据所需的连接键对每个数据集进行排序;合并阶段迭代每个数据集中行中的每个键,如果两个键匹配,则合并行。
下边是将两各大数据集通过公共键 uid == users_id合并的代码:

// In Scala
import scala.util.Random
// Show preference over other joins for large data sets
// Disable broadcast join
// Generate data
...
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")// Generate some sample data for two data sets
var states = scala.collection.mutable.Map[Int, String]()
var items = scala.collection.mutable.Map[Int, String]()
val rnd = new scala.util.Random(42)// Initialize states and items purchased
states += (0 -> "AZ", 1 -> "CO", 2-> "CA", 3-> "TX", 4 -> "NY", 5-> "MI")
items += (0 -> "SKU-0", 1 -> "SKU-1", 2-> "SKU-2", 3-> "SKU-3", 4 -> "SKU-4", 5-> "SKU-5")// Create DataFrames
val usersDF = (0 to 1000000).map(id => (id, s"user_${id}",s"user_${id}@databricks.com", states(rnd.nextInt(5)))).toDF("uid", "login", "email", "user_state")
val ordersDF = (0 to 1000000).map(r => (r, r, rnd.nextInt(10000), 10 * r* 0.2d,states(rnd.nextInt(5)), items(rnd.nextInt(5)))).toDF("transaction_id", "quantity", "users_id", "amount", "state", "items")// Do the join 
val usersOrdersDF = ordersDF.join(usersDF, $"users_id" === $"uid")// Show the joined results
usersOrdersDF.show(false)+--------------+--------+--------+--------+-----+-----+---+---+----------+
|transaction_id|quantity|users_id|amount  |state|items|uid|...|user_state|
+--------------+--------+--------+--------+-----+-----+---+---+----------+
|3916          |3916    |148     |7832.0  |CA   |SKU-1|148|...|CO        |
|36384         |36384   |148     |72768.0 |NY   |SKU-2|148|...|CO        |
|41839         |41839   |148     |83678.0 |CA   |SKU-3|148|...|CO        |
|48212         |48212   |148     |96424.0 |CA   |SKU-4|148|...|CO        |
|48484         |48484   |148     |96968.0 |TX   |SKU-3|148|...|CO        |
|50514         |50514   |148     |101028.0|CO   |SKU-0|148|...|CO        |
|65694         |65694   |148     |131388.0|TX   |SKU-4|148|...|CO        |
|65723         |65723   |148     |131446.0|CA   |SKU-1|148|...|CO        |
|93125         |93125   |148     |186250.0|NY   |SKU-3|148|...|CO        |
|107097        |107097  |148     |214194.0|TX   |SKU-2|148|...|CO        |
|111297        |111297  |148     |222594.0|AZ   |SKU-3|148|...|CO        |
|117195        |117195  |148     |234390.0|TX   |SKU-4|148|...|CO        |
|253407        |253407  |148     |506814.0|NY   |SKU-4|148|...|CO        |
|267180        |267180  |148     |534360.0|AZ   |SKU-0|148|...|CO        |
|283187        |283187  |148     |566374.0|AZ   |SKU-3|148|...|CO        |
|289245        |289245  |148     |578490.0|AZ   |SKU-0|148|...|CO        |
|314077        |314077  |148     |628154.0|CO   |SKU-3|148|...|CO        |
|322170        |322170  |148     |644340.0|TX   |SKU-3|148|...|CO        |
|344627        |344627  |148     |689254.0|NY   |SKU-3|148|...|CO        |
|345611        |345611  |148     |691222.0|TX   |SKU-3|148|...|CO        |
+--------------+--------+--------+--------+-----+-----+---+---+----------+
only showing top 20 rows

查看执行计划:

usersOrdersDF.explain() == Physical Plan ==
InMemoryTableScan [transaction_id#40, quantity#41, users_id#42, amount#43,
state#44, items#45, uid#13, login#14, email#15, user_state#16]+- InMemoryRelation [transaction_id#40, quantity#41, users_id#42, amount#43,
state#44, items#45, uid#13, login#14, email#15, user_state#16], 
StorageLevel(disk, memory, deserialized, 1 replicas)+- *(3) SortMergeJoin [users_id#42], [uid#13], Inner:- *(1) Sort [users_id#42 ASC NULLS FIRST], false, 0:  +- Exchange hashpartitioning(users_id#42, 16), true, [id=#56]:     +- LocalTableScan [transaction_id#40, quantity#41, users_id#42,
amount#43, state#44, items#45]+- *(2) Sort [uid#13 ASC NULLS FIRST], false, 0+- Exchange hashpartitioning(uid#13, 16), true, [id=#57]+- LocalTableScan [uid#13, login#14, email#15, user_state#16]

发现发生了 Exchange操作,也就是 shuffle。
通过 UI 也可以查看到:
image.png
优化排序合并连接
如果我们为公共排序键或要执行频繁等连接的列创建分区存储桶则可以消除 Exchange.
可以创建明确数量的桶来存储特定的排序列(每个桶一个键)。以这种方式对数据进行预排序和重新组织可以提高性能,因为它允许我们跳过昂贵的Exchange操作并直接进入WholeStageCodegen。

何时使用排序合并连接
在以下条件下使用此类连接以获得最大收益:

  • 当两个大数据集中的每个键都可以通过排序并哈希到同一分区时
  • 当只想执行等连接以根据匹配的排序键组合两个数据集时
  • 当能够预防Exchange和Sort导致大量 shuffle 操作时

总结

本文我们讨论了许多用于调整 Spark 应用程序的优化技术。通过调整一些默认的 Spark 配置,可以改进大型任务的扩展、增强并行度,并减少 executor 内存不足的问题。还了解了如何使用具有适当级别的缓存和持久化策略来加快对常用数据集的访问,并且我们研究了 Spark 在复杂聚合期间使用的两种常用联接,以及如何跳过 shuffle 等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/619406.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RedisTemplate

3.3.RedisTemplate 在Sentinel集群监管下的Redis主从集群&#xff0c;其节点会因为自动故障转移而发生变化&#xff0c;Redis的客户端必须感知这种变化&#xff0c;及时更新连接信息。Spring的RedisTemplate底层利用lettuce实现了节点的感知和自动切换。 下面&#xff0c;我们…

一二三应用开发平台使用手册——系统管理-用户-使用说明

概述 对于新系统&#xff0c;如果说初始化组织机构是第一步工作&#xff0c;那么初始化用户则是第二步工作。今天来介绍系统管理模块中的用户使用说明。 对于企业应用&#xff0c;人员是重要的基础数据。 在不少平台和系统中&#xff0c;对“人”的概念进一步细分&#xff0c;…

个人网站制作 Part 19 添加在线聊天支持 | Web开发项目

文章目录 &#x1f469;‍&#x1f4bb; 基础Web开发练手项目系列&#xff1a;个人网站制作&#x1f680; 添加在线聊天支持&#x1f528;使用在线聊天工具&#x1f527;步骤 1: 选择在线聊天工具&#x1f527;步骤 2: 注册Tawk.to账户&#x1f527;步骤 3: 获取嵌入代码 &…

LeetCode 53. 最大子序和

解题思路 相关代码 class Solution {public int maxSubArray(int[] nums) {//f[i]是以nums[i]结尾的连续子数组的最大和。int f[] new int[100010];f[0] nums[0];int resnums[0];for(int i1;i<nums.length;i){f[i] Math.max(f[i-1]nums[i],nums[i]);res Math.max(res,f…

SQLite---调试提示(十九)

返回&#xff1a;SQLite—系列文章目录 上一篇:SQLite Android 绑定&#xff08;十八&#xff09; 下一篇&#xff1a;从 SQLite 3.4.2 迁移到 3.5.0&#xff08;二十&#xff09; ​ 以下是 SQLite 开发人员跟踪、检查和了解 核心 SQLite 库。 这些技术旨在帮助理解 核…

【前端工程化指南】什么是版本控制系统?

什么是版本控制系统 想必大家在多人开发时一定会遇到这样的问题&#xff1a; 每次集中合并大家的代码都要通过U盘、网盘等各类传输工具集中代码&#xff0c;非常麻烦。在多人同时修改同一文件或相同部分代码时&#xff0c;可能会产生冲突&#xff0c;开发人员需要手动比较代码…

内网渗透-Earthworm的简单使用(内网穿透工具)

Earthworm的简单介绍&#xff08;一&#xff09; 文章目录 EarthWorm下载地址1. 普通网络 1.1 跳板机存在公网IP 1.1.1 网络环境1.1.2 使用方法1.1.3 流量走向 1.2 跳板机不存在公网IP&#xff0c;可出网 1.2.1 网络环境1.2.2 使用方法1.2.3 流量走向 2. 二级网络 2.1 一级跳…

「51媒体」中小初创企业如何做好媒体宣传?

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 中小初创企业在做媒体宣传时&#xff0c;由于通常资源有限&#xff0c;需要更加精明地使用外部资源来提升品牌知名度和业务成长。利用专业的媒体服务商可以是一个非常有效的方法。 明确目…

C/C++基础----内存相关

malloc分配内存 用法 参数为要开辟内存的大小&#xff08;字节为单位&#xff09;返回值为void*,所以要强转一下语法&#xff1a;malloc()动态开辟20个字节的内存&#xff0c;代码&#xff1a;#include <iostream>using namespace std;int main() {int *a (int *) mal…

集合体系java

Collection:单列集合&#xff1a;每个元素只包含一个值 Collection集合存储的是地址 Collection的三种遍历方法如下 //迭代器是用来遍历集合的专用方式&#xff08;数组没有迭代器&#xff09;&#xff0c;在java中迭代器的代表是Iterator //boolean hasNext():询问当前位置…

排序算法-基数排序

基数排序是一种非比较排序算法&#xff0c;它将待排序的数字按照位数进行排序。基数排序的思想是先按照个位数进行排序&#xff0c;然后按照十位数进行排序&#xff0c;接着按照百位数进行排序&#xff0c;以此类推&#xff0c;直到最高位排序完成。 基数排序的步骤如下&#x…

攻防世界13-simple_php

13-simple_php <?php show_source(*__FILE__*);//高亮文件 include("config.php");//文件包含在内 $a$_GET[a];//获得a $b$_GET[b];//获得b if($a0 and $a){ //判断a是否满足条件echo $flag1; //满足就输出flag1 } if(is_numeric($b)){ //判断b的条件&#x…