Spark SQL生产优化经验--任务参数配置模版

大表扫描

特殊case说明:当任务存在扫event_log表时需注意,若对event_log表进行了过滤,且过滤比很高,如下图的case,input为74T,但shuffle write仅为3.5G,那么建议提高单partition的读取数据量,将参数set spark.sql.files.maxPartitionBytes=536870912提高10倍至5368709120;
在这里插入图片描述

小型任务

目前测试:在不手动添加任何参数、平均时长在30min以内、单个shuffle 量在500G以下的任务可以使用该模版,但实际任务情况还需跟踪观察
开启AQE后,spark.sql.shuffle.partitio参数失效(spark默认200,任务从头到尾的所有shuffle都是这个并行度,无法自由操控)
AQE:实现原理是在每个stage完成后再启动一个子Job来计算shuffle中间结果量,依此来进行调节task个数,让作业根据每次shuffle的数据量自行调节并行度。
spark.sql.adaptive.minNumPostShufflePartitions 控制所有阶段的reduce个数,reduce个数区间最小值,为了防止分区数过小
spark.sql.adaptive.maxNumPostShufflePartitions reduce个数区间最大值,同时也是shuffle分区数的初始值
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 动态调整 reduce 个数的 partition 大小依据。如设置 64MB,则 reduce 阶段每个 task 最少处理 64MB 的数据。默认值为 64MB。

--基础资源
set spark.driver.memory=15g;
set spark.driver.cores=3;
set spark.driver.memoryOverhead=4096;
set spark.executor.memory=5G;
set spark.executor.memoryOverhead=1024;
set spark.executor.cores=2;
set spark.vcore.boost.ratio=2;
--动态executor申请
set spark.dynamicAllocation.minExecutors=10;
set spark.dynamicAllocation.maxExecutors=300;
--ae,shuffle partition并行度
set spark.sql.adaptive.minNumPostShufflePartitions=10;
set spark.sql.adaptive.maxNumPostShufflePartitions=1000;
--268435456;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=536870912;
--开启parquet切分
set spark.sql.parquet.adaptiveFileSplit=true;
--初始task调节,合并小文件
set spark.sql.files.maxPartitionBytes=536870912;

中型任务

目前测试:在不手动添加任何参数、平均时长在90min以内、单个shuffle 量在2T以下的任务可以使用该模版,但实际任务情况还需跟踪观察。
spark.executor.memoryOverhead 每个executor的堆外内存大小,堆外内存主要用于数据IO,对于报堆外OOM的任务要适当调大,单位Mb,与之配合要调大executor JVM参数,例如:set spark.executor.memoryOverhead=3072
set spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2560m

--基础资源
set spark.driver.memory=25g;
set spark.driver.cores=4;
set spark.driver.memoryOverhead=5120;
set spark.executor.memory=10G;
set spark.executor.memoryOverhead=4096;
set spark.executor.cores=3;
set spark.vcore.boost.ratio=1;
--动态executor申请
set spark.dynamicAllocation.minExecutors=10;
set spark.dynamicAllocation.maxExecutors=600;
--AQE
set spark.sql.adaptive.minNumPostShufflePartitions=10;
set spark.sql.adaptive.maxNumPostShufflePartitions=1000;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize= 536870912;
--开启parquet切分,初始task调节,合并小文件
set spark.sql.parquet.adaptiveFileSplit=true;
set spark.sql.files.maxPartitionBytes=536870912;
--推测
set spark.speculation.multiplier=2.5;
set spark.speculation.quantile=0.8;
--shuffle 落地hdfs
set spark.shuffle.hdfs.enabled=true;
set spark.shuffle.io.maxRetries=1;
set spark.shuffle.io.retryWait=0s;

大型任务

目前测试:在不手动添加任何参数、平均时长在120min以内、单个shuffle 量在10T以下的任务可以使用该模版,但实际任务情况还需跟踪观察。

--基础资源
set spark.driver.memory=25g;
set spark.driver.cores=4;
set spark.driver.memoryOverhead=5120;
set spark.executor.memory=15G;
set spark.executor.memoryOverhead=3072;
set spark.executor.cores=3;
set spark.vcore.boost.ratio=1;
--动态executor申请
set spark.dynamicAllocation.minExecutors=10;
set spark.dynamicAllocation.maxExecutors=900;
--ae
set spark.sql.adaptive.minNumPostShufflePartitions=10;
set spark.sql.adaptive.maxNumPostShufflePartitions=3000;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize= 536870912;
--shuffle 落地hdfs
set spark.shuffle.hdfs.enabled=true;
set spark.shuffle.io.maxRetries=1;
set spark.shuffle.io.retryWait=0s;
--开启parquet切分,合并小文件
set spark.sql.parquet.adaptiveFileSplit=true;
set spark.sql.files.maxPartitionBytes=536870912;
--推测
set spark.speculation.multiplier=2.5;
set spark.speculation.quantile=0.9;

超大型任务

目前测试:在不手动添加任何参数、平均时长大于120min、单个shuffle 量在10T以上的任务可以使用该模版,但实际任务情况还需跟踪观察。

--基础资源
set spark.driver.memory=30g;
set spark.driver.cores=4;
set spark.driver.memoryOverhead=5120;
set spark.executor.memory=20G;
set spark.executor.memoryOverhead= 5120;
set spark.executor.cores=5;
set spark.vcore.boost.ratio=1;
--动态executor申请
set spark.dynamicAllocation.minExecutors=10;
set spark.dynamicAllocation.maxExecutors=1500;
--ae
set spark.sql.adaptive.minNumPostShufflePartitions=10;
set spark.sql.adaptive.maxNumPostShufflePartitions=7000;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize= 536870912;
--开启parquet切分,合并小文件
set spark.sql.parquet.adaptiveFileSplit=true;
set spark.sql.files.maxPartitionBytes=536870912;
-- shuffle 落地 hdfs,shuffle文件上传hdfs
set spark.shuffle.hdfs.enabled=true;
set spark.shuffle.io.maxRetries=1;
set spark.shuffle.io.retryWait=0s;
--推测
set spark.speculation.multiplier=2.5;
set spark.speculation.quantile=0.9;

其他常用参数

--ae hash join
set spark.sql.adaptive.hashJoin.enabled=true;
set spark.sql.adaptiveHashJoinThreshold=52428800;
--输出文件合并 byBytes,该功能会生成两个stage,
--第一个stage shuffle的数据量来预估最后生成到hdfs上的文件数据量大小,
--并通过预估的文件数据量大小计算第二个stage的并行度,即最后生成的文件个数。
--该功能只能控制生成的文件大小尽量接近spark.merge.files.byBytes.fileBytes,且有一定的性能损耗,需根据实测情况选择使用。    
-- 最终文件数量:(totalBytes / fileBytes / compressionRatio).toInt + 1                       
set spark.merge.files.byBytes.enabled=true;
set spark.merge.files.byBytes.repartitionNumber=100;  --第一个stage的并行读
set spark.merge.files.byBytes.fileBytes=134217728;		-- 预期的文件大小
set spark.merge.files.byBytes.compressionRatio=3;		-- 压缩比,shuffle文件和最后生成的文件格式和压缩格式都不相同,因此通过该参数调节
--输出文件合并  该功能会在原来job的最后一个stage后面增加1个stage来控制最后生成的文件数量,
--对于动态分区,每个分区生成spark.merge.files.number个文件。
spark.merge.files.enabled=true            
spark.merge.files.number=512
--skew_join 解析绕过tqs
set tqs.analysis.skip.hint=true;
--初始task上限
set spark.sql.files.openCostInBytes=4194304;
set spark.datasource.splits.max=20000;
--broadcast时间
set spark.sql.broadcastTimeout = 3600;
--(防止get json报错)
set spark.sql.mergeGetMapValue.enabled=true;--ae 倾斜处理 HandlingSkewedJoin  OptimizeSkewedJoin
set spark.sql.adaptive.allowBroadcastExchange.enabled=true;
set spark.sql.adaptive.hashJoin.enabled=false;
set spark.sql.adaptive.skewedPartitionFactor=3; -- 某partition数据量大于中位数的3倍,判定为倾斜
set spark.sql.adaptive.skewedPartitionMaxSplits=20; -- 限制某一partition最多拆分多少分,spark3已失效
set spark.sql.adaptive.skewedJoin.enabled=true; -- Normal Join Pattern的优化开关
set spark.sql.adaptive.skewedJoinWithAgg.enabled=true; -- JoinWithAgg Pattern的优化开关,非开源版
set spark.sql.adaptive.multipleSkewedJoin.enabled=true;-- MultipleJoin Pattern的优化开关,非开源版
set spark.shuffle.highlyCompressedMapStatusThreshold=20000; -- 分区数大于20000时 使用HighlyCompressedMapStatus统计每个partition数据量,会降低数据统计进度--并发读文件
set spark.sql.concurrentFileScan.enabled=true;
--filter按比例读取文件
set spark.sql.files.tableSizeFactor={table_name}:{filter 比例};
set spark.sql.files.tableSizeFactor=dm_content.tcs_task_dict:10;
--AM failed 时长
set spark.yarn.am.waitTime=200s;
--shuffle service 超时设置
set spark.shuffle.registration.timeout=12000;
set spark.shuffle.registration.maxAttempts=5;
--parquet index 特性生效,in 条件的个数
set spark.sql.parquet.pushdown.inFilterThreshold=30; --设置engine
set tqs.query.engine.type=sparkcli;--hive metastore 超时
spark.hadoop.hive.metastore.client.socket.timeout=600--manta备用
spark.sql.adaptive.maxNumPostShufflePartitions 5000
spark.executor.memoryOverhead 8000
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 536870912

数据倾斜优化 AQE Skewed Join

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/9082.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

接口测试之测试原则、测试用例、测试流程......

一、接口的介绍 软件测试中,常说的接口有两种:图形用户接口(GUI,人与程序的接口)、应用程序编程接口(API)。 接口(API)是系统与系统之间,模块与模块之间或者…

elasticsearch学习篇:初识ES

一、什么是ES 1、基础概念 是一款非常强大的开源搜索引擎,具备非常多强大功能,可以帮助我们从海量数据中快速找到需要的内容es是elastic stack(ELK)的核心,负责存储、搜索、分析数据。 ELK包括以下内容: ELK被广泛应用在日志数据…

【Web3】认识元宇宙

元宇宙在Web3中扮演着重要的角色,可以带来许多创新和变革。Web3是下一代互联网的概念,强调去中心化、区块链技术和加密货币的应用。 元宇宙在Web3中的几个作用: 去中心化的虚拟世界:元宇宙通过使用区块链技术和去中心化的网络结构…

【MySQL事务】保证数据完整性的利器

1、事务的认识 事务:事务就是将多个SQL给打包在一起,组成一个整体。组成这个整体的各个SQL,要么全部成功,要么全部失败。 举例说明: 情人节到了,滑稽老铁打算给他女朋友小美发给红包,但是他又害…

QWebEngine应用---执行javascript

我们都知道现代前端技术是基于html、css和javascript进行显示交互的,其中html和css属于静态界面显示,辅以javascript使页面交互更丰富。浏览器作为前端页面显示的基石,提供一套显示、交互、调试的东西。QWebEngine同样也提供了这些功能&#…

spring如何使用junit进行测试

第一步maven的pom.xml引入坐标&#xff1a; <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version></dependency> 第二步编写测试方法&#xff1a; 第三步 定义scope类型

ARM实验-ARM主程序调用ARM/C语言子程序

一、实验名称&#xff1a;ARM主程序调用ARM/C语言子程序 二、实验目的&#xff1a; 了解ARM应用程序框架。了解ARM汇编程序函数和C语言程序函数相互调用时&#xff0c;遵循的ATPCS标准&#xff1b;了解和掌握ARM汇编程序调用C语言程序函数的基本方法&#xff1b;了解和掌握AR…

学习Maven Web 应用

Maven Web 应用 本章节我们将学习如何使用版本控制系统 Maven 来管理一个基于 web 的项目&#xff0c;如何创建、构建、部署已经运行一个 web 应用。 创建 Web 应用 我们可以使用 maven-archetype-webapp 插件来创建一个简单的 Java web 应用。 打开命令控制台&#xff0c;…

LNMT(linux下nignx+mysql+tomcat(中间件)应用)部署应用、及各服务介绍、部署开源站点jpress

目录 一、环境准备 二、tomcat1和tomcat2服务器&#xff0c;安装配置tomcat 1.tomcat服务器介绍 2.JDK软件介绍 3.查看JDK是否安装 4.tomcat1和tomcat2服务器&#xff0c;安装JDK1.8.0_191&#xff08;JDK必须和nginx版本相适应&#xff0c;不然一直报错&#xff09; 5.安…

软件测试之【单元测试、系统测试、集成测试】

一、单元测试的概念 单元测试&#xff08;Unit Testing&#xff09;是对软件基本组成单元进行的测试&#xff0c;如函数&#xff08;function或procedure&#xff09;或一个类的方法&#xff08;method&#xff09;。当然这里的基本单元不仅仅指的是一个函数或者方法&#xff0…

web爬虫第四弹 - 生产者与消费者模型

聊聊天 做了很长一段时间爬虫工作&#xff0c; 一直没时间记录。 去年好不容易静下心来想写点东西&#xff0c; 也是因为各种琐事断掉了&#xff0c; 看了下之前的爬虫笔记。 web爬虫第三弹&#xff0c; postman的使用&#xff1b; 第四弹&#xff1a;代理ip的充分使用&#x…

手把手用项目实战教你从“一般测试”华丽转身成为“专业测试”

一般测试 vs 专业测试 一般测试&#xff1a;就是很多粉丝自己摸索着学习就容易变成这种&#xff1a; 你说他不会吧&#xff1f;他好像都知道&#xff0c;你说他都会吧&#xff0c;又会工作中各种问题&#xff0c;面试问啥啥说不出来&#xff01; 我们把这种情况的人光荣的就叫…