flink 最后一个窗口一直没有新数据,窗口不关闭问题

flink 最后一个窗口一直没有新数据,窗口不关闭问题

  • 自定义实现 WatermarkStrategy接口

自定义实现 WatermarkStrategy接口

窗口类型:滚动窗口
代码:

    public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject>{private Tuple2<Long,Boolean> state = Tuple2.of(0L,true);@Overridepublic WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<JSONObject>() {private long maxWatermark;@Overridepublic void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) {maxWatermark = Math.max(maxWatermark,waterSensor.getLong("ts"));state.f0 = System.currentTimeMillis();System.out.println("maxWatermark is " + maxWatermark);state.f1 = false;}@Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {//乱序时间long outOfTime = 3000L;if (maxWatermark - outOfTime <=0){} else {// 10s内没有数据则关闭当前窗口System.out.println("System.currentTimeMillis() - state.f0:" + (System.currentTimeMillis() - state.f0));System.out.println("state.f1:" + state.f1);if (System.currentTimeMillis() - state.f0 >= 9000L && !state.f1){watermarkOutput.emitWatermark(new Watermark(maxWatermark  + 6000L));state.f1 = true;System.out.println("触发窗口,maxWatermark  + 6000L:" + (maxWatermark  + 6000L));} else {System.out.println("正常发送水印");watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime));}}}};}}

代码部分逻辑说明
在这里插入图片描述若设置了自动生成watermark 参数,根据打印日志,设置对应的时间(多久没新数据写入,触发窗口计算)
env.getConfig().setAutoWatermarkInterval(5000);

使用自定义的watermark:
在这里插入图片描述
watermark 周期生成()的疑问:
1、默认200ms,会连续生成4次后,不会继续生成了
2、设置了周期生成间隔,env.getConfig().setAutoWatermarkInterval(1000L); 只会周期生成一次

参考:https://blog.csdn.net/lr131425/article/details/127422833

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/412858.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【论文简介】个性化真实人像生成方法(2024.01.15发布,即将开源)

零样本身份保留生成方法&#xff1a;声称效果好于PhotoMaker&#xff08;即将开源&#xff09; 2401.InstantID: Zero-shot Identity-Preserving Generation in Seconds &#xff1a; 项目主页&#xff1a;https://instantid.github.io/ 一、简介 本文的主要内容是介绍了一种…

无心剑七绝《腊八粥香》

七绝腊八粥香 欣逢腊八粥浓香 五谷丰登聚宝庄 祈福心诚情不尽 佳肴共品待春芳 2024年1月18日 平水韵七阳平韵 这首七言绝句《腊八粥香》以腊八节为背景&#xff0c;描绘了人们欢庆腊八、祈福迎新的情景。 首句“欣逢腊八粥浓香”&#xff0c;开门见山地点明了主题——腊八节&a…

连接超时的问题

连接超时的问题 通用第三方工具连接超时 connect timeout 方案一&#xff1a; /etc/ssh/sshd_config node1上操作&#xff0c;图是错的 方案二&#xff1a; windows上Hosts文件域名解析有问题 比如&#xff1a; 192.168.xx.100 node1 192.168.xx.161 node1 两个都解析成node…

Flink-SQL——时态表(Temporal Table)

时态表(Temporal Table) 文章目录 时态表(Temporal Table)数据库时态表的实现逻辑时态表的实现原理时态表的查询实现时态表的意义 Flink中的时态表设计初衷产品价格的例子——时态表汇率的例子——普通表 声明版本表声明版本视图声明普通表 一个完整的例子测试数据代码实现测试…

FFmpeg解决视频播放加载卡顿问题(FFmpeg+M3U8分片)

FFmpeg解决视频播放加载卡顿问题(FFmpegM3U8分片) 在这静谧的时光里&#xff0c;我们能够更清晰地审视自己&#xff0c;思考未来的方向。每一步的坚实&#xff0c;都是对勇气的拥抱&#xff0c;每一个夜晚的努力&#xff0c;都是对未来的信仰。不要害怕独行&#xff0c;因为正是…

EDA-数据探索-pandas自带可视化-iris

# 加载yellowbrick数据集 import os import pandas as pd FIXTURES os.path.join(os.getcwd(), "data") df pd.read_csv(os.path.join(FIXTURES,"iris.csv")) df.head()sepal_lengthsepal_widthpetal_lengthpetal_widthspecies05.13.51.40.2setosa14.93…

行列转化【附加面试题】

在MySQL中&#xff0c;行列转换是一种常见的操作。它包括行转列和列转行两种情况。 行转列&#xff1a;行转列是将表中的某些行转换成列&#xff0c;以提供更为清晰、易读的数据视图。例如&#xff0c;假设我们有一个包含科目和分数的表&#xff0c;我们可以使用SUM和CASE语句…

银河麒麟操作系统 v10 中离线安装 Docker

银河麒麟操作系统 v10 中离线安装 Docker 1. 查看系统版本2. 查看 Linux 内核版本&#xff08;3.10以上&#xff09;3. 查看 iptabls 版本&#xff08;1.4以上&#xff09;4. 判断处理器架构5. 离线下载 Docker 安装包6. 移动解压出来的二进制文件到 /usr/bin 目录中7. 配置 Do…

基于TCP的全双工网络编程实践

首先我们先了解一下什么是全双工通信&#xff1f; 全双工数据通信允许数据同时在两个方向上传输&#xff0c;因此&#xff0c;全双工通信相当于是两个单工通信方式的结合&#xff0c;它要求发送设备和接收设备都有独立的接收和发送能力。 TCP服务端代码&#xff1a; #includ…

【控制篇 / 分流】(7.4) ❀ 01. 对指定IP网段访问进行分流 ❀ FortiGate 防火墙

【简介】公司有两条宽带&#xff0c;一条ADSL拨号用来上网&#xff0c;一条移动SDWAN&#xff0c;已经连通总部内网服务器&#xff0c;领导要求&#xff0c;只有访问公司服务器IP时走移动SDWAN&#xff0c;其它访问都走ADSL拨号&#xff0c;如果你是管理员&#xff0c;你知道有…

系统性学习vue-vuex

系统性学习vue-vuex 理解vuexvuex工作原理搭建vuex环境案例Vuex的开发者工具使用getters配置项mapState与mapGettersmapActions和mapMutationsvuex模块化namespace 理解vuex 概念&#xff1a; 专门在Vue中实现集中式状态&#xff08;数据&#xff09;管理的一个Vue插件&#xf…

使用Sqoop的并行处理:扩展数据传输

使用Sqoop的并行处理是在大数据环境中高效传输数据的关键。它可以显著减少数据传输的时间&#xff0c;并充分利用集群资源。本文将深入探讨Sqoop的并行处理能力&#xff0c;提供详细的示例代码&#xff0c;以帮助大家更全面地了解和应用这一技术。 Sqoop的并行处理 在开始介绍…