FlinkSql 窗口函数

Windowing TVF

以前用的是Grouped Window Functions(分组窗口函数),但是分组窗口函数只支持窗口聚合

现在FlinkSql统一都是用的是Windowing TVFs(窗口表值函数),Windowing TVFs更符合 SQL 标准且更加强大,支持window join、Window aggregations、Window Top-N、Window Deduplication 

Windowing TVFs是 Flink 定义的多态表函数(Polymorphic Table Function,缩写PTF),PTF 是 SQL 2016 标准中的一种特殊的表函数,它可以把表作为一个参数

窗口函数

Flink 认为窗口把流分割为有限大小的 “桶”,这样就可以在其之上进行计算

有以下几种用法

  • 滚动窗口
  • 滑动窗口
  • 累积窗口
  • 会话窗口 (即将支持)

滚动窗口(TUMBLE)

TUMBLE 函数指定每个元素到一个指定大小的窗口中。滚动窗口的大小固定且不重复。

例如:假设指定了一个 5 分钟的滚动窗口。Flink 将每 5 分钟生成一个新的窗口,如下图所示:

TUMBLE 函数通过时间属性字段为每行数据分配一个窗口。 在流计算模式,时间属性字段必须被指定为事件或处理时间属性。在批计算模式,这个窗口表函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 的类型

--TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
data :拥有时间属性列的表。
timecol :列描述符,决定数据的哪个时间属性列应该映射到窗口。
size :窗口的大小(时长)。
offset :窗口的偏移量 [非必填]。SELECT * FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));SELECT window_start, window_end, SUM(price)FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end;

滑动窗口(HOP)

滑动窗口函数指定元素到一个定长的窗口中。和滚动窗口很像,有窗口大小参数,另外增加了一个窗口滑动步长参数。如果滑动步长小于窗口大小,就能产生数据重叠的效果。在这个例子里,数据可以被分配在多个窗口。

例如:可以定义一个每5分钟滑动一次。大小为10分钟的窗口。每5分钟获得最近10分钟到达的数据的窗口,如下图所示:

HOP 函数通过时间属性字段为每一行数据分配了一个窗口。 在流计算模式,这个时间属性字段必须被指定为事件或处理时间属性。在批计算模式,这个窗口表函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 的类型

-- HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
data:拥有时间属性列的表。
timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。
slide:窗口的滑动步长。
size:窗口的大小(时长)。
offset:窗口的偏移量 [非必填]。SELECT * FROM TABLE(HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));SELECT window_start, window_end, SUM(price)FROM TABLE(HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))GROUP BY window_start, window_end;

累积窗口(CUMULATE)

CUMULATE 函数指定元素到多个窗口,从初始的窗口开始,直到达到最大的窗口大小的窗口,所有的窗口都包含其区间内的元素,另外,窗口的开始时间是固定的。 你可以将 CUMULATE 函数视为首先应用具有最大窗口大小的 TUMBLE 窗口,然后将每个滚动窗口拆分为具有相同窗口开始但窗口结束步长不同的几个窗口。 所以累积窗口会产生重叠并且没有固定大小。

例如:1小时步长,24小时大小的累计窗口,每天可以获得如下这些窗口:[00:00, 01:00)[00:00, 02:00)[00:00, 03:00), …, [00:00, 24:00)

-- CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
data:拥有时间属性列的表。
timecol:列描述符,决定数据的哪个时间属性列应该映射到窗口。
step:指定连续的累积窗口之间增加的窗口大小。
size:指定累积窗口的最大宽度的窗口时间。size必须是step的整数倍。
offset:窗口的偏移量 [非必填]。SELECT * FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));SELECT window_start, window_end, SUM(price)FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))GROUP BY window_start, window_end;

窗口偏移

上诉窗口都有一个 offset 参数,默认值就是 0,所以窗口默认都是整点启动的

比如10分钟的滚动窗口:TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES),只会生成[2021-06-29 23:40:00, 2021-06-29 00:50:00),[2021-06-29 23:50:00, 2021-06-30 00:00:00),window_start 和 window_end 和数据的时间无关

offset 就是用来调整窗口偏移的,当 offset 为 -16 MINUTE,时间戳为 2021-06-30 00:00:04 的数据会分配到窗口 [2021-06-29 23:54:00, 2021-06-30 00:04:00)。

窗口函数进阶用法

flink开窗需要写上windowend,否则只是带了一个windowstart的时间而已,并没有真正开启窗口

Window Aggregation

窗口聚合是通过 GROUP BY 子句定义的,其特征是包含 窗口表值函数 产生的 “window_start” 和 “window_end” 列。和普通的 GROUP BY 子句一样,窗口聚合对于每个组会计算出一行数据。

SELECT window_start, window_end, SUM(price)FROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end;
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+

并且支持多级窗口聚合 

-- tumbling 5 minutes for each supplier_id
CREATE VIEW window1 AS
-- Note: The window start and window end fields of inner Window TVF are optional in the select clause. However, if they appear in the clause, they need to be aliased to prevent name conflicting with the window start and window end of the outer Window TVF.
SELECT window_start as window_5mintumble_start, window_end as window_5mintumble_end, window_time as rowtime, SUM(price) as partial_priceFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))GROUP BY supplier_id, window_start, window_end, window_time;-- tumbling 10 minutes on the first window
SELECT window_start, window_end, SUM(partial_price) as total_priceFROM TABLE(TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end;

下面是分组窗口聚合的写法,分组窗口聚合已经过时,官网不推荐使用了

SELECTuser,TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart,SUM(amount) FROM Orders
GROUP BYTUMBLE(order_time, INTERVAL '1' DAY),user

Window Join

在流式查询中,与其他连续表上的关联不同,窗口关联不产生中间结果,只在窗口结束产生一个最终的结果。另外,窗口关联会清除不需要的中间状态

目前使用时有一些限制:

目前,窗口关联需要在 join on 条件中包含两个输入表的 window_start 等值条件和 window_end 等值条件

目前,关联的左右两边必须使用相同的窗口表值函数。这个规则在未来可以扩展,比如:滚动和滑动窗口在窗口大小相同的情况下 join。

语法上支持 INNER、LEFT、RIGHT、FULL OUTER、ANTI、SEMI JOIN。而且,窗口关联可以在其他基于 窗口表值函数 的操作后使用,例如 窗口聚合,窗口 Top-N 和 窗口关联

SELECT l.id as l_id,r.id as r_id,l.window_start,l.window_end
FROM (SELECT * FROM TABLE(TUMBLE(TABLE t_left, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) l
INNER JOIN (SELECT * FROM TABLE(TUMBLE(TABLE t_right, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) r
ON l.id = r.id 
AND l.window_start = r.window_start 
AND l.window_end = r.window_end;

Window TopN

与普通Top-N不同,窗口Top-N只在窗口最后返回汇总的Top-N数据,不会产生中间结果。窗口 Top-N 会在窗口结束后清除不需要的中间状态

窗口 Top-N 适用于用户不需要每条数据都更新Top-N结果的场景,相对普通Top-N来说性能更好

SELECT *FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownumFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))) WHERE rownum <= 3;

还可以在窗口聚合后在进行窗口 Top-N

SELECT *FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownumFROM (SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cntFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))GROUP BY window_start, window_end, supplier_id)) WHERE rownum <= 3;

Window Deduplication

窗口去重是一种特殊的 去重,它根据指定的多个列来删除重复的行,保留每个窗口和分区键的第一个或最后一个数据

对于流式查询,与普通去重不同,窗口去重只在窗口的最后返回结果数据,不会产生中间结果。它会清除不需要的中间状态。 因此,窗口去重查询在用户不需要更新结果时,性能较好

Window Deduplication是一种特殊的窗口 Top-N:N是1并且是根据处理时间或事件时间排序的(目前只支持根据事件时间属性进行排序),支持在其他窗口操作上进行去重操作,比如 窗口聚合,窗口TopN 和 窗口关联

SELECT *FROM (SELECT bidtime, price, item, supplier_id, window_start, window_end, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY bidtime DESC) AS rownumFROM TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))) WHERE rownum <= 1;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/459206.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序新手入门教程四:样式设计

WXSS (WeiXin Style Sheets)是一套样式语言&#xff0c;用于描述 WXML 的组件样式&#xff0c;决定了 WXML 的组件会怎么显示。 WXSS 具有 CSS 大部分特性&#xff0c;同时为了更适合开发微信小程序&#xff0c;WXSS 对 CSS 进行了扩充以及修改。与 CSS 相比&#xff0c;WXSS …

spring boot和spring cloud项目中配置文件application和bootstrap中的值与对应的配置类绑定处理

在前面的文章基础上 https://blog.csdn.net/zlpzlpzyd/article/details/136065211 加载完文件转换为 Environment 中对应的值之后&#xff0c;接下来需要将对应的值与对应的配置类进行绑定&#xff0c;方便对应的组件取值处理接下来的操作。 对应的配置值与配置类绑定通过 Con…

Map 集合

Map集合 1. 概述2. 方法3. 代码示例4. 输出结果5. 注意事项 实现类&#xff1a; HashTable、HashMap、TreeMap、Properties、LinkedHashMap 其他集合类 具体信息请查看 API 帮助文档 1. 概述 Map是Java中的一种数据结构&#xff0c;用于存储键值对&#xff08;key-value pair&…

Java外卖小程序管理系统

技术架构&#xff1a; springboot ssm mysql redis 有需要该项目的小伙伴可以私信我你的Q。 功能描述&#xff1a; 商品管理&#xff1a;新增商品、所有商品 菜单管理&#xff1a;菜单管理、菜单分类 订单管理&#xff1a;订单总览&#xff08;包括未付款、已付款、已…

D音等短视频为什么这么吸引人?长期沉迷刷D音的危害 彻底戒掉刷D音上瘾 占用大量时间 注意力分散 思维浅薄 焦虑、抑郁 干扰睡眠 视力疲劳

这是你吗&#xff1f; 人生最爽的事是&#xff1a;刷痘印。 人生最不爽的事是&#xff1a;刷完&#xff0c;什么也没有得到&#xff0c;事也没做。 吸引法则 1. 内容碎片化&#xff0c;符合快节奏时代需求 短视频的时长通常只有几秒到十几分钟&#xff0c;内容简短精悍&…

C++ | vector二维数组的初始化与行、列数的获取

如果直接使用vector<int,vector<int> > v;创建二维数组&#xff0c;那么就会得到一个空的容器&#xff0c;这样再通过push_back赋值是非常麻烦的。 初始化二维数组 在此介绍二维数组初始化的一般操作。 首先看一维数组的初始化示例&#xff1a; 定义一个长度为n&a…

Stability AI一种新型随心所欲生成不同音调、口音、语气的文本到语音(TTS)音频模型

该模型无需提前录制人声样本作为参考&#xff0c;仅凭文字描述就能生成所需的声音特征。用户只需描述他们想要的声音特点&#xff0c;例如“一个语速较快、带有英国口音的女声”&#xff0c;模型即可相应地生成符合要求的语音。它不仅能模仿已有的声音&#xff0c;还能根据用户…

书生·浦语大模型全链路开源体系

1&#xff0c;简述大模型的定义与特点&#xff1a; 大模型是指参数数量大于10亿的模型&#xff0c;它的特点包括&#xff1a;模型规模大&#xff0c;数据规模大&#xff0c;计算规模大和任务数量 2. 分析大模型成为通用人工智能的重要途径的原因&#xff1a; 大模型能够从大…

yt-dlp快速上手

之前用xx下载视频经常遇到网络报错&#xff0c;于是使用 yt-dlp 这个新的yt下载工具。 安装教程 安装方法1&#xff1a;pip安装 可以使用pip安装yt-dlp&#xff0c;然后可以全局使用 使用pip快速安装如下: python3 -m pip install -U yt-dlp不安装其他依赖的安装命令如下:…

【保姆级教程|YOLOv8改进】【7】多尺度空洞注意力(MSDA),DilateFormer实现暴力涨点

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

【JS逆向一】逆向某站的 加密参数算法--仅供学习参考

逆向日期&#xff1a;2024.02.06 使用工具&#xff1a;Node.js 文章全程已做去敏处理&#xff01;&#xff01;&#xff01; 【需要做的可联系我】 可使用AES进行解密处理&#xff08;直接解密即可&#xff09;&#xff1a;在线AES加解密工具 1、打开某某网站(请使用文章开头的…

BaseMapper中提供的方法(17种CRUD)

BaseMapper封装的17种增删改查方法 MybatisPlus框架中mapper层继承了BaseMapper接口&#xff0c;该接口中封装了常用的增删改查方法&#xff0c;共有17种&#xff0c;以下是方法的详情介绍 首先需要明确的括号内的一些对象定义 泛型T&#xff1a;实体类类型Param注解&#x…