hive04_DQL操作

news/2024/11/19 1:48:35/文章来源:https://www.cnblogs.com/istitches/p/18348540

注意点

全局排序 OrderBy

SELECT <select_expression>, <select_expression>, ...FROM <table_name>ORDER BY <col_name> [ASC|DESC] [,col_name [ASC|DESC], ...]
  • Hive 中使用全局排序时,会将所有数据交给一个 Reduce 任务进行计算,实现查询结果的全局排序。所以数据量大的情况下会耗费大量的时间。
  • Hive 适用于离线处理,执行全量计算任务时,一般不会用到全局排序。如果涉及到全局排序场景,需要将 Hive 处理后的数据存放到快速查询的产品中,比如 Presto、Impala、ClickHouse 等等。
  • 数据处理过程中的全局排序,最好使用 UDF 转换为局部排序。
    • 先预估数据的范围,将数据划分为多个批次;
    • 每个批次会分发到一个 Reducer 执行任务,然后在每个 Reduce 作业中进行局部排序。
    • 一般不涉及到全局排序,可以先通过子查询减小查询范围,然后再排序。

如果是 TOPN 的情况,先用子查询对每个 Reducer 排序,然后取前 N 个数据,最后对结果集进行全局排序。

select t.id, t.name from 
(select id, name from <table_name>distributed by length(name) sort by length(name) desc limit 10
) t
order by length(t.user_name) desc limit 10;

局部排序 SortBy

SELECT <select_expression>, <select_expression>, ...FROM <table_name>SORT BY <col_name> [ASC|DESC] [,col_name [ASC|DESC], ...]

局部排序操作,Hive 会在每个 Reduce 任务中对数据进行排序,当启动多个 Reduce 任务时,OrderBy 输出一个文件,SortBy 输出多个文件且局部有序。

聚合操作 GroupBy、DistributeBy、ClusterBy

  • GroupBy:按照某些字段的值进行分组,在底层 MapReduce 执行过程中,同一组的数据会发送到同一个 Reduce 任务中,意味着每个 Reduce 会包含多组数据,同一组的数据会单独进行聚合运算。

可以配置 Reducer 数量 mapred.reduce.tasks,或者配置 hive.groupby.skewindata=true 来优化数据倾斜问题。

select col1, [col2], count(1), sel_expr(聚合操作) from table
where condition                  -- Map端执行
group by col1 [,col2]            -- Reduce端执行
[having]                         -- Reduce端执行

img

  • DistributeBy:通过哈希取模的方式,将列值相同的数据发送到同一个 Reducer 任务,只是单纯的分散数据,不执行其他操作。
SELECT <select_expression>, <select_expression>, ...FROM <table_name>DISTRIBUTE BY <col_list>[SORT BY <col_name> [ASC|DESC] [, col_name [ASC|DESC], ...] ]

DistrubuteBy 通常和 SortBy 一起使用,实现先聚合后排序。并且可以指定升序 ASC 还是降序 DESC,但 DistributeBy 必须在 SortBy 之前。

  • ClusterBy:把相同值的数据聚合到一起并且排序,效果等价于 distribute by col sort by col
SELECT <select_expression>, <select_expression>, ...FROM <table_name>CLUSTER BY <col_list>

ClusterBy 没有 DistributeBy 那么灵活,并且不能自定义排序,当 DistributeBy 和 SortBy 列完全相同且按照升序排序时,等价于执行 ClusterBy。

Join优化

MySQL Join 优化

https://blog.csdn.net/norminv/article/details/108020102

MySQL JOIN 都是通过循环嵌套的方式实现,用小表驱动大表减少多次连接操作带来的性能开销。

  • left join:小表 left join 大表;
  • right join:大表 right join 小表;
  • 用子查询代替 JOIN 减少驱动表扫描行数。

举个例子,如下用子查询优化示例:

selecto.no,s_order.no,sum(s_item.count),sum(after_sale_item.count)frombuyer_order oleft join seller_order s_order on o.id = s_order.buyer_order_idleft join seller_order_item s_item on s_order.id = s_item.seller_order_idleft join seller_order_after_sale after_sale on s_order.id = after_sale.seller_order_idleft join seller_order_after_sale_item after_sale_item on after_sale.id = after_sale_item.after_sale_id
where o.add_time >='2019-05-01'
group byo.id,s_order.id
order byo.id
limit 0,10

用子查询优化后:

selecto.id,o.no,s_order.no,(select sum(sot.count) from seller_order soleft join seller_order_item sot on so.id = sot.seller_order_idwhere so.id =s_order.id ),(select sum(osat.count) from seller_order_after_sale osaleft join seller_order_after_sale_item osat on osa.id = osat.after_sale_idwhere osa.seller_order_id = s_order.id )frombuyer_order oleft join seller_order s_order on o.id = s_order.buyer_order_id
where o.addTime >='2019-05-01'
order byo.id
limit 0,10
  • 通过子查询减少了 left join 次数,从而减少驱动表的数据量;
  • 减少了 groupby 的使用,方案一中先分组再取后 10 条;方案二先取后 10 条再执行聚合操作,效率更高。

SteamTable

Hive 执行 Join 操作时,默认会将前面的表直接加载进缓存,后一张表进行 stream 处理,即 shuffle 操作。这样可以减少 shuffle 过程,因为直接加载到缓存中的表,只需要等待后面 stream表的数据,不需要进行 shuffle。

使用时通过声明 /*+ STREAMTABLE(xxx) */ 来定义 stream 表:

SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)

MapJoin

MapJoin 即将小表直接加载到 Map 作业中,减少 shuffle 开销。

SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a JOIN b ON a.key=b.key

数据倾斜

数据倾斜主要表现在,执行 map/reduce 时 reduce 大部分节点执行完毕,但是有一个或几个 reduce 节点运行很慢,导致程序整体处理时间很长;

数据倾斜发生原因:

  • join:使用 join 关键字处理的问题;
    • 小表驱动大表,但是 key 比较集中导致分发到某个 Reduce 上的数据远高于平均值;
    • 大表驱动大表,但是分桶判断字段空值过多,空值由一个 Reduce 处理;
  • group by:先分派后聚合,某个 Reduce 处理耗时很长;
  • count distinct:特殊值过多。

解决方案:

参数调节

  • Map 端部分聚合:配置 hive.map.aggr=true
  • 数据倾斜时进行负载均衡:配置 hive.groupby.skewindata=true;它生成的查询计划有两个 MR Job,一个 MR Job 会将 Map 的结果随机分不到 Reduce 中;另外一个 MR Job 则根据预处理结果按照 Key 值相同分布到同一个 Reduce 中,最后完成聚合操作。

SQL调节

  • 大小表Join:MapJoin 让小表先进内存,在 Map 端完成 Reduce 操作,减少 shuffle;
  • 大表大表Join:将空值变成字符串加上随机数,将倾斜数据分散到不同 Reduce,避免零值/空值分布到同一个 Reduce 导致倾斜;

用户自定义函数

Hive 除了支持内置函数外,还允许用户自定义函数来扩充函数的功能;
UDF 对每一行数据处理,输出一行数据;
UDAF 对多行数据处理,最终输出一行数据,一般用于聚合操作;
UDTF 对一行数据处理,输出多个结果,比如将一行字符串按照某个字符拆分后进行存储,表的行数会增加。

创建函数:

-- 临时创建
ADD JARS[S] <local_hdfs_path>;
CREATE TEMPORARY FUNCTION <function_name> AS <class_name>;
DROP TEMPORARY FUNCTION <function_name>;--- 永久创建
CREATE PERMANENT FUNCTION <function_name> AS <class_name> [USING JAR|FILE <file_uri>];
DROP PERMANENT FUNCTION <function_name>;

UDF

实现方式有两种:继承UDF、继承 GenericUDF,其中 GenericUDF 处理起来更加灵活。

继承 GenericUDF 的步骤:

  • initialize 方法,检查输入数据并初始化;
  • evaluate 方法,执行数据处理过程,返回最终结果;
  • getDisplayString 方法,定义 explain 的返回内容。
@org.apache.hadoop.hive.ql.exec.Description(name = "AvgScore",extended = "示例:select AvgScore(score) from src;",value = "_FUNC_(col)-对Map类型保存的学生成绩进行平均值计算")
public class AvgScore extends GenericUDF {// 检查输入数据,初始化输入数据@Overridepublic ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {…}// 数据处理,返回最终结果@Overridepublic Object evaluate(DeferredObject[] deferredObjects) throws HiveException {…}// 函数执行 HQL Explain 展示的字符串内容@Overridepublic String getDisplayString(String[] strings) {…}
}
  • 然后将函数打包为 jar 上传到服务器对应路径 /xxxx/xxx/xxx.jar
  • 将 jar 包添加到 hive 的 classpath:add jar /xxxx/xxx/xxx.jar
  • 创建临时函数与开发好的 java class 关联:create temporary function func_name as "xxxx.xxx.xxx.MyUDF"
  • hql 中使用临时函数:select func_name(col) from src;

UDAF

实现方式:继承 UDAF、AbstractGenericUDAFResolver,其中 AbstractGenericUDAFResolver 更加灵活。

使用 AbstractGenericUDAFResolver 的步骤:

  • 继承 AbstractAggregationBuffer,来保存中间结果;
  • 继承 GenericUDAFEvaluator,实现 UDAF 处理流程;
  • 继承 AbstractGenericUDAFResolver,注册 UDAF。
// UDAF 注册函数
public class FieldLength extends AbstractGenericUDAFResolver {@Overridepublic GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {return super.getEvaluator(info);}
}// 保存中间结果
class FieldLengthAggregationBuffer extends GenericUDAFEvaluator.AbstractAggregationBuffer {private Integer value = 0;public Integer getValue() {return value;}public void setValue(Integer value) {this.value = value;}@Overridepublic int estimate() {return JavaDataModel.PRIMITIVES1;}public void add(int addVal) {synchronized (value) {value += addVal;}}
}// 数据处理函数
class FieldLengthUDAFEvaluator extends GenericUDAFEvaluator {// 输入private PrimitiveObjectInspector inputOI;// 输出private ObjectInspector outputOI;// 前一个阶段输出private PrimitiveObjectInspector integerOI;/*** 数据校验、数据初始化* 由于 UDAF 会执行 Map、Reduce 两个阶段任务,所以根据 Mode.xxx 区分具体阶段* @param m* @param parameters* @return* @throws HiveException*/@Overridepublic ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {super.init(m, parameters);// Map 阶段,输入为原始数据if (Mode.PARTIAL1.equals(m) || Mode.COMPLETE.equals(m)) {inputOI = (PrimitiveObjectInspector) parameters[0];} else {// combiner、redeuce 阶段基于前一个阶段的返回值作为输入integerOI = (PrimitiveObjectInspector) parameters[0];}// 指定输出类型outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,ObjectInspectorFactory.ObjectInspectorOptions.JAVA);return outputOI;}/*** 获取中间存放结果对象* @return* @throws HiveException*/@Overridepublic AggregationBuffer getNewAggregationBuffer() throws HiveException {return new FieldLengthAggregationBuffer();}/*** 重置中间结果* @param aggregationBuffer* @throws HiveException*/@Overridepublic void reset(AggregationBuffer aggregationBuffer) throws HiveException {((FieldLengthAggregationBuffer)aggregationBuffer).setValue(0);}/*** Map 阶段* @param aggregationBuffer* @param objects* @throws HiveException*/@Overridepublic void iterate(AggregationBuffer aggregationBuffer, Object[] objects) throws HiveException {if (objects == null || objects.length < 1) {return;}Object javaobj = inputOI.getPrimitiveJavaObject(objects[0]);((FieldLengthAggregationBuffer)aggregationBuffer).add(String.valueOf(javaobj).length());}/*** 返回 Map、Combiner 阶段结果* @param aggregationBuffer* @return* @throws HiveException*/@Overridepublic Object terminatePartial(AggregationBuffer aggregationBuffer) throws HiveException {return terminate(aggregationBuffer);}/*** Reduce 阶段* @param aggregationBuffer* @param o* @throws HiveException*/@Overridepublic void merge(AggregationBuffer aggregationBuffer, Object o) throws HiveException {((FieldLengthAggregationBuffer) agg).add((Integer)integerOI.getPrimitiveJavaObject(partial));}/*** 返回最终结果* @param aggregationBuffer* @return* @throws HiveException*/@Overridepublic Object terminate(AggregationBuffer aggregationBuffer) throws HiveException {return ((FieldLengthAggregationBuffer)aggregationBuffer).getValue();}
}

UDTF

继承 GenericUDTF 类,并重写 initialize、process、close 方法:

  • initialize:初始化返回值类型;
  • process:具体数据处理过程;
  • close:清理收尾工作;
  • forward:传递输出给收集器;
public class JsonParser extends GenericUDTF {private PrimitiveObjectInspector stringOI = null;// 输入数据解析,初始化@Overridepublic StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {if (argOIs.length != 1) {throw new UDFArgumentException("take only one argument");}// 输入必须为 PRIMITIVE 类型,且具体类型必须为 Stringif (argOIs[0].getCategory() != ObjectInspector.Category.PRIMITIVE &&((PrimitiveObjectInspector)argOIs[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {throw new UDFArgumentException("take only one string argument");}// 初始化输入stringOI = (PrimitiveObjectInspector) argOIs[0];// 定义输出类型List<String> fieldNames = new ArrayList<String>();List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();fieldNames.add("name");fieldNames.add("value");fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);}private ArrayList<Object[]> parseInputRecord(String feature) {ArrayList<Object[]> resultList = null;//......//.....return resultList;}@Overridepublic void process(Object[] objects) throws HiveException {final String feature = stringOI.getPrimitiveJavaObject(objects[0]).toString();ArrayList<Object[]> results = parseInputRecord(feature);Iterator<Object[]> it = results.iterator();while (it.hasNext()) {Object[] strs = it.next();// 结果 Key-Value 传递给收集器forward(strs);}}@Overridepublic void close() throws HiveException {}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/779629.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自定义注解(Annotation)

一、概念 Annotation(注解)是 Java 提供的一种对元程序中元素关联信息和元数据(metadata)的途径和方法。 Annatation(注解)是一个接口,程序可以通过反射来获取指定程序中元素的 Annotation对象,然后通过该Annotation 对象来获取注解中的元数据信息。 二、 4 种标准元注解…

Git合并之————指定提交记录合并

应用场景在测试环境提交了多个功能代码, 其中一个功能需要提前上线如图所示, 红框部分为我本次需要上线的功能提交记录代码, 绿框部分为我已选择上线成功, 可以看到红框与绿框直接的内容并没有被带入master分支.这里我以IDEA为例.首先, 切换到master分支, 也就是你需要合并的分…

宝藏软件 | 推荐一个免费电影app(无广告不卡顿)

就简单介绍一下吧,我要推荐的这个电影app优点是免费无广告,不卡顿。喜欢看电影的我们应该都是对广告深恶痛绝的,卡顿也是平常看电影的一个痛点,有时卡一下卡一下真的很不爽。这个app也有缺点,就是影视资源不好找,需要有一点技巧才能找到大量资源。我整理了一个影视资源包…

MySQL 5.7 DDL 与 GH-OST 对比分析

本文首先介绍MySQL5.7 DDL以及GH-OST的原理,然后从效率、空间占用、锁阻塞、binlog日志产生量、主备延时等方面,对比GH-OST和MySQL5.7 DDL的差异。作者:来自 vivo 互联网存储研发团队- Xia Qianyong本文首先介绍MySQL 5.7 DDL以及GH-OST的原理,然后从效率、空间占用、锁阻塞…

我叫小舞,跳舞的舞!新斗罗大陆游戏详细图文架设教程

“我叫小舞,跳舞的舞”,唐三不知道的是,这个叫做小舞的十万年魂兽会成为他一生的羁绊。就让我们一起再一次去斗罗大陆感受唐三和小舞的爱恋吧~前言 “我叫小舞,跳舞的舞”,唐三不知道的是,这个叫做小舞的十万年魂兽会成为他一生的羁绊。在星斗大森林被武魂殿追杀生死存亡…

聚焦OLAP性能提升,火山引擎ByteHouse发布六大场景解决方案

随着数字化转型的深入,不仅仅是电商领域,各行业都需要基于实时数据做出关键决策。而对企业底层数据系统来说,在处理复杂的数据分析任务,如多维度的关联分析或数据挖掘时,可能出现延迟、崩溃、错误等问题,导致数据不准确、不完整、时效性差。随着数字化转型的深入,不仅仅…

AI挑战高考作文,看AI怎么写自己?

本文由 ChatMoney团队出品现在已经离高考过去快二个月了,不管是考的好或不好,都是人生中难以忘记的一个拐点,那么在现如今AI时代,我不禁想到,如果用人工智能参加高考,会得多少分呢? 不知道朋友们有没有关注今年的高考作文题目呢?有部分考生反馈,很庆幸,自己的语文老师…

.NET 与 LayUI 实现高效敏捷开发框架

前言 WaterCloud 是一个集成了 LayUI 的高效敏捷开发框架,专为 .NET 开发者设计。 它不仅支持多种 .NET 版本(.NET 4.5、.NET Core 3.1、.NET 5、.NET 6),还内置了丰富的功能,如权限管理、流程表单设计以及多数据库支持下的多租户架构。使用了 ORM(SqlSugar 和 Chloe ) …

at32+rtos+lwip2.1.2下使用snmp时发生死机的bug记录

主要问题:设置snmp集合是用局部变量传递发生的内存改变导致取指针指向的地址值越界发生死机

Linux中可怕的fork炸弹

什么是Linux中的 Fork 炸弹 Linux中的Fork炸弹(Fork Bomb)是一种拒绝服务攻击的形式,它利用了操作系统中的“fork()”系统调用。 Fork()系统调用用于创建新进程,该进程是调用进程的副本。 Fork炸弹利用此系统调用创建大量的子进程,以消耗系统资源,例如内存和CPU时间,…

博客总领--咸鱼翻身记

咸鱼犯神经翻身记为什么我为OI泪目?因为我菜得离谱......博客更新计划难度 知识点名称 入队时间 出队时间 链接橙 双指针 8.6 8.8橙 二分答案 8.6 IDK橙 离散化 8.6 IDK橙 归并排序 8.6 IDK橙 二维前缀和 8.6 IDK黄 ST表 8.6 IDK黄 最小生成树 8.6 IDK绿 树上问题 8.6 IDK绿 空…

前端联调budget

测试环境一直报错{"id":"grpc.client","code":14,"detail":"message:connection error: desc = \"transport: Error while dialing dial tcp 172.21.0.2:34083: i/o timeout\", callee_addr:172.21.0.2:34083",&q…