目录
一、前言
二、hive执行计划
2.1 hive explain简介
2.1.1 语法格式
2.1.2 查询计划阶段说明
2.2 操作演示
2.2.1 不加条件的查询计划分析
2.2.2 带条件的查询计划分析
三、MapReduce属性优化
3.1 本地模式
3.1.1 本地模式参数设置
3.1.2 本地模式操作演示
3.2 JVM重用
3.2.1 什么是JVM重用
3.3 并行执行
四、join优化
4.1 hive sql的join执行简介
4.2 Map Join
4.2.1 执行原理
4.2.2 使用方式
4.3 Reduce Join
4.3.1 使用场景
4.3.2 执行原理
4.3.3 使用方式
4.4 Bucket Join
4.4.1 使用场景
4.4.2 执行原理
4.4.3 使用方式1
4.4.4 使用方式2
五、优化器
5.1 关联优化
5.2 优化器引擎
5.2.1 背景
5.2.2 优化器引擎 —— RBO
5.2.3 优化器引擎 —— CBO
5.3 Analyze分析器
5.3.1 Analyze 功能
5.3.1 Analyze 语法
5.3.2 实际操作
六、谓词下推
6.1 谓词下推概述
6.1.1 谓词下推案例总结
6.2 谓词下推常用规则总结
七、数据倾斜
7.1 概述
7.2 数据倾斜场景一
7.2.1 方案一
7.2.2 方案二
7.2.3 方案三
7.3 数据倾斜场景二
7.3.1 方案一
7.3.2 方案二
7.3.3 方案三
一、前言
上一篇,我们分享了hive表数据常用的优化策略,本篇再从hive的job执行层面来聊聊可以优化的常用的一些手段。
二、hive执行计划
在正式分享job优化之前,有必要先了解下hive的一条sql执行时经历的事情,即explain执行计划,在学习mysql的时候,DBA或者开发人员经常通过explain关键字来分析一条慢sql的执行计划,从而指导sql优化。
2.1 hive explain简介
HiveQL,是一种类SQL语言,从编程语言规范来说是一种声明式语言,用户会根据查询需求提交声明式的HQL查询,而Hive会根据底层计算引擎将其转化成Mapreduce/Tez/Spark的job;
hive explain 补充说明:
- 使用hive的explain命令可以帮助用户了解一条HQL语句在底层的实现过程,通俗来说就是Hive打算如何去做这件事;
- explain会解析HQL语句,将整个HQL语句的实现步骤、依赖关系、实现过程都会进行解析返回,可以了解一条HQL语句在底层是如何实现数据的查询及处理的过程,辅助用户对Hive进行优化;
- 官网:hive sql官网地址;
2.1.1 语法格式
EXPLAIN [FORMATTED|EXTENDED|DEPENDENCY|AUTHORIZATION|] query
参数说明:
- FORMATTED:对执行计划进行格式化,返回JSON格式的执行计划;
- EXTENDED:提供一些额外的信息,比如文件的路径信息;
- DEPENDENCY:以JSON格式返回查询所依赖的表和分区的列表 ;
- AUTHORIZATION:列出需要被授权的条目,包括输入与输出;
2.1.2 查询计划阶段说明
每个查询计划由以下几个部分组成:
The Abstract Syntax Tree for the query
抽象语法树(AST):Hive使用Antlr解析生成器,可以自动地将HQL生成为抽象语法树
The dependencies between the different stages of the plan
Stage依赖关系:会列出运行查询划分的stage阶段以及之间的依赖关系
The description of each of the stages
Stage内容:包含了每个stage非常重要的信息,比如运行时的operator和sort orders等具体的信息
2.2 操作演示
2.2.1 不加条件的查询计划分析
使用explain关键分析如下查询sql
explain extended select * from tb_login;
执行结果如下,重点关注下图中,TableScan后面的信息
2.2.2 带条件的查询计划分析
执行下面的sql,根据主键字段查询
explain extended select count(*) from tb_emp01 where empno=7369;
使用explain分析执行计划,此时将会看到更复杂的执行语法树;
关于执行计划的结果分析中的各个参数作用,可以结合官网或者其他资料进行深入的学习,这里就不过多展开了;
三、MapReduce属性优化
通过上面的执行计划分析,其实可以知道,hive在大多数情况下,其得到的查询结果底层会走MR的job,所以可以理解为,MR与hive结合相关的属性调优也是整个job优化中非常重要的内容。
3.1 本地模式
在使用Hive过程中,有一些数据量不大的表也会转换为MapReduce处理,提交到集群时,需要申请资源,等待资源分配,启动JVM进程,再运行Task,一系列的过程比较繁琐,本身数据量并不大,提交到YARN运行返回会导致性能较差的问题。
Hive为了解决这个问题,延用了MapReduce中的设计,提供本地计算模式,允许程序不提交给YARN,直接在本地运行,以便于提高小数据量程序的性能。
3.1.1 本地模式参数设置
如果要使用本地模式,只需要开启下面的参数即可,也就是说,当开启这个参数之后,hive在提交一条查询sql之后,会被MR自动识别并解析到,从而走本地模式;
-- 开启本地模式
set hive.exec.mode.local.auto = true;
注意:该参数默认为false,要使用的话需要手动开启,如下即显示了hive要是走本地模式的几个条件,即条件都满足的情况下才会走本地模式;
3.1.2 本地模式操作演示
设置参数
set hive.exec.mode.local.auto = true;
执行查询
select count(*) from tb_emp01;
可以看到,在这种情况下,140多万的数据只需要不到2秒的时间就统计出了结果 ,同时在执行日志中,也看到job的执行中使用了local,即走了本地模式;
3.2 JVM重用
Hadoop默认会为每个Task启动一个JVM来运行,而在JVM启动时内存开销大,Job数据量大的情况,如果单个Task数据量比较小,也会申请JVM,这就导致了资源紧张及浪费的情况;
3.2.1 什么是JVM重用
JVM重用可以使得JVM实例在同一个job中重新使用N次,当一个Task运行结束以后,JVM不会进行释放,而是继续供下一个Task运行,直到运行了N个Task以后,就会释放;
N的值可以在Hadoop的mapred-site.xml文件中进行配置,通常在10-20之间;
参数设置如下
-- Hadoop3之前的配置,在mapred-site.xml中添加以下参数
-- Hadoop3中已不再支持该选项
mapreduce.job.jvm.numtasks=10
3.3 并行执行
Hive在实现HQL计算运行时,会解析为多个Stage,有时候Stage彼此之间有依赖关系,只能挨个执行,但是在一些别的场景下,很多的Stage之间是没有依赖关系的;
例如Union语句,Join语句等等,这些Stage没有依赖关系,但是Hive依旧默认挨个执行每个Stage,这样会导致性能非常差,我们可以通过修改参数,开启并行执行,当多个Stage之间没有依赖关系时,允许多个Stage并行执行,提高性能。
并行执行关键参数设置
-- 开启Stage并行化,默认为false
SET hive.exec.parallel=true;
-- 指定并行化线程数,默认为8
SET hive.exec.parallel.thread.number=16;
四、join优化
join在mysql的日常编写sql过程中可以说是非常常见了,也是数据分析处理过程中必不可少的操作,Hive同样支持Join的语法;
4.1 hive sql的join执行简介
Hive Join的底层是通过MapReduce来实现的,Hive实现Join时,为了提高MapReduce的性能,提供了多种Join方案来实现;
例如:适合小表Join大表的Map Join,大表Join大表的Reduce Join,以及大表Join的优化方案Bucket Join等。
4.2 Map Join
即没有reduce阶段的task的场景,适合于小表join大表或者小表Join小表,如下图执行流程所示,在这种情况下,减少了reduce阶段task带来的执行时的资源开销;
4.2.1 执行原理
将小的那份数据给每个MapTask的内存都放一份完整的数据,大的数据每个部分都可以与小数据的完整数据进行join 底层不需要经过shuffle,需要占用内存空间存放小的数据文件
4.2.2 使用方式
尽量使用Map Join来实现Join过程,Hive中默认自动开启了Map Join:hive.auto.convert.join=true,Hive中小表的大小限制,在不同的版本中主要设置参数如下:
-- 2.0版本之前的控制属性
hive.mapjoin.smalltable.filesize=25M
-- 2.0版本开始由以下参数控制
hive.auto.convert.join.noconditionaltask.size=512000000
4.3 Reduce Join
如果map端的join处理不了的情况下,比如两个join表的数据量都比较大的时候,就要考虑使用Reduce Join;
4.3.1 使用场景
适合于大表Join大表
4.3.2 执行原理
如下图所示,将两张表的数据在shuffle阶段利用shuffle的分组来将数据按照关联字段进行合并 必须经过shuffle,利用Shuffle过程中的分组来实现关联;
4.3.3 使用方式
Hive会自动判断是否满足Map Join,如果不满足Map Join,则自动执行Reduce Join
4.4 Bucket Join
顾名思义,即利用分桶表进行join,从之前的学习中我们了解到,使用分桶表,在进行join操作的时候,可以减少笛卡尔乘积的值,从而达到提升性能的目的;
4.4.1 使用场景
适合于大表Join大表
4.4.2 执行原理
将两张表按照相同的规则将数据划分 根据对应的规则的数据进行join 减少了比较次数,提高了性能,如下图所示;
4.4.3 使用方式1
基于某个字段,语法:clustered by colName,参数设置
set hive.optimize.bucketmapjoin = true;
使用要求:分桶字段 = Join字段 ,桶的个数相等或者成倍数
4.4.4 使用方式2
使用Sort Merge Bucket Join(SMB),基于有序的数据Join,语法:
clustered by colName sorted by (colName)
具体参数设置
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
使用要求
分桶字段 = Join字段 = 排序字段 ,桶的个数相等或者成倍数
五、优化器
5.1 关联优化
当一个程序中如果有一些操作彼此之间有关联性,是可以在一个MapReduce中实现的,但是Hive会不智能的选择,Hive会使用两个MapReduce来完成这两个操作。
例如:当我们执行 select …… from table group by id order by id desc。该SQL语句转换为MapReduce时,我们可以有两种方案来实现:
方案一
- 第一个MapReduce做group by,经过shuffle阶段对id做分组;
- 第二个MapReduce对第一个MapReduce的结果做order by,经过shuffle阶段对id进行排序;
方案二
- 因为都是对id处理,可以使用一个MapReduce的shuffle既可以做分组也可以排序;
在这种场景下,Hive会默认选择用第一种方案来实现,这样会导致性能相对较差,可以在Hive中开启关联优化,对有关联关系的操作进行解析时,可以尽量放在同一个MapReduce中实现,关键配置参数如下:
set hive.optimize.correlation=true;
5.2 优化器引擎
5.2.1 背景
Hive默认的优化器在解析一些聚合统计类的处理时,底层解析的方案有时候不是最佳的方案,例如:当前有一张表【共1000条数据】,id构建了索引,id =100值有900条,我们的需求是:
查询所有id = 100的数据,SQL语句为:select * from table where id = 100;
对于这个需求,可以考虑下面的方案进行实现
方案一
由于id这一列构建了索引,索引默认的优化器引擎RBO,会选择先从索引中查询id = 100的值所在的位置,再根据索引记录位置去读取对应的数据,但是这并不是最佳的执行方案。
方案二
有id=100的值有900条,占了总数据的90%,这时候是没有必要检索索引以后再检索数据的,可以直接检索数据返回,这样的效率会更高,更节省资源,这种方式就是CBO优化器引擎会选择的方案。
5.2.2 优化器引擎 —— RBO
rule basic optimise
基于规则的优化器,根据设定好的规则来对程序进行优化
5.2.3 优化器引擎 —— CBO
cost basic optimise
基于代价的优化器,根据不同场景所需要付出的代价来合适选择优化的方案,对数据的分布的信息【数值出现的次数,条数,分布】来综合判断用哪种处理的方案是最佳方案
Hive中支持RBO与CBO这两种引擎,默认使用的是RBO优化器引擎,很明显CBO引擎更加智能,所以在使用Hive时,我们可以配置底层的优化器引擎为CBO引擎,配置参数如下:
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true; set hive.stats.fetch.column.stats=true;
思考:CBO引擎是基于代价的优化引擎,那么CBO如何知道每种方案的计算代价的呢?接下来就要说说Analyze分析器了。
5.3 Analyze分析器
5.3.1 Analyze 功能
用于提前运行一个MapReduce程序将表或者分区的信息构建一些元数据【表的信息、分区信息、列的信息】,搭配CBO引擎一起使用。
5.3.1 Analyze 语法
构建分区信息元数据
ANALYZE TABLE tablename
[PARTITION(partcol1[=val1], partcol2[=val2], ...)]
COMPUTE STATISTICS [noscan];
构建列的元数据
ANALYZE TABLE tablename
[PARTITION(partcol1[=val1], partcol2[=val2], ...)]
COMPUTE STATISTICS FOR COLUMNS ( columns name1, columns name2...) [noscan];
查看元数据
DESC FORMATTED [tablename] [columnname];
5.3.2 实际操作
比如我们按照下面的操作步骤分析 tb_login_part 这张表
--分析优化器
use tb_part; -- 构建表中分区数据的元数据信息
ANALYZE TABLE tb_login_part PARTITION(logindate) COMPUTE STATISTICS;-- 构建表中列的数据的元数据信息
ANALYZE TABLE tb_login_part COMPUTE STATISTICS FOR COLUMNS userid;-- 查看构建的列的元数据
desc formatted tb_login_part userid;
可以看到如下信息
六、谓词下推
什么是谓词
用来描述或判定客体性质、特征或者客体之间关系的词项。比如"3 大于 2"中"大于"是一个谓词。
6.1 谓词下推概述
谓词下推Predicate Pushdown(PPD)基本思想:将过滤表达式尽可能移动至靠近数据源的位置,以使真正执行时能直接跳过无关的数据。简单点说就是在不影响最终结果的情况下,尽量将过滤条件提前执行。
Hive中谓词下推后,过滤条件会下推到map端,提前执行过滤,减少map到reduce的传输数据,提升整体性能。
开启下面的参数来使用(默认开启)
hive.optimize.ppd=true;
比如有下面的2个执行sql,哪一个更好呢?推荐形式1的写法,先过滤再join;
select a.id,a.value1,b.value2 from table1 a
join (select b.* from table2 b where b.ds>='20181201' and b.ds<'20190101') c
on (a.id=c.id);select a.id,a.value1,b.value2 from table1 a
join table2 b on a.id=b.id
where b.ds>='20181201' and b.ds<'20190101';
6.1.1 谓词下推案例总结
通过下面这些日常经常可能涉及到的sql,可以检验自己在编写sql的时候有没有满足谓词下推的规则;
6.2 谓词下推常用规则总结
下面整理了一些关联sql中关于谓词下推的优化参考规则;
- 对于Join(Inner Join)、Full outer Join,条件写在on后面,还是where后面,性能上面没有区别;
- 对于Left outer Join ,右侧的表写在on后面、左侧的表写在where后面,性能上有提高;
- 对于Right outer Join,左侧的表写在on后面、右侧的表写在where后面,性能上有提高;
- 当条件分散在两个表时,谓词下推可按上述结论2和3自由组合;
七、数据倾斜
7.1 概述
分布式计算中最常见的,最容易遇到的问题就是数据倾斜,数据倾斜的现象是:
当提交运行一个程序时,这个程序的大多数的Task都已经运行结束了,只有某一个Task一直在运行,迟迟不能结束,导致整体的进度卡在99%或者100%,这时候就可以判定程序出现了数据倾斜的问题。
而造成数据倾斜的原因大多是:数据分配的不均匀,比如下面这张图,太多相同的单词被分配到某一个任务上去处理,导致这个任务所在的节点压力非常大,这个任务的执行将会非常慢,这就是发生了数据倾斜;
7.2 数据倾斜场景一
当程序中出现group by或者count(distinct)等分组聚合的场景时,如果数据本身是倾斜的,根据MapReduce的Hash分区规则,肯定会出现数据倾斜的现象。
根本原因是因为分区规则导致的,所以可以通过以下几种方案来解决group by导致的数据倾斜的问题。
7.2.1 方案一
开启Map端聚合,通过减少shuffle数据量和Reducer阶段的执行时间,避免每个Task数据差异过大导致数据倾斜;
hive.map.aggr=true;
7.2.2 方案二
实现随机分区,比如下面这种写法,distribute by用于指定底层按照哪个字段作为Key实现分区、分组等 通过rank函数随机值实现随机分区,避免数据倾斜;
select * from table distribute by rand();
7.2.3 方案三
数据倾斜时自动负载均衡,通过开启如下参数
hive.groupby.skewindata=true;
开启该参数以后,当前程序会自动通过两个MapReduce来运行
- 第一个MapReduce自动进行随机分布到Reducer中,每个Reducer做部分聚合操作,输出结果;
- 第二个MapReduce将上一步聚合的结果再按照业务(group by key)进行处理,保证相同的分布到一起,最终聚合得到结果;
7.3 数据倾斜场景二
Join操作时,如果两张表比较大,无法实现Map Join,只能走Reduce Join,那么当关联字段中某一种值过多的时候依旧会导致数据倾斜的问题,面对Join产生的数据倾斜,核心的思想是尽量避免Reduce Join的产生,优先使用Map Join来实现;
但往往很多的Join场景不满足Map Join的需求,那么可以以下几种方案来解决Join产生的数据倾斜问题:
7.3.1 方案一
提前过滤,将大数据变成小数据,实现Map Join,如下sql
select a.id,a.value1,b.value2 from table1 a
join (select b.* from table2 b where b.ds>='20181201' and b.ds<'20190101') c
on (a.id=c.id);
7.3.2 方案二
使用Bucket Join
如果使用方案一,过滤后的数据依旧是一张大表,那么最后的Join依旧是一个Reduce Join,这种场景下,可以将两张表的数据构建为桶表,实现Bucket Map Join,避免数据倾斜;
7.3.3 方案三
使用Skew Join,Skew Join是Hive中一种专门为了避免数据倾斜而设计的特殊的Join过程
这种Join的原理是将Map Join和Reduce Join进行合并,如果某个值出现了数据倾斜,就会将产生数据倾斜的数据单独使用Map Join来实现 ,其他没有产生数据倾斜的数据由Reduce Join来实现,这样就避免了Reduce Join中产生数据倾斜的问题 最终将Map Join的结果和Reduce Join的结果进行Union合并;
Skew Joi实现原理示意图
Skew Join使用需要开启下面的参数配置
-- 开启运行过程中skewjoin
set hive.optimize.skewjoin=true;-- 如果这个key的出现的次数超过这个范围
set hive.skewjoin.key=100000;-- 在编译时判断是否会产生数据倾斜
set hive.optimize.skewjoin.compiletime=true;-- 不合并,提升性能
set hive.optimize.union.remove=true;-- 如果Hive的底层走的是MapReduce,必须开启这个属性,才能实现不合并
set mapreduce.input.fileinputformat.input.dir.recursive=true;