【入门Flink】- 09Flink水位线Watermark

窗口的处理过程中,基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。

什么是水位线

用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

有序流中水位线

(1)理想状态(数据量小),数据应该按照生成的先后顺序进入流中,每条数据产生一个水位线

(2)实际应用中,如果当前数据量非常大,且同时涌来的数据时间差会非常小(比如几毫秒),往
往对处理计算也没什么影响。所以为了提高效率,会每隔一段时间生成一个水位线

乱序流中水位线

分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是
所谓的“乱序数据”。

(1)乱序+数据量小:还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。乱序数据,插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

(2)乱序+数据量大:考虑到大量数据同时到来的处理效率,可以周期性地生成水位线。这时
只需要保存一下之前所有数据中的最大时间截,需要插入水位线时,就直接以它作为时间戳生成新的水位线。

(3)乱序+迟到数据:无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据设置迟到时间,比如2秒:也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,此时迟到2秒的数据也会被正确收集处理。【迟到时间不能设置过长,否则会对实时性会有所影响】

水位线的特性

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t,代表t之前的所 有数据都到齐了,之后流中不会出现时间t'≤t的数据

水位线与窗口配合,完成对乱序数据的正确处理

水位线是流处理中对低延迟和结果正确性的一个权衡机制。

水位线生成策略

生成水位线的方法:.assignTimestampsAndWatermarks(),主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。【指定水位线生成策略】

stream.assignTimestampsAndWatermarks(<watermark strategy>);

WatermarkStrategy 水位线策略是一个接口,里面内置一些生成策略:

image-20231111224149038

有序流中内置水位线设置

时间戳单调增长,所以永远不会出现迟到数据的问题。WatermarkStrategy.forMonotonousTimestamps()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy.<WaterSensor>forMonotonousTimestamps()// 指定时间戳分配器,从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});

乱序流中内置水位线设置

由于乱序流中需要等待迟到数据到齐,必须设置一个固定量的延迟时间。WatermarkStrategy. forBoundedOutOfOrderness()

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 乱序数据,等待3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定时间戳分配器,从数据中提取 单位毫秒.withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (element, recordTimestamp) -> {System.out.println(" 数据 =" + element + ",recordTs=" + recordTimestamp);return element.getTs() * 1000L;});

自定义水位线生成器

(1)周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过 onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
模仿该类BoundedOutOfOrdernessWatermarks

public class CustomBoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认 200ms 调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}
}

在 onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;方法由系统框架周期性地调用,默认 200ms 一次。【不建议修改】

 env.getConfig().setAutoWatermarkInterval(400L);

(2)断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。

如下:只要有数据来就直接发射水位线

    @Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(eventTimestamp, maxTs); // 更新最大时间戳output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}

(3)在数据源中发送水位线

可以在自定义的数据源中抽取事件时间,然后发送水位线。

自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks 方法生成水位线二者只能取其一

env.fromSource(
kafkaSource,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), 
// WatermarkStrategy.noWatermarks() 或者不发送水位线
"kafkasource"
)

水位线的传递(空闲等待withIdleness)

一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的作为当前任务的事件时钟

如下案例:当程序并行度设置为2时,自定义分区器导致一个分区一直拿不到数据(最小时钟一直为null),此时如不加以干预,事件时钟将永远不会推进,存在问题。设置空闲时间,当超过空闲时间一直收不到该分区数据,直接忽略该分区,还是会依旧推进时间时钟

        env.setParallelism(2);// 自定义分区器:数据%分区数,只输入奇数,都只会去往map 的一个子任务SingleOutputStreamOperator<Integer> socketDS = env.socketTextStream("xxxx", 7777).partitionCustom(new MyPartitioner(), r -> r).map(Integer::parseInt).assignTimestampsAndWatermarks(WatermarkStrategy.<Integer>forMonotonousTimestamps().withTimestampAssigner((r, ts) -> r * 1000L).withIdleness(Duration.ofSeconds(5)) // 空闲等待 5s);// 分成两组:奇数一组,偶数一组,开 10s 的事件时间滚动窗口socketDS.keyBy(r -> r % 2).window(TumblingEventTimeWindows.of(Time.seconds(10)))...

迟到数据的处理

1)推迟水印推进(设置延迟时间)

水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

2)设置窗口延迟关闭

Flink 的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。当达到设置延迟关闭时间之后,才会真正关闭窗口,关闭窗口后再迟到的数据就不会再处理。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

3)使用侧流接收迟到的数据

最后兜底,窗口关闭之后的迟到数据,使用侧输出流输出。

完整方案:

public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777).map(new WaterSensorMapFunction());WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy// 1.设置迟到时间 3s.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L)// .withIdleness(Duration.ofSeconds(5)); // 空闲等待 5s;SingleOutputStreamOperator<WaterSensor>sensorDSWithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTag<WaterSensor> lateTag = new OutputTag<>("latedata", Types.POJO(WaterSensor.class));SingleOutputStreamOperator<String> process = sensorDSWithWatermark.keyBy(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(10))) .allowedLateness(Time.seconds(2)) // 2.推迟2s关窗.sideOutputLateData(lateTag) // 3.关窗后的迟到数据,放入侧输出流.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包含 " + count + " 条数据===>" + elements);}});process.print();// 从主流获取侧输出流,打印process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/169188.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nginx(五)

负载均衡 官网文档 Using nginx as HTTP load balancer nginx中实现反向代理的方式 HTTP&#xff1a;通过nginx配置反向代理到后端服务器&#xff0c;nginx将接收到的HTTP请求转发给后端服务器。使用 proxy_pass 命令 HTTPS&#xff1a;通过nginx配置反向代理到后端服务器&…

【操作系统面试题(32道)与面试Linux命令大全】

文章目录 操作系统面试题引论1.什么是操作系统&#xff1f;2.操作系统主要有哪些功能&#xff1f; 操作系统结构3.什么是内核&#xff1f;4.什么是用户态和内核态&#xff1f;5.用户态和内核态是如何切换的&#xff1f; 进程和线程6.并行和并发有什么区别&#xff1f;7.什么是进…

小白学安全-KunLun-M静态白盒扫描工具

一、KunLun-M简介 KunLun-M是一个完全开源的静态白盒扫描工具&#xff0c;支持PHP、JavaScript的语义扫描&#xff0c;基础安全、组件安全扫描&#xff0c;Chrome Ext\Solidity的基础扫描。开源地址&#xff1a;https://github.com/LoRexxar/Kunlun-M Cobra是一款源代码安全审计…

数据拟合、参数估计、插值等数据处理算法

介绍 数据拟合&#xff1a; 数据拟合是通过选择或构建合适的函数模型&#xff0c;将给定的数据点与该函数模型进行匹配和拟合的过程。常见的数据拟合方法包括最小二乘法和非线性最小二乘法。最小二乘法通过最小化实际数据与拟合函数的残差平方和来求解最优拟合参数。非线性最小…

Python之函数进阶-nonlocal和LEGB

Python之函数进阶-nonlocal和LEGB nonlocal语句 nonlocal:将变量标记为不在本地作用域定义&#xff0c;而是在上级的某一级局部作用域中定义&#xff0c;但不能是全局作用域中定义。 函数的销毁 定义一个函数就是生成一个函数对象&#xff0c;函数名指向的就是函数对象。可…

axios请求的问题

本来不想记录&#xff0c;但是实在没有办法&#xff0c;因为总是会出现post请求&#xff0c;后台接收不到数据的情况,还是记录一下如何的解决的比较好。 但是我使用export const addPsiPurOrder data > request.post(/psi/psiPurOrder/add, data); 下面是封装的代码。后台接…

幼教早教内容付费预约小程序的效果如何

很多家庭对孩子的重视程度很高&#xff0c;尤其加之如今激烈竞争的市场&#xff0c;孩子从小便需要各种提前教育&#xff0c;而相关教培企业也比较多&#xff0c;基于服务高需求度&#xff0c;线下教育与线上课程教育同样重要。 在实际经营中&#xff0c;幼教早教培训机构也面…

“富婆”通讯录——让你少奋斗50年

文章目录 一、项目需求分析二、通讯录各功能实现思路及代码准备工作2.1、打印一个菜单&#xff0c;提供用户选择功能2.2、添加联系人信息2.3、删除联系人信息2.4、查询联系人信息2.5、修改联系人信息2.6、显示所有联系人信息2.7、对所有联系人信息进行排序整理2.8、删除所有联系…

[极客大挑战 2019]BuyFlag 1(两种解法)

题目环境&#xff1a; FLAG NEED YOUR 100000000 MONEY flag需要你的100000000元 F12瞅瞅源代码&#xff1a; if (isset($_POST[password])){ $password $_POST[password]; if (is_numeric($password)) { echo "password cant be number" } elseif ($pas…

【MySQL】列属性

文章目录 CHAR和VARCHAR插入单行 INSERT INTO插入多行插入分层行 LAST_INSERT_IN()创建表复制 CREAT TABLE AS更新单行 UPDATE...SET更新多行在UPDATES中使用子查询【需着重复习】删除行 DELETE恢复数据库到原始状态 CHAR和VARCHAR CHAR(50)&#xff1a;存储文本占5个字符&…

Git 命令详解

系列文章目录 C高性能优化编程系列 深入理解软件架构设计系列 高级C并发线程编程 C技能系列 期待你的关注哦&#xff01;&#xff01;&#xff01; 现在的一切都是为将来的梦想编织翅膀&#xff0c;让梦想在现实中展翅高飞。 Now everything is for the future of dream we…

【博士每天一篇文献-算法】Imposing Connectome-Derived Topology on an Echo State Network

阅读时间&#xff1a;2023-11-5 1 介绍 年份&#xff1a;2022 作者&#xff1a;Jacob Morra, Mark Daley 西部大学 期刊&#xff1a;2022 International Joint Conference on Neural Networks (IJCNN) 引用量&#xff1a;3 研究了果蝇连接图的拓扑结构对混沌时间序列预测中回…