0基础学习PyFlink——水位线(watermark)触发计算

在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》和《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》中,我们发现如果窗口中元素个数没有把窗口填满,则不会触发计算。
在这里插入图片描述

为了解决长期不计算的问题,我们引入了在《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》和《0基础学习PyFlink——时间滑动窗口(Sliding Time Windows)》的方案。但是这个方案引入另外一个问题,就是每次处理数据可能不尽相同。这是因为它们使用了“处理时间”(Processing Time)来作为窗口划分的参考系,而每次程序处理时间会根据当前负载情况有很大的不同。这样我们对同一批数据做处理时,可能会得出不同的Window切分方案。
在这里插入图片描述
于是我们引入《0基础学习PyFlink——事件时间和运行时间的窗口》方案。它可以使用源自数据本身的“事件时间”(Event Time)作为Time Window的参考系,这样在不同负载、不同时间,相同数据的时间参考系是一样的,进而可以得出一致的结果。
在这里插入图片描述
但是现实中,我们没法保证上述数据是按照上面的顺序到达Flink的。
比如下面这个例子,红色部分都是乱序的,那么Flink如何处理这些数据呢?
在这里插入图片描述
只有两种可能性:

  1. 直接抛弃;
  2. 等待一段时间统一处理,超过等待的时间直接抛弃。因为不可能一直等下去,否则什么时候处理呢?

这些即有别于Count Window,也有别于Time Window。这个时候就要引入水位线(watermark)技术来解决这个问题。
在详细讲解之前,我们需要明确一些基本知识:

  1. EventTime就是Timestamp,即我们可以通过制定Timestamp函数设定元素的EventTime。
  2. EventTime从属于元素。
  3. Watermark源于EventTime和max_out_of_orderness(等待无序数据的时间),即Watermark=EventTime-max_out_of_orderness。
  4. Watermark从属于流。
  5. Window的Start源于EventTime;End源于Start和窗口时间,即End=Start+WindowTme;这是一个左闭右开的区间,即[Start, End)。
  6. Window从属于流,只有Watermark>=Window End时才会触发计算(且窗口中要有元素)。
  7. Window在单向递增前进,比如从[0,10)变成[10,20)、[20,30)……[90,100)。
  8. Wartermark单向递增前进,它不会因为新进入的元素EventTime较小,而导致Wartermark向变小的趋势发展。
    在这里插入图片描述
    上图中,第一个元素(A,1)的EventTime通过自定义公式可以得到101,于是初始的Window的Start值是该值向下取可以被Window Size整除的最大值,即100;这个进一步确认了第一个窗口是[100,105)。
    watermark是通过eventtime计算出来的,上例中我们希望如果事件在窗口时间之外到来则抛弃,即不等待任何时间,即Window End+0,即Eventtime-0。
    (A,0)数据来到的时候,watermark不会因为其Eventtime为100,比流中的watermark值(101)小而改变,依然维持watermark单调递增。这个在(A,2)和(A,5)到来时也是如此。
    (A,8)元素的到来,会让流的watermark变成108。这个值会越过当前窗口[100,105),于是会触发计算。计算的元素要求eventtime在上述区间内,即(A,1)、(A,0)、(A,3)和(A,4);
    (A,10)元素的到来,会让流的watermark变成110。这个值会越过当前窗口[100,110),于是会触发计算。计算的元素要求eventtime在上述区间内,即(A,8)、(A,6)、(A,7)和(A,9);而(A,2)因为不在这个区间内,就被抛弃了。我们也可以认为(A,2)迟到而被抛弃。
    为了更好讲述原理,上述例子存在一个假设:watertime更新是随着元素一个个进入而改变的。而实际元素进入个数不太确定,比如可能会两个两个进入,那么就会变成如下。主要区别就是(A,5)参与了第二次窗口计算,虽然它迟到了,而且watermark计算方法也不打算等待任何一个迟到的数据,但是它和(A,10)一起进入时间戳计算逻辑,导致触发的时机被滞后,从而“幸运”的赶上了最后一轮窗口计算。如果它稍微再晚一点到来,它也会被抛弃。
    在这里插入图片描述

测试代码

import time
from pyflink.common import Duration, WatermarkStrategy, Time, Types
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow, TumblingProcessingTimeWindows
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment,RuntimeExecutionMode, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
from pyflink.datastream.functions import AllWindowFunction, ProcessFunction, ProcessAllWindowFunction, KeyedProcessFunction
from pyflink.table.expressions import lit, col
from pyflink.table.window import Tumble
from pyflink.common.time import Instant
from pyflink.table.udf import udf
from pyflink.common import Rowclass WindowFunc(AllWindowFunction[tuple, tuple, TimeWindow]):def apply(self, window, inputs):out = "**************************WindowFunc**************************" \"\nwindow: start:{} end:{} \ninputs: {}" \"\n**************************WindowFunc**************************" \.format(Instant.of_epoch_milli(window.start), Instant.of_epoch_milli(window.end), inputs)print(out)for value in inputs:yield (value, Instant.of_epoch_milli(window.start), Instant.of_epoch_milli(window.end))class TimestampAssignerAdapter(TimestampAssigner):def extract_timestamp(self, value, record_timestamp: int):return value[1] * 1000class TimestampAssignerProcessFunctionAdapter(ProcessFunction):def process_element(self, value, ctx: 'ProcessFunction.Context'):out_put = "-----------------------TimestampAssignerProcessFunctionAdapter {}-----------------------" \"\nvalue: {} \ttimestamp: {} \tcurrent_processing_time: {} \tcurrent_watermark: {}" \"\n-----------------------TimestampAssignerProcessFunctionAdapter-----------------------" \.format(int(time.time()), value, Instant.of_epoch_milli(ctx.timestamp()),Instant.of_epoch_milli(ctx.timer_service().current_processing_time()),Instant.of_epoch_milli(ctx.timer_service().current_watermark()))print(out_put)yield (value, Instant.of_epoch_milli(ctx.timestamp()), Instant.of_epoch_milli(ctx.timer_service().current_processing_time()),Instant.of_epoch_milli(ctx.timer_service().current_watermark()))def gen_random_int_and_timestamp():stream_execute_env = StreamExecutionEnvironment.get_execution_environment()# stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)stream_execute_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)stream_execute_env.set_parallelism(1)stream_execute_env.get_config().set_auto_watermark_interval(2)stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)ordinal_num_start = 0ordinal_num_end = 10rows_per_second = 1schame = Schema.new_builder().column('in_ord', DataTypes.INT()) \.build()table_descriptor = TableDescriptor.for_connector('datagen') \.schema(schame) \.option('fields.in_ord.kind', 'sequence') \.option('fields.in_ord.start', str(ordinal_num_start)) \.option('fields.in_ord.end', str(ordinal_num_end)) \.option('rows-per-second', str(rows_per_second)) \.build()stream_table_env.create_temporary_table('source', table_descriptor)table = stream_table_env.from_path('source')@udf(result_type=DataTypes.ROW([DataTypes.FIELD("in_ord", DataTypes.INT()), DataTypes.FIELD("calc_order", DataTypes.INT())]), input_types=[DataTypes.INT()])def colFunc(oneCol):ordinal_num_data_map = {0: 1, 1: 0, 2: 3, 3: 4, 4: 8, 5: 6, 6: 7, 7: 2, 8: 9, 9: 10, 10: 5}# ordinal_num_data_map = {0: 16, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9,#                       10: 10, 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 0, 17: 17, 18: 18, 19: 19,#                       20: 20, 21: 121, 22: 122, 23: 123, 24: 124, 25: 125, 26: 126, 27: 127, 28: 128, 29: 129,}data = ordinal_num_data_map[oneCol] + 100return Row(oneCol, data)input_table=table.map(colFunc(col('in_ord')))datastream = stream_table_env.to_data_stream(input_table)###############################################################################################    # datastream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(10))) \#                     .apply(WindowFunc())################################################################################################ watermark_strategy = WatermarkStrategy.no_watermarks().with_timestamp_assigner(TimestampAssignerAdapter())# datastream_with_watermark=datastream.assign_timestamps_and_watermarks(watermark_strategy)# datastream_with_watermark.process(TimestampAssignerProcessFunctionAdapter())# datastream_with_watermark.window_all(TumblingEventTimeWindows.of(Time.milliseconds(10))) \#                     .apply(WindowFunc())        ################################################################################################ watermark_strategy = WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(TimestampAssignerAdapter())watermark_strategy = WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(0)).with_timestamp_assigner(TimestampAssignerAdapter())datastream_with_watermark=datastream.assign_timestamps_and_watermarks(watermark_strategy)datastream_with_watermark.process(TimestampAssignerProcessFunctionAdapter())datastream_with_watermark.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \.apply(WindowFunc())###############################################################################################stream_execute_env.execute()if __name__ == '__main__':gen_random_int_and_timestamp()

-----------------------TimestampAssignerProcessFunctionAdapter 1699856800-----------------------
value: Row(in_ord=0, calc_order=101) timestamp: Instant<101, 0> current_processing_time: Instant<1699856800, 705000000> current_watermark: Instant<-9223372036854776, 192000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856802-----------------------
value: Row(in_ord=1, calc_order=100) timestamp: Instant<100, 0> current_processing_time: Instant<1699856802, 700000000> current_watermark: Instant<100, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856802-----------------------
value: Row(in_ord=2, calc_order=103) timestamp: Instant<103, 0> current_processing_time: Instant<1699856802, 702000000> current_watermark: Instant<100, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856804-----------------------
value: Row(in_ord=3, calc_order=104) timestamp: Instant<104, 0> current_processing_time: Instant<1699856804, 700000000> current_watermark: Instant<102, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856804-----------------------
value: Row(in_ord=4, calc_order=108) timestamp: Instant<108, 0> current_processing_time: Instant<1699856804, 709000000> current_watermark: Instant<102, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
WindowFunc
window: start:Instant<100, 0> end:Instant<105, 0>
inputs: [Row(in_ord=0, calc_order=101), Row(in_ord=1, calc_order=100), Row(in_ord=2, calc_order=103), Row(in_ord=3, calc_order=104)]
WindowFunc
-----------------------TimestampAssignerProcessFunctionAdapter 1699856806-----------------------
value: Row(in_ord=5, calc_order=106) timestamp: Instant<106, 0> current_processing_time: Instant<1699856806, 701000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856806-----------------------
value: Row(in_ord=6, calc_order=107) timestamp: Instant<107, 0> current_processing_time: Instant<1699856806, 705000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856808-----------------------
value: Row(in_ord=7, calc_order=102) timestamp: Instant<102, 0> current_processing_time: Instant<1699856808, 700000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856808-----------------------
value: Row(in_ord=8, calc_order=109) timestamp: Instant<109, 0> current_processing_time: Instant<1699856808, 701000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856809-----------------------
value: Row(in_ord=9, calc_order=110) timestamp: Instant<110, 0> current_processing_time: Instant<1699856809, 440000000> current_watermark: Instant<108, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856809-----------------------
value: Row(in_ord=10, calc_order=105) timestamp: Instant<105, 0> current_processing_time: Instant<1699856809, 441000000> current_watermark: Instant<108, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
WindowFunc
window: start:Instant<105, 0> end:Instant<110, 0>
inputs: [Row(in_ord=4, calc_order=108), Row(in_ord=5, calc_order=106), Row(in_ord=6, calc_order=107), Row(in_ord=8, calc_order=109), Row(in_ord=10, calc_order=105)]
WindowFunc
WindowFunc
window: start:Instant<110, 0> end:Instant<115, 0>
inputs: [Row(in_ord=9, calc_order=110)]

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/171209.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Arthas(阿尔萨斯)--(二)

目录 一、Arthas学习 1、JVM相关命令一 1、dashboard 2、thread 3、jvm 4、sysprop 一、Arthas学习 Arthas(阿尔萨斯)--(一) Arthas代码开源地址 1、JVM相关命令一 1、dashboard dashboard:显示当前系统的实时数据面板&#xff0c;按q或ctrlc退出 ID: Java 级别的线…

Docker - 容器数据卷

Docker - 容器数据卷 什么是容器数据卷 等同于挂载&#xff0c;将容器内的目录地址指向于宿主机文件系统中 直接使用命令来挂载 -v docker run -it -v 主机目录:容器内目录# 测试 docker run -it -v /root:/home centos /bin/bash [rootiZ2zeg7mctvft5renx1qvbZ ~]# docker …

vscode 快速打印console.log

第一步 输入这些 {// Print Selected Variabl 为自定义快捷键中需要使用的name&#xff0c;可以自行修改"Print Selected Variable": {"body": ["\nconsole.log("," %c $CLIPBOARD: ,"," background-color: #3756d4; padding:…

Linux---(六)自动化构建工具 make/Makefile

文章目录 一、make/Makefile二、快速查看&#xff08;1&#xff09;建立Makefile文件&#xff08;2&#xff09;编辑Makefile文件&#xff08;3&#xff09;解释&#xff08;4&#xff09;效果展示 三、背后的基本知识、原理&#xff08;1&#xff09;如何清理对应的临时文件呢…

Js 语句

JavaScript 语句向浏览器发出的命令&#xff0c;语句的作用是告诉浏览器该做什么&#xff1b;分号用于分隔 JavaScript 语句&#xff0c;通常我们在每条可执行的语句结尾添加分号&#xff1b;使用分号的另一用处是在一行中编写多条语句。 JavaScript 语句通常以一个 语句标识符…

postgreSQL中的高速缓存

1. 高速缓存简介 ​如下图所示&#xff0c;当一个postgreSQL进程读取一个元组时&#xff0c;需要获取表的基本信息&#xff08;例如&#xff1a;表的oid、索引信息和统计信息等&#xff09;及元组的模式信息&#xff0c;这些信息被分别记录在多个系统表中。通常一个表的模式信…

2023年数维杯国际大学生数学建模挑战赛A题

当大家面临着复杂的数学建模问题时&#xff0c;你是否曾经感到茫然无措&#xff1f;作为2022年美国大学生数学建模比赛的O奖得主&#xff0c;我为大家提供了一套优秀的解题思路&#xff0c;让你轻松应对各种难题。 cs数模团队在数维杯前为大家提供了许多资料的内容呀&#xff0…

ios 对话框UIAlertController放 tableview

//强弱引用 #define kWeakSelf(type)__weak typeof(type)weak##type type; -(void) showUIAlertTable {kWeakSelf(self)UIAlertController *alert [UIAlertController alertControllerWithTitle:NSLocalizedString("select_stu", nil) message:nil prefer…

基于php+thinkphp的网上书店购物商城系统

运行环境 开发语言&#xff1a;PHP 数据库:MYSQL数据库 应用服务:apache服务器 使用框架:ThinkPHPvue 开发工具:VScode/Dreamweaver/PhpStorm等均可 项目简介 系统主要分为管理员和用户二部分&#xff0c;管理员主要功能包括&#xff1a;首页、个人中心、用户管理、图书分类…

P6入门:项目初始化5-项目支出计划Spending Plan

前言 使用项目详细信息查看和编辑有关所选项目的详细信息&#xff0c;在项目创建完成后&#xff0c;初始化项目是一项非常重要的工作&#xff0c;涉及需要设置的内容包括项目名&#xff0c;ID,责任人&#xff0c;日历&#xff0c;预算&#xff0c;资金&#xff0c;分类码等等&…

Spark3.0中的AOE、DPP和Hint增强

1 Spark3.0 AQE Spark 在 3.0 版本推出了 AQE&#xff08;Adaptive Query Execution&#xff09;&#xff0c;即自适应查询执行。AQE 是 Spark SQL 的一种动态优化机制&#xff0c;在运行时&#xff0c;每当 Shuffle Map 阶段执行完毕&#xff0c;AQE 都会结合这个阶段的统计信…

什么是状态机?

什么是状态机&#xff1f; 定义 我们先来给出状态机的基本定义。一句话&#xff1a; 状态机是有限状态自动机的简称&#xff0c;是现实事物运行规则抽象而成的一个数学模型。 先来解释什么是“状态”&#xff08; State &#xff09;。现实事物是有不同状态的&#xff0c;例…