【Spark精讲】SparkSQL的RBO与CBO

Spark SQL核心:Catalyst

        Spark SQL的核心是Catalyst查询编译器,它将用户程序中的SQL/Dataset/DataFrame经过一系列操作,最终转化为Spark系统中执行的RDD。

Catalyst组成部分

  • Parser :用Antlr将SQL/Dataset/DataFrame转化成一棵未经解析的树,生成 Unresolved Logical Plan
  • Analyzer:Analyzer 结合 Catalog 信息对Parser中生成的树进行解析,生成 Resolved Logical Plan
  • Optimizer:对解析完的逻辑计划进行树结构的优化,以获得更高的执行效率,生成 Optimized Logical Plan
    • 谓词下推(Predicate Pushdown):PushdownPredicate 是最常见的用于减少参与计算的数据量的方法,将过滤操作下推到join之前进行
    • 常量合并(Constant Folding):比如, x+(1+2)  -> x+3
    • 列值裁剪(Column Pruning):对列进行裁减,只留下需要的列
  • Planner:Planner将Optimized Logical Plan 转换成多个 Physical Plan
  • CostModel:CBO 根据 Cost Model 算出每个 Physical Plan 的代价并选取代价最小的 Physical Plan 作为最终的 Physical Plan
  • Spark 以 DAG 的方法执行上述 Physical Plan,在执行 DAG 的过程中,Adaptive Execution 根据运行时信息动态调整执行计划从而提高执行效率

 SQL优化器:RBO、CBO

        SQL语句转化为具体执行计划是由SQL查询编译器决定的,同一个SQL语句可以转化成多种物理执行计划,如何指导编译器选择效率最高的执行计划,这就是优化器的主要作用。传统数据库(例如Oracle)的优化器有两种:

  • 基于规则的优化器(Rule-Based Optimization,RBO)
  • 基于代价的优化器(Cost-Based Optimization,CBO)

2.1 RBO(Rule-Based Optimization)

        RBO: Rule-Based Optimization也即“基于规则的优化器”,该优化器按照硬编码在数据库中的一系列规则来决定SQL的执行计划。只要按照这个规则去写SQL语句,无论数据表中的内容怎样、数据分布如何,都不会影响到执行计划。

        基于规则优化是一种经验式、启发式地优化思路,更多地依靠前辈总结出来的优化规则,简单易行且能够覆盖到大部分优化逻辑,但是对于核心优化算子Join却显得有点力不从心。举个简单的例子,两个表执行Join到底应该使用BroadcastHashJoin  还是SortMergeJoin?当前SparkSQL的方式是通过手工设定参数来确定,如果一个表的数据量小于这个值就使用BroadcastHashJoin,但是这种方案显得很不优雅,很不灵活。基于代价优化(CBO)就是为了解决这类问题,它会针对每个Join评估当前两张表使用每种Join策略的代价,根据代价估算确定一种代价最小的方案 。

2.2 CBO(Cost-Based Optimization)

        CBO: Cost-Based Optimization也即“基于代价的优化器”,该优化器通过根据优化规则对关系表达式进行转换,生成多个执行计划,然后CBO会通过根据统计信息(Statistics)和代价模型(Cost Model)计算各种可能“执行计划”的“代价”,即COST,从中选用COST最低的执行方案,作为实际运行方案。CBO依赖数据库对象的统计信息,统计信息的准确与否会影响CBO做出最优的选择。

        CBO 原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划。其核心在于评估一个给定的物理执行计划的代价。物理执行计划是一个树状结构,其代价等于每个执行节点的代价总合。

        每个执行节点的代价分为两个部分:

  • 该执行节点对数据集的影响,或者说该节点输出数据集的大小与分布
  • 该执行节点操作算子的代价

        要计算每个执行节点的代价,CBO需要解决两个问题:

  • 如何获取原始数据集的统计信息
  • 如何根据输入数据集估算特定算子的输出数据集

CBO面临的挑战

​​​​​​在Spark1.0中所有的Catalyst Optimizer都是基于规则 (rule) 优化的。为了产生比较好的查询规 则,优化器需要理解数据的特性,于是在Spark2.0中引入了基于代价的优化器 (cost-based optimizer),也就是所谓的CBO。然而,CBO也无法解决很多问题,比如:

  • 数据统计信息普遍缺失,统计信息的收集代价较高;
  • 储存计算分离的架构使得收集到的统计信息可能不再准确;
  • Spark部署在某一单一的硬件架构上,cost很难被估计;
  • Spark的UDF(User-defined Function)简单易用,种类繁多,但是对于CBO来说是个黑盒子,无法估计其cost;

总而言之,由于种种限制,Spark的优化器无法产生最好的Plan。

也许你会想:Spark为什么不解决这个问题呢?这里有很多挑战,比如: 

  • 统计信息的缺失,统计信息的不准确,那么就是默认依据文件大小来预估表的大小,但是文件 往往是压缩的,尤其是列存储格式,比如parquet 和 ORC,而Spark是基于行处理,如果数据连续重复,file size可能和真实的行存储的真实大小,差别非常之大。这也是为何提高 autoBroadcastJoinThreshold,即使不是太大也可能会导致out of memory; 
  • Filter复杂、UDFs的使用都会使Spark无法准确估计Join输入数据量的大小。当你的queryplan异常大和复杂的时候,这点尤其明显;
  • 其中,Spark3.0中基于运行期的统计信息,将Sort Merge Join 转换为Broadcast Hash Join。

基于RBO优化

left join case

 var appSql: String ="""|select|   *|from|   tab_spark_test as t1|left join tab_spark_test_2 as t2|on t1.id = t2.id|and t1.id > 5+5""".stripMarginsparkSession.sql("use default;")sparkSession.sql(appSql).explain(mode = "extended")

执行计划 

Outer 类型 Join 中的谓词下推

Outer 类型的 Join 操作在实际业务中的应用非常广泛 。 然而,不同于常规的 Join, Outer 类型 Join操作的谓词下推的处理比较复杂,用户在写 SQL语句时非常容易忽略,使得执行结果与自己的本意不符。 下面详细介绍谓词下推的几种处理逻辑。

对于 OuterJoin,假设返回所有行的基表为 Preserved row table,另外一张表为 Null supplying table,例如 t1 left join t2,则 t1 为 Preserved row table, t2 为 Null supplying table。 如果 Join 条件表达式为“on t1.key = t2.key and t1.key > 1 where t2.key >2”,则“t1.key> 1”叫作“Join 中条件”,“t2.key>2”叫作“Join后条件”。 总结起来, Outer Join语句的谓词下推有 4种情况,如下表所示。

为了方便分析,构造如下数据,假设表 t1 和表 t2 中的数据相同,都只包含两条数据。下面以数据表 t1 和 t2 为例,说明这 4种情况。

不加任何过滤条件

select t1. key, t1.value, t2.value
from t1 left join t2 
on tl.key = t2.keys;
t1.keyt1.valuet2.value
111
222

(1) Preserved row table“Join 中条件”不下推

select t1. key, t1.value, t2.value
from t1 left join t2 
on t1.key = t2.key 
and t1.key > 1;

这种情况下,过滤条件不会下推, SQL 最终执行的结果为:

 

(2) Preserved row table “Join 后条件”下推

select t1.key, t1.value, t2.value
from t1 left join t2 
on t1.key = t2.key 
where t1.key > 1;

等价于

selectt1.key,t1.value,t2.value
from (select key, value from t1 where t1.key >1
) t3
left join t2 
on t3.key = t2.key;

  

(3) Null supplying table “Join 中条件”下推

select t1.key, t1.value, t2.value
From t1 left join t2 
on t1.key = t2.key 
and t2.key > 1;

等价于

select t1.key, t1.value, t2.value
from t1 left join 
(
select key, value 
from t2 
where t2.key > 1
) t3 
on t1.key = t3.key;

 

(4) Null supplying table “Join 后条件”不下推

select t1.key, t1.value, t2.value
from t1 left join t2 
on t1.key = t2.key 
where t2.key >1;

基于CBO优化

CBO 优化主要在物理计划层面,原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划。充分考虑了数据本身的特点(如大小、分布)以及操作算子的特点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划。

而每个执行节点的代价,分为两个部分: 

1、该执行节点对数据集的影响,即该节点输出数据集的大小与分布;

2、该执行节点操作算子的代价。

每个操作算子的代价相对固定,可用规则来描述。而执行节点输出数据集的大小与分布,分为两个部分:

1、初始数据集,也即原始表,其数据集的大小与分布可直接通过统计得到;

2、中间节点输出数据集的大小与分布可由其输入数据集的信息与操作本身的特点推算。

需要先执行特定的 SQL 语句来收集所需的表和列的统计信息。 

--表级别统计信息
ANALYZE TABLE 表名 COMPUTE STATISTICS
--生成列级别统计信息
ANALYZE TABLE 表名 COMPUTE STATISTICS FOR COLUMNS 列 1,列 2,列 3--显示统计信息
DESC FORMATTED 表名
--显示列统计信息
DESC FORMATTED 表名 列名s

没有执行 ANALYZE状态 

执行 ANALYZE后,发现多了很多spark.sql.statistics信息

 

CBO相关参数

通过 "spark.sql.cbo.enabled" 来开启,默认是 false。配置开启 CBO 后,CBO 优化器可以基于表和列的统计信息,进行一系列的估算,最终选择出最优的查询计划。比如:Build 侧选择、优化 Join 类型、优化多表 Join 顺序等。

  • spark.sql.cbo.enabled

    默认false。true 表示打开,false 表示关闭。
    要使用该功能,需确保相关表和列的统计信息已经生成。

  • spark.sql.cbo.joinReorder.enabled
    使用 CBO 来自动调整连续的 inner join 的顺序。
    默认false。true:表示打开,false:表示关闭
    要使用该功能,需确保相关表和列的统计信息已经生成,且CBO 总开关打开。
  • spark.sql.cbo.joinReorder.dp.threshold
    使用 CBO 来自动调整连续 inner join 的表的个数阈值。
    默认10。
    如果超出该阈值,则不会调整 join 顺序。
  val CBO_ENABLED =buildConf("spark.sql.cbo.enabled").doc("Enables CBO for estimation of plan statistics when set true.").version("2.2.0").booleanConf.createWithDefault(false)val PLAN_STATS_ENABLED =buildConf("spark.sql.cbo.planStats.enabled").doc("When true, the logical plan will fetch row counts and column statistics from catalog.").version("3.0.0").booleanConf.createWithDefault(false)val JOIN_REORDER_ENABLED =buildConf("spark.sql.cbo.joinReorder.enabled").doc("Enables join reorder in CBO.").version("2.2.0").booleanConf.createWithDefault(false)val JOIN_REORDER_DP_THRESHOLD =buildConf("spark.sql.cbo.joinReorder.dp.threshold").doc("The maximum number of joined nodes allowed in the dynamic programming algorithm.").version("2.2.0").intConf.checkValue(number => number > 0, "The maximum number must be a positive integer.").createWithDefault(12)val JOIN_REORDER_CARD_WEIGHT =buildConf("spark.sql.cbo.joinReorder.card.weight").internal().doc("The weight of cardinality (number of rows) for plan cost comparison in join reorder: " +"rows * weight + size * (1 - weight).").version("2.2.0").doubleConf.checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].").createWithDefault(0.7)val JOIN_REORDER_DP_STAR_FILTER =buildConf("spark.sql.cbo.joinReorder.dp.star.filter").doc("Applies star-join filter heuristics to cost based join enumeration.").version("2.2.0").booleanConf.createWithDefault(false)val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection").doc("When true, it enables join reordering based on star schema detection. ").version("2.2.0").booleanConf.createWithDefault(false)val STARSCHEMA_FACT_TABLE_RATIO = buildConf("spark.sql.cbo.starJoinFTRatio").internal().doc("Specifies the upper limit of the ratio between the largest fact tables" +" for a star join to be considered. ").version("2.2.0").doubleConf.createWithDefault(0.9)

 使用举例

 def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName("CBO").set("spark.sql.cbo.enabled", "true").set("spark.sql.cbo.joinReorder.enabled", "true").setMaster("local[*]")val sparkSession: SparkSession = Util.SparkSession2hive(sparkConf)var appSql: String ="""|select|   t1.name,count(1)|from|   tab_spark_test as t1|left join tab_spark_test_2 as t2|on t1.id = t2.id|group by t1.name""".stripMarginsparkSession.sql("use default;")sparkSession.sql(appSql).show()while (true) {}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/315506.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PowerBI:如何在以SharePoint文件做为数据源?

问题描述&#xff1a; 有朋友最近询问&#xff0c;在PowerBI中如何以SharePoint中的文件做为数据源&#xff0c;进行报告的设计开发&#xff1f; 今天抽一些时间&#xff0c;为大家做一个样例&#xff0c;供大家参考。 解决方案&#xff1a; 找到将要使用的SharePoint中文件…

计算字符串的长度几种方法 | 递归 | 指针减指针 | 计数器 | C语言 | 详解 | 期末考试必看!!!

一&#xff0c;使用 递归 计算 字符串 的 长度 1&#xff0c;题目描述 2&#xff0c;分析题目 Ⅰ&#xff0c;题目中要求除了函数的形参&#xff0c;函数中不能够使用多余的变量&#xff08;这是比较苛刻的要求&#xff09;。 Ⅱ&#xff0c;根据此&#xff0c;很自然的…

红光护理加按摩放松,试试用清洁仪改善头皮健康

头皮健康和脱发问题现在越来越常见了&#xff0c;所以平时注意改善头皮健康特别重要。这些天我正在用一款名为DOCO红光头皮清洁仪&#xff0c;据说它可以通过多种方式改善头皮健康&#xff0c;我感觉效果还可以。 这款红光头皮清洁仪具有高频振动清洁功能&#xff0c;每分钟可以…

MATLAB习题操作实战

2.1创建一个有7个元素的一维数组&#xff0c;并做如下处理:直接寻访一维数组的第6个元素;寻访一维数组的第1、3、5个元素;寻访一维数组中第4个至最后1个元素;寻访一维数组中大于70的元素。 % 创建一维数组 array [50, 60, 70, 80, 90, 100, 110];% 直接寻访一维数组的第6个元…

55寸oled透明显示屏售价,受哪些因素影响

55寸OLED透明显示屏的售价受到多个因素的影响&#xff0c;包括以下几个方面&#xff1a; 尺寸和分辨率&#xff1a;OLED透明显示屏的尺寸和分辨率是决定价格的重要因素。较大的尺寸和较高的分辨率会增加制造成本和售价。 技术水平和制造工艺&#xff1a;OLED透明显示屏的技术水…

GRU算法

前置知识&#xff1a;RNN&#xff0c;LSTM LSTM需要训练的参数很多&#xff0c;极消耗计算资源。GRU是一种LSTM的改进算法&#xff0c;参数更少&#xff0c;更容易训练。 它将忘记门和输入门合并成为一个单一的更新门&#xff0c;同时合并了数据单元状态和隐藏状态&#xff0…

WPF容器的背景对鼠标事件的影响

背景&#xff1a;在实现鼠标拖动窗口的过程中发现对父容器设置了鼠标拖动窗口的事件MouseLeftButtonDown private void DragWindow(object sender, MouseButtonEventArgs e) {if (e.LeftButton MouseButtonState.Pressed)DragMove(); } 问题&#xff1a;非常困惑的是&#x…

开发辅助三(缓存Redisson分布式锁+分页插件)

缓存 缓存穿透&#xff1a;查询一个不存在的数据&#xff0c;由于缓存不命中&#xff0c;将大量查询数据库&#xff0c;但是数据库也没有此记录。 没有将这次查询的null写入缓存&#xff0c;导致了这个不存在的数据每次请求都要到存储层查询&#xff0c;失去了缓存的意义。 解…

机器学习(二) -- 数据预处理(1)

系列文章目录 机器学习&#xff08;一&#xff09; -- 概述 机器学习&#xff08;二&#xff09; -- 数据预处理&#xff08;1-3&#xff09; 机器学习&#xff08;三&#xff09; -- 特征工程&#xff08;1-2&#xff09; 未完待续…… 目录 系列文章目录 前言 一、概述…

普中STM32-PZ6806L开发板(HAL库函数实现-PWM呼吸灯)

简介 实现PWM呼吸灯。 主芯片 STM32F103ZET6呼吸灯引脚 : PC7电路原理图 LED8 电路图 LED8 与 主芯片连接图 其他知识 公式 PWM周期公式: Tpwm ( (ARR 1) * (PSC 1) ) / Tclk Tclk为定时器的输入时钟频率 Tout则为定时器溢出时间 ARR为计数周期 PSC为预分频器的值…

数据结构OJ实验14-哈希查找

A. DS哈希查找—线性探测再散列 题目描述 定义哈希函数为H(key) key%11&#xff0c;输入表长&#xff08;大于、等于11&#xff09;。输入关键字集合&#xff0c;用线性探测再散列构建哈希表&#xff0c;并查找给定关键字。 输入 测试次数t 每组测试数据为&#xff1a; …

Python序列之字典

系列文章目录 Python序列之列表Python序列之元组Python序列之字典&#xff08;本篇文章&#xff09;Python序列之集合 Python序列之字典 系列文章目录前言一、字典是什么&#xff1f;二、字典的操作1.创建&#xff08;1&#xff09;通过{}、dict()创建&#xff08;2&#xff0…