在工作中,我们都会遵循一定的开发规范,当然也包括了sql优化。
1、列裁剪和分区裁剪
所谓列裁剪就是在查询时只读取需要的列,分区裁剪就是只读取需要的分区。以我们的日历记录表为例 当列很多或者数据量很大时,如果select *或者不指定分区,全列扫描和全表扫描效率都很低。
Hive中与列裁剪优化相关的配置项是hive.optimize.cp,与分区裁剪优化相关的则是hive.optimize.pruner, 默认都是true。在HiveQL解析阶段对应的则是ColumnPruner逻辑优化器。
2、谓词下推
将SQL语句中的where谓词逻辑都尽可能提前执行,减少下游处理的数据量。
Join关联的谓词下推(重要知识点):
Left join时,右表的过滤条件必须写到子查询中 或关联条件上,否则会导致先关联再过滤,大大降低效率;
Right join时,与左表同理;
Inner join时,条件无论写在哪里都会被优化器 优化先过滤再关联。
例如以下HiveQL语句:
对forum_topic做过滤的where语句写在子查询内部,而不是外部。Hive中有谓词下推优化的配置项 hive.optimize.ppd,默认值true,与它对应的逻辑优化器是PredicatePushDown。
3、group by代替distinct
当要统计某一列的去重数时,如果数据量很大,count(distinct)就会非常慢,原因是这样会导致所 有map端数据都进入一个reducer中,在数据量大时可能会长时间计算不完,count(distinct)逻辑只会有 很少的reducer来处理。这时可以用group by来改写:
但是这样写会启动两个MR job(单纯distinct只会启动一个),所以要确保数据量大到启动job的 overhead远小于计算耗时,才考虑这种方法。当数据集很小或者key的倾斜比较明显时,group by还 可能会比distinct慢。
4、group by配置调整
map端预聚合
group by时,如果先起一个combiner在map端做部分预聚合,可以有效减少shuffle数据量。预聚合的配置项是hive.map.aggr,默认值true,对应的优化器为GroupByOptimizer,简单方便。
通过hive.groupby.mapaggr.checkinterval参数也可以设置map端预聚合的行数阈值,超过该值就会分拆job,默认值100000。
倾斜均衡配置项
group by时如果某些key对应的数据量过大,就会发生数据倾斜。Hive自带了一个均衡数据倾斜的配置项hive.groupby.skewindata,默认值false。
其实现方法是在group by时启动两个MR job。第一个job会将map端数据随机输入reducer,每个
reducer做部分聚合,相同的key就会分布在不同的reducer中。第二个job再将前面预处理过的数据按key聚合并输出结果,这样就起到了均衡的效果。
但是,配置项毕竟是死的,单纯靠它有时不能根本上解决问题,因此还是建议自行了解数据倾斜的细节,并优化查询语句。
5、join 优化
build table(小表)前置
在最常见的hash join方法中,一般总有一张相对小的表和一张相对大的表,小表叫build table,大表叫probe table。如下图所示。
Hive在解析带join的SQL语句时,会默认将最后一个表作为probe table,将前面的表作为build table并试图将它们读进内存。如果表顺序写反,probe table在前面,引发OOM的风险就高了。
map join特性
map join特别适合大小表join的情况。Hive会将build table和probe table在map端直接完成join过程,消灭了reduce,效率很高。
上面的语句中加了一条map join hint,以显式启用map join特性。早在Hive 0.8版本之后,就不需要写这条hint了。map join还支持不等值连接,应用更加灵活。
map join的配置项是hive.auto.convert.join,默认值true,对应逻辑优化器是MapJoinProcessor。
还有一些参数用来控制map join的行为,比如hive.mapjoin.smalltable.filesize,当build table大小小于该值就会启用map join,默认值25000000(25MB)。还有hive.mapjoin.cache.numrows,表示缓存build table的多少行数据到内存,默认值25000。
倾斜均衡配置项
这个配置与上面group by的倾斜均衡配置项异曲同工,通过hive.optimize.skewjoin来配置,默认false。 如果开启了,在join过程中Hive会将计数超过阈值hive.skewjoin.key(默认100000)的倾斜key对应的行临时写进文件中,然后再启动另一个job做map join生成结果。通过hive.skewjoin.mapjoin.map.tasks 参数还可以控制第二个job的mapper数量,默认10000。
通过自带的配置项经常不能解决数据倾斜问题。join是数据倾斜的重灾区,后面还要介绍在SQL层
面处理倾斜的各种方法。
空值或无意义值
这种情况很常见,比如当事实表是日志类数据时,往往会有一些项没有记录到,我们视情况会将它置为null,或者空字符串、-1等。如果缺失的项很多,在做join时这些空值就会非常集中,拖累进度。因此,若不需要空值数据,就提前写where语句过滤掉。需要保留的话,将空值key用随机方式打散,例如将用户ID为null的记录随机改为负值。
单独处理倾斜key
这其实是上面处理空值方法的拓展,不过倾斜的key变成了有意义的。一般来讲倾斜的key都很少,我们可以将它们抽样出来,对应的行单独存入临时表中,然后打上一个较小的随机数前缀(比如0~9),最后再进行聚合。
build table过大
build table会大到无法直接使用map join的地步,比如全量用户维度表,而使用普通join又有数据分布不均的问题。这时就要充分利用probe table的限制条件,削减build table的数据量,再使用map join解决。代价就是需要进行两次join。如下:
6、MapReduce优化
调整mapper数
mapper数量与输入文件的split数息息相关,在Hadoop源码org.apache.hadoop.mapreduce.lib.input.FileInputFormat 类中可以看到split划分的具体逻辑。这里不贴代码,直接叙述mapper数是如何确定的。
可以直接通过参数mapred.map.tasks(默认值2)来设定mapper数的期望值,但它不一定会生效。设输入文件的总大小为total_input_size。HDFS中,一个块的大小由参数dfs.block.size指定,默认值64MB或128MB。在默认情况下,mapper数就是:
default_mapper_num = total_input_size / dfs.block.size
参数mapred.min.split.size(默认值1B)和mapred.max.split.size(默认值64MB)分别用来指定split的最小和最大大小。split大小和split数计算规则是:
split_size = MAX(mapred.min.split.size, MIN(mapred.max.split.size, dfs.block.size))
split_num = total_input_size / split_size
得出mapper数:
mapper_num = MIN(split_num, MAX(default_num, mapred.map.tasks))
可见,如果想减少mapper数,就适当调高mapred.min.split.size,split数就减少了。如果想增大mapper数,除了降低mapred.min.split.size之外,也可以调高mapred.map.tasks。
一般来讲,如果输入文件是少量大文件,就减少mapper数;如果输入文件是大量非小文件,就增大mapper数。
调整reducer数
reducer数量的确定方法比mapper简单得多。使用参数mapred.reduce.tasks可以直接设定reducer数量,不像mapper一样是期望值。但如果不设这个参数的话,Hive就会自行推测,逻辑如下:
参数hive.exec.reducers.bytes.per.reducer用来设定每个reducer能够处理的最大数据量,256M(1.2版本之后)。
参数hive.exec.reducers.max用来设定每个job的最大reducer数量,1009(1.2版本之后)。
得出reducer数:reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)
reducer数量与输出文件的数量相关。如果reducer数太多,会产生大量小文件,对HDFS造成压力。如果reducer数太少,每个reducer要处理很多数据,容易拖慢运行时间或者造成OOM。
合并小文件
输入阶段合并
需要更改Hive的输入文件格式,即参数hive.input.format,默认值是
org.apache.hadoop.hive.ql.io.HiveInputFormat,我们改成
org.apache.hadoop.hive.ql.io.CombineHiveInputFormat。
这样比起上面调整mapper数时,又会多出两个参数,分别是mapred.min.split.size.per.node和
mapred.min.split.size.per.rack,含义是单节点和单机架上的最小split大小。如果发现有split大小小于这两个值则会进行合并。具体逻辑可以参看Hive源码中的对应类。
输出阶段合并
直接将hive.merge.mapfiles和hive.merge.mapredfiles都设为true即可,前者表示将map-only任务的输出合并,后者表示将map-reduce任务的输出合并。
另外,hive.merge.size.per.task可以指定每个task输出后合并文件大小的期望值,
hive.merge.size.smallfiles.avgsize可以指定所有输出文件大小的均值阈值。如果平均大小不足的话,就会另外启动一个任务来进行合并。
启用压缩
压缩job的中间结果数据和输出数据,可以用少量CPU时间节省很多空间。压缩方式一般选择Snappy,效率最高。
要启用中间压缩,需要设定hive.exec.compress.intermediate为true,同时指定压缩方式
hive.intermediate.compression.codec为org.apache.hadoop.io.compress.SnappyCodec。
另外,参数
hive.intermediate.compression.type可以选择对块(BLOCK)还是记录(RECORD)压缩,BLOCK的压缩率比较高。
输出压缩的配置基本相同,打开hive.exec.compress.output即可。
========================================================
这个章节的内容分享,还是有难度的,不一定所有的小伙伴都可以看得懂,但大家可以慢慢来,至少知道有这么一个概念,下次遇到就有印象了 。
你看过的每一篇文章,和点过的每一个赞,都对未来是有意义的。