【大数据】Flink SQL 语法篇(四):Group 聚合

Flink SQL 语法篇(四):Group 聚合

  • 1.基础概念
  • 2.窗口聚合和 Group 聚合
  • 3.SQL 语义
  • 4.Group 聚合支持 Grouping sets、Rollup、Cube

1.基础概念

Group 聚合定义(支持 Batch / Streaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照数据的类别进行分组,比如年龄、性别,是横向的;而窗口聚合是在时间粒度上对数据进行分组,是纵向的。如下图所示,就展示出了其区别。其中 按颜色分 key(横向)就是 Group 聚合按窗口划分(纵向)就是 窗口聚合

在这里插入图片描述

2.窗口聚合和 Group 聚合

应用场景:一般用于对数据进行分组,然后后续使用聚合函数进行 countsum 等聚合操作。

那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?

首先来举一个例子看看怎么将 窗口聚合 转换为 Group 聚合。假如一个窗口聚合是按照 1 1 1 分钟的粒度进行聚合,如下 滚动窗口 SQL:

-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.dim.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '100000','fields.price.min' = '1','fields.price.max' = '100000'
)-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH ('connector' = 'print'
)-- 数据处理逻辑
insert into sink_table
select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000  as window_start
from source_table
group bydim,-- 按照 Flink SQL tumble 窗口写法划分窗口tumble(row_time, interval '1' minute)

转换为 Group 聚合 的写法如下:

-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- 用户 iduser_id BIGINT,-- 用户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.dim.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '100000','fields.price.min' = '1','fields.price.max' = '100000'
);-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH ('connector' = 'print'
);-- 数据处理逻辑
insert into sink_table
select dim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group bydim,-- 将秒级别时间戳 / 60 转化为 1mincast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

确实没错,上面这个转换是一点问题都没有的。

但是窗口聚合和 Group by 聚合的差异在于:

  • 本质区别窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。
  • 运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由 时间(Watermark)推动的。Group by 聚合完全由 数据 推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。

3.SQL 语义

SQL 语义这里也拿离线和实时做对比,Order 为 Kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子。

  • 数据源算子From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中。
  • Group 聚合算子group by key + sum / count / max / min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum / count / max / min 结果。如果有结果 oldResult,拿出来和当前的数据进行 sum / count / max / min 计算出这个 key 的新结果 newResult,并将新结果 [key, newResult] 更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息 -[key, oldResult],然后再将新结果发往下游 +[key, newResult];如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum / max / min 结果 newResult,并将新结果 [key, newResult] 更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送 +[key, newResult]
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中这个实时任务也是 24 24 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。

4.Group 聚合支持 Grouping sets、Rollup、Cube

Group 聚合也支持 Grouping setsRollupCube。举一个 Grouping sets 的案例:

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES('supplier1', 'product1', 4),('supplier1', 'product2', 3),('supplier2', 'product3', 3),('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (( supplier_id, product_id, rating ),( supplier_id, product_id         ),( supplier_id,             rating ),( supplier_id                     ),(              product_id, rating ),(              product_id         ),(                          rating ),(                                 )
)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/491411.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringMVC 学习(四)之获取请求参数

目录 1 通过 HttpServletRequest 获取请求参数 2 通过控制器方法的形参获取请求参数 3 通过 POJO 获取请求参数&#xff08;重点&#xff09; 1 通过 HttpServletRequest 获取请求参数 public String handler1(HttpServletRequest request) <form action"${pageCont…

7.网络游戏逆向分析与漏洞攻防-游戏网络架构逆向分析-通过逆向分析确定游戏明文接收数据过程

内容参考于&#xff1a;易道云信息技术研究院VIP课 上一个内容&#xff1a;通过逆向分析确定游戏明文发送数据过程 上一个内容中得出它是使用的send函数发送的数据包&#xff0c;所以接收数据它指定用的是recv函数接收的数据 然后在跳转recv函数分析时发现跳转到了wsock32.d…

【嵌入式移植】7、U-Boot源码分析4—链接脚本分析

U-Boot源码分析4—链接脚本分析 1 u-boot-spl.lds1.1 链接脚本的生成1.2 u-boot-spl.lds内容分析1.3 text - 程序代码段1.4 sram其它段定义1.4.1 .rodata只读数据段1.4.2 .data数据段1.4.3 .u_boot_list段 1.5 BSS段1.6 /DISCARD/ 从上一篇文章【嵌入式移植】6、U-Boot源码分析…

Linux字符设备驱动中同类型多设备节点的创建---一个驱动程序支持多个同类型设备

文章目录 前言1 代码解析1.1 驱动层1.2 应用层 2 运行结果总结 前言 本期分享的内容相对比较简单&#xff0c;那就是同时注册多个同类型的字符设备驱动&#xff0c;那么这样我们就可以同时支持多个同类型的设备了&#xff01;下面来带大家看一下&#xff1a; 1 代码解析 1.1 …

【Flink精讲】Flink性能调优:CPU核数与并行度

常见问题 举个例子 提交任务命令&#xff1a; bin/flink run \ -t yarn-per-job \ -d \ -p 5 \ 指定并行度 -Dyarn.application.queuetest \ 指定 yarn 队列 -Djobmanager.memory.process.size2048mb \ JM2~4G 足够 -Dtaskmanager.memory.process.size4096mb \ 单个 TM2~8G 足…

MySQL知识点总结(五)——锁

MySQL知识点总结&#xff08;五&#xff09;——锁 锁分类表锁 & 行锁如何添加表锁&#xff1f;如何添加行锁&#xff1f; 读锁 & 写锁行锁 & 间隙锁&#xff08;gap lock&#xff09;& 临键锁&#xff08;next-key lock&#xff09; 加锁机制分析可重复读隔离…

OpenAI视频生成Sora技术简析

基本介绍 Sora是春节期间OpenAI发布的产品&#xff0c;主要是通过文字描述生成视频&#xff0c;通过大规模视频数据训练而成的生成模型&#xff0c;当前还没开放试用。官方发布的技术报告&#xff1a;https://openai.com/research/video-generation-models-as-world-simulators…

趣学贝叶斯定理:贝叶斯定理的先验概率、似然和后验概率(1)

在第7章中&#xff0c;我们讨论了如何利用空间推理去推导贝叶斯定理。现在研究如何将贝叶斯定理当作一种概率工具&#xff0c;对不确定性进行逻辑推理。本章将利用贝叶斯定理来计算和量化在给定数据的情况下&#xff0c;信念有多大的可能性为真。为此&#xff0c;需要使用该定理…

019 Spring Boot+Vue 电影院会员管理系统(源代码+数据库+文档)

部分代码地址&#xff1a; https://github.com/XinChennn/xc019-cinema 一、系统介绍 cinema项目是一套电影院会员管理系统&#xff0c;使用前后端分离架构开发包含管理员、会员管理、会员卡管理、电影票、消费记录、数据统计等模块 二、所用技术 后端技术栈&#xff1a; …

【泰山派RK3566】智能语音助手(一)移植Kaldi语音转文字

文章目录 移植过程硬件资源下载测试 移植过程 参考我的这篇博客 【RV1126】移植kaldi实时语音识别 硬件 资源下载 链接&#xff1a;https://pan.baidu.com/s/1x1udT5eNzzQHoPOTCQ182A?pwdlief 提取码&#xff1a;lief –来自百度网盘超级会员V6的分享 下载的文件里面有一个…

全志H713/H618方案:调焦电机(相励磁法步进电机)的驱动原理、适配方法

一、篇头 全志H713平台&#xff0c;作为FHD投影的低成本入门方案&#xff0c;其公板上也配齐了许多投影使用的模组&#xff0c;本文即介绍投影仪调焦所用的步进电机&#xff0c;此模组的驱动原理、配制方法、调试方法。因为条件限制&#xff0c;本文采用的是H618香橙派Z3平台&…

Escalate_Linux(4)-利用SUDO实现提权

利用SUDO实现提权 利用用户的sudo授权获得root的shell cat /etc/passwd cat /etc/sudoers 命令没有权限 echo "cat /etc/sudoers" >/tmp/ls chmod 755 /tmp/ls export PATH/tmp:$PATH /home/user5/script 想办法更改user1的口令 echo echo "user1:xiao…