SparkSQL优化

SparkSQL优化

优化说明

缓存数据到内存

Spark SQL可以通过调用spark.sqlContext.cacheTable("tableName") 或者dataFrame.cache(),将表用一种柱状格式( an in­memory columnar format)缓存至内存中。然后Spark SQL在执行查询任务时,只需扫描必需的列,从而以减少扫描数据量、提高性能。通过缓存数据,Spark SQL还可以自动调节压缩,从而达到最小化内存使用率和降低GC压力的目的。调用sqlContext.uncacheTable("tableName")可将缓存的数据移出内存。

通过sc.broadcast(spark.table("表名")),将表广播出去,进行表与表之间的join相关操作。

可通过两种配置方式开启缓存数据功能:

1)使用spark.sqlContext的setConf方法。

2)执行SQL命令 SET key=value。

表-2 优化方式

Property Name

Default

Meaning

spark.sql.inMemoryColumnarStorage.compressed

true

如果假如设置为true,SparkSql会根据统计信息自动的为每个列选择压缩方式进行压缩

spark.sql.inMemoryColumnarStorage.batchSize

10000

控制列缓存的批量大小。批次大有助于改善内存使用和压缩,但是缓存数据会有OOM的风险

参数调优

可以通过配置下表中的参数调节Spark SQL的性能。

表-3 参数调优

Property Name

Default

Meaning

spark.sql.files.maxPartitionBytes

134217728 (128 MB)

获取数据到分区中的最大字节数。

spark.sql.files.openCostInBytes

4194304 (4 MB)

该参数默认4M,表示小于4M的小文件会合并到一个分区中,用于减小小文件,防止太多单个小文件占一个分区情况。

spark.sql.broadcastTimeout

300

广播等待超时时间,单位秒。

spark.sql.autoBroadcastJoinThreshold

10485760 (10 MB)

最大广播表的大小。设置为-1可以禁止该功能。当前统计信息仅支持Hive Metastore表。

spark.sql.shuffle.partitions

200

设置shuffle分区数,默认200。

SQL炸裂函数

Explode:SparkSql中的列转行函数:专门针对array或map操作。

//使用explode方法必须导入下面的包:
import org.apache.spark.sql.functions._

object explode_Demo{def main(args: Array[String]): Unit = {//创建程序入口val spark: SparkSession = SparkSession.builder().appName("createDF").master("local[*]").getOrCreate()//调用sparkContextval sc: SparkContext = spark.sparkContext//设置控制台日志输出级别sc.setLogLevel("WARN")//导包import spark.implicits._//加载数据val positionDF = spark.read.json("E:\\资料\\position.json")//查看表结构positionDF.printSchema()//DSL方法处理val listData: DataFrame = positionDF.select(explode($"data.list")).toDF("position")//查看表结构listData.printSchema()//查看表数据listData.show(false)//查看workName并统计个数listData.select($"position.workName" as "positions").groupBy($"positions").count().orderBy($"count".desc).show()}
}//SQL风格操作/*positionDF.createOrReplaceTempView("t_position")val sql ="""|select position.workName as workNames,count(*) as counts|from(|select explode(data.list) as position|from t_position)|group by workNames|order by counts desc""".stripMarginspark.sql(sql).show()*/

SparkSQL运行架构

Spark SQL对SQL语句的处理和关系型数据库类似,即词法/语法解析、绑定、优化、执行。Spark SQL会先将SQL语句解析成一棵树,然后使用规则(Rule)对Tree进行绑定、优化等处理过程。Spark SQL由Core、Catalyst、Hive、Hive-ThriftServer四部分构成:

1)Core: 负责处理数据的输入和输出,如获取数据,查询结果输出成DataFrame等。

2)Catalyst: 负责处理整个查询过程,包括解析、绑定、优化等。

3)Hive: 负责对Hive数据进行处理。

4)Hive-ThriftServer: 主要用于对hive的访问。

DataFrame性能上比RDD要高,主要有两方面原因:

1)定制化内存管理:Rdd数据都放在堆内存,JAVA(JVM)内存,内存管理回收分配不是由spark管理,是由JAVA(GC)管理,有时候会出现资源不一致问题,spark不是直接的内存管理者。

2)DataFrame数据以二进制的方式存在于非堆内存,节省了大量空间之外,还摆脱了GC的限制。涉及到序列化和反序列化,如图-13。

图-13 GC占比关系图

优化的执行计划

查询计划通过Spark catalyst optimiser进行优化,例子如图-14。

图-14 案例图

SparkSQL针对案例优化如图-15所示:

图-15 优化流程

为了说明查询优化,我们来看图-15展示的人口数据分析的示例。图中构造了两个DataFrame,将它们join之后又做了一次filter操作。如果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。

得到的优化执行计划在转换成物理执行计划的过程中,还可以根据具体的数据源的特性将过滤条件下推至数据源内。最右侧的物理执行计划中Filter之所以消失不见,就是因为溶入了用于执行最终的读取操作的表扫描节点内。

对于普通开发者而言,查询优化器的意义在于,即便是经验并不丰富的程序员写出的次优的查询,也可以被尽量转换为高效的形式予以执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/675585.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JAVA中的线程、死锁、异常

线程 Thread 一、程序 1.一段静态代码(静态) 二、进程 1.动态的,有开始,有结束;2.程序的一次执行过程,3.操作系统调度分配资源的最小单位; 三、…

TCN-BiGRU-Attention(12种算法优化TCN-BiGRU-Attention)(多输入单输出)

12种算法优化TCN-BiGRU-Attention模型预测的代码。其中Attention模型可以改为单头或者多头,在代码中就是改个数字而已。代码注释已写好如何更改。 TCN-BiGRU-Attention(12种算法优化TCN-BiGRU-Attention)(多输入单输出)代码获取戳…

【C++】C++11--- 类的新功能

目录 类的新功能 默认成员函数 示例 类成员变量初始化 强制生成默认函数的关键字default 禁止生成默认函数的关键字delete 类的新功能 默认成员函数 构造函数析构函数拷贝构造函数拷贝赋值重载取地址重载const取地址重载 C11在原先的6个默认成员函数的基础上&#xff0c…

WebDriver使用带用户名密码验证的IP代理解决方案

背景,使用python3 selenium 先定义一个方法,这里主要用到了chrome插件的功能,利用这个插件来放进代理内容。 def create_proxy_auth_extension(proxy_host, proxy_port,proxy_username, proxy_password, schemehttp):manifest_json "…

【Java基础】Maven继承

1. 前言 Maven 在设计时,借鉴了 Java 面向对象中的继承思想,提出了 POM 继承思想。 2. Maven继承 当一个项目包含多个模块时,可以在该项目中再创建一个父模块,并在其 POM 中声明依赖,其他模块的 POM 可通过继承父模…

基于TF的简易关键字语音识别

⚠申明: 未经许可,禁止以任何形式转载,若要引用,请标注链接地址。 全文共计10182字,阅读大概需要10分钟 🌈更多学习内容, 欢迎👏关注👀【文末】我的个人微信公众号&#…

word文件名和创建时间可以同时提取出来吗?答案是肯定的!方法很简单 一键就搞定

在日常生活和工作中,我们经常需要处理大量的Word文件,有时候需要提取这些文件的文件名以及它们的创建时间。虽然这听起来可能是一个复杂的任务,但实际上,通过一些简单的方法和工具,我们可以轻松地完成这一任务。在本文…

「C++ 内存管理篇 04」动态二维数组

目录 一. 使用calloc/free开辟和释放二维数组 二、 使用new/delete开辟和释放二维数组 一. 使用calloc/free开辟和释放二维数组 让一个二级指针变量存放动态开辟的一级指针数组的起始地址,然后让这些一级指针指向动态开辟的基本类型的数组: // 开辟一个大…

机器学习项目实践-基础知识部分

环境建立 我们做项目第一步就是单独创建一个python环境,Python新的隔离环境 创建:python -m venv ml 使用:.\Scripts\activate python -m venv ml 是在创建一个名为 ml 的虚拟环境,这样系统会自动创建一个文件夹ml,…

YOLOv5改进(一)MobileNetv3替换主干网络

前言 本篇博客主要讲解YOLOv5主干网络的替换,使用MobileNetv3实现模型轻量化,平衡速度和精度。以下为改进的具体流程~ 目录 一、改进MobileNetV3_Small 第一步:修改common.py,新增MobileNetV3 第二步:在yolo.py的parse_model函…

算法提高之玉米田

算法提高之玉米田 核心思想&#xff1a;状态压缩dp 将图存入g数组 存的时候01交换一下方便后面判断即g数组中0为可以放的地方 state中1为放的地方 这样只要state为1 g为0就可以判断不合法 #include <iostream>#include <cstring>#include <algorithm>#includ…

linux 内核编译

目录 Linux操作系统框架 Linux内核的主要功能&#xff1a; Linux的内核目录结构&#xff1a; 结构图: 详细介绍&#xff1a; uname - a 补充 编译之前 UTC 时间补充 Linux内核编译流程: 方法一: 官方内核编译: 1. 运行 build.sh 脚本&#xff0c; 记得加 sudo 权…