flink watermark 实例分析

WATERMARK 定义了表的事件时间属性,其形式为:

 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 

rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)/TIMESTAMP_LTZ(3),且是 schema 中的顶层列,它也可以是一个计算列。
watermark是触发计算的机制,只要事件时间<= watermark,就会触发当前行数据的计算,watermark的形象描述如下:
在这里插入图片描述

watermark的窗口触发机制

watermark会根据数据流中event的时间戳发生变化。通常情况下,event都是乱序的,不按时间排序的。watermark的计算逻辑为:当前最大的 event time - 最大允许延迟时间(MaxOutOfOrderness)。在同一个分区内部,当watermark大于或者等于窗口的结束时间时,才能触发该窗口的计算,即watermark>=windows endtime。如下图所示:
在这里插入图片描述
根据上图分析:
MaxOutOfOrderness = 5s,窗口的大小为:10s。
watermark分别为:12:08、12:15、12:30
计算逻辑为:WM(12:08)=12:13 - 5s;WM(12:15)=12:20 - 5s;WM(12:30)=12:35 - 5s

  • 对于 [12:00,12:10) 窗口,需要在WM=12:15时,才能被触发计算,参与计算的event为:event(12:07)/event(12:01)/event(12:07)/event(12:09),event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:20)/event(12:14)/event(12:15)不参与计算,因为还未到窗口时间,也就是event time 为 [12:00,12:10] 窗口内的event才能参与计算。
    注意,如果过了这个窗口期,再收到 [12:00,12:10] 窗口内的event,就算超过了最大允许延迟时间(MaxOutOfOrderness),不会再参与计算,也就是数据被强制丢掉了。
  • 对于 [12:10,12:20][12:20,12:30] 窗口,会在WM=12:30时,被同时触发计算,参与**[12:10,12:20]** 窗口计算的event为:event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:14)/event(12:15)/event(12:15)/event(12:18);参与 [12:20,12:30] 窗口计算的event为:event(12:20)/event(12:20);在这个过程中event(12:05)会被丢弃,不会参与计算,因为已经超了最大允许延迟时间(MaxOutOfOrderness)

迟到的事件的处理,在介绍watermark时,提到了现实中往往处理的是乱序event,即当event处于某些原因而延后到达时,往往会发生该event time < watermark的情况,所以flink对处理乱序event的watermark有一个允许延迟的机制,这个机制就是最大允许延迟时间(MaxOutOfOrderness),允许在一定时间内迟到的event仍然视为有效event。

WATERMARK rowtime_column_name 取值两种方式

rowtime_column_name为计算列

CREATE TABLE pageviews (mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), --计算列,必须为TIMESTAMP(3)/TIMESTAMP_LTZ(3)类型WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
) WITH ('connector' = 'kafka','properties.bootstrap.servers' = '***','topic' = 'topic1','format' = 'json','properties.group.id' = '*****','scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets    latest-offset     earliest-offset
);

rowtime_column_name为事件时间属性

CREATE TABLE dataGen(uuid VARCHAR(20),name INT,age INT,ts TIMESTAMP(3), --事件时间属性,字段类型为TIMESTAMP(3)WATERMARK FOR ts AS ts
)with('connector' = 'datagen','rows-per-second' = '10','number-of-rows' = '100','fields.age.kind' = 'random','fields.age.min' = '1','fields.age.max' = '10','fields.name.kind' = 'random','fields.name.min' = '1','fields.name.max' = '10');

watermark使用demo

CREATE TABLE kafka_table(mid bigint,db string,sch string,tab string,opt string,ts bigint,ddl string,err string,src map < string, string >,cur map < string, string >,cus map < string, string >,group_name as COALESCE(cur['group_name'], src['group_name']),batch_number as COALESCE(cur['batch_number'], src['batch_number']),event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH ('connector' = 'kafka','properties.bootstrap.servers' = '***','topic' = 'topic1','format' = 'json','properties.group.id' = '*****','scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets    latest-offset     earliest-offset
);

watermark在over聚合中的使用

--RANGE:每个group_name计算当前group_name前10分钟内收到的同一group_name的所有总数
selectgroup_name
,event_time
,COUNT(group_name) OVER w1 as cnt
from kafka_table
where UPPER(opt) <> 'DELETE'
WINDOW w1 AS (PARTITION BY group_nameORDER BY event_timeRANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW)

watermark在windows聚合中的使用

--求每10分钟的滚动窗口内同一group_name的所有总数
create view tmp as
SELECT group_name,event_time FROM kafka_table where UPPER(opt) <> 'DELETE';select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name

参考:
Window Aggregation
Over Aggregation

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/292423.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Sharding JDBC 学习了解 - 总览和概念

第一部分&#xff1a;概述 ShardingSphere是一个由一套分布式数据库中间件解决方案组成的开源生态圈&#xff0c;包括Sharding-JDBC、Sharding-Proxy和Sharding-Proxy 3个独立产品。它们都提供了数据分片、分布式事务、数据库编排等功能&#xff0c;适用于Java同构、异构语言、…

docker学习(十一、Redis集群存储数据方式)

文章目录 一、集群数据存储1.单机连接集群问题2.集群方式连接redis存储数据 二、 查看集群信息 docker搭建Redis集群相关知识&#xff1a; docker学习&#xff08;九、分布式存储亿级数据知识&#xff09; docker学习&#xff08;十、搭建redis集群&#xff0c;三主三从&#x…

UE5 Landscape 制作GIS卫星图地形

1. 总体想法&#xff1a; 制作GIS地形&#xff0c;使用Landscaping MapBox是一个好方法&#xff0c;但是区域过大&#xff0c;会占用很多内存 https://blog.csdn.net/qq_17523181/article/details/135029614 如果采用QGis&#xff0c;导出卫星图&#xff0c;在UE5里拼合出地形…

docker 部署kafka

随笔记录 目录 1. 安装zookeeper 2. 安装Kafka 2.1 拉取kafka image 2.2 查询本地docker images 2.3 查看本地 容器&#xff08;docker container&#xff09; 2.3.1 查看本地已启动的 docker container 2.3.2 查看所有容器的列表&#xff0c;包括已停止的容器。 2.4 …

基于ssm二手车交易平台的设计论文

摘 要 进入21世纪网络和计算机得到了飞速发展&#xff0c;并和生活进行了紧密的结合。目前&#xff0c;网络的运行速度以达到了千兆&#xff0c;覆盖范围更是深入到生活中的角角落落。这就促使二手交易网站的发展。二手交易网站可以实现远程购物&#xff0c;远程选择喜欢的商品…

【数学建模美赛F奖速成系列】论文写作技巧+优秀论文讲评

目录 写在前面推荐课程历年优秀美赛论文 写在前面 由于美赛参赛规则是要求提交英文摘要和论文&#xff0c;作为评奖的唯一依据&#xff0c;这就要求参赛学生既要具有较好的数学建模能力&#xff0c;同时也需要具备较高的英语写作水平&#xff0c;并熟练掌握英语论文的写作技巧…

考研数学二内容总结

目录 高等数学 一、函数、极限、连续 考试内容 &#x1f3c1;总结&#xff1a; 考试要求 &#x1f3c1;1&#xff0e;理解函数的概念&#xff0c;掌握函数的表示法&#xff0c;会建立应用问题的函数关系&#xff0e; 2&#xff0e;了解函数的有界性、单调性、周期性和奇…

程序员自由创业周记#20:需求从何而来

程序员自由创业周记#20&#xff1a;需求从何而来 之前看过我周记的朋友应该了解我从7月份开始独立创业以来&#xff0c;主要做了两个产品&#xff0c;一个是加一&#xff0c;一个是Island Widgets - 灵动岛锁屏小组件 &#xff0c;上班的时候工作内容是上级主管分配&#xff0c…

C++基础语法总结

C使用 C的源文件扩展名是&#xff1a;cppC程序的入口是main函数C完全兼容c语言的语法 1、cin、cout C中常使用cin、cout进行控制台的输入和输出 #include <iostream> using namespace std;int main() {cout << "hello world !!!" << endl;retu…

Jenkins自动化构建打包,部署

1.环境准备 上传jdk&#xff0c;maven和tomcat的包&#xff0c;解压到/usr/local下并配置环境变量。 配置jdk [rootserver04 ~]# vim /etc/profile.d/java.sh JAVA_HOME/usr/local/java export PATH$JAVA_HOME/bin:$PATH##加载环境变量 [rootserver04 ~]# source /etc/profi…

DRF从入门到精通二(Request源码分析、DRF之序列化、反序列化、反序列化校验、序列化器常用字段及参数、source、定制字段、保存数据)

文章目录 一、Request对象源码分析区分原生request和新生request新的request还能像原来的reqeust一样使用吗源码片段分析总结&#xff1a; 二、DRF之序列化组件序列化介绍序列化步骤序列化组件的基本使用反序列化基本使用反序列化的新增反序列化的新增删除单条 反序列化的校验序…

Ubuntu 常用命令之 df 命令用法介绍

&#x1f4d1;Linux/Ubuntu 常用命令归类整理 在Ubuntu系统下&#xff0c;df命令是用来查看文件系统的磁盘空间占用情况的。df是disk free的缩写&#xff0c;这个命令可以获取硬盘被占用了多少空间&#xff0c;还有多少空间是可用的&#xff0c;硬盘的挂载点等信息。 df命令的…