7.2、如何理解Flink中的水位线(Watermark)

目录

0、版本说明

1、什么是水位线?

2、水位线使用场景?

3、设计水位线主要为了解决什么问题?

4、怎样在flink中生成水位线?

4.1、自定义标记 Watermark 生成器

4.2、自定义周期性 Watermark 生成器

4.3、内置Watermark生成器 - 有序流水位线生成器

4.4、内置Watermark生成器 - 乱序流水位线生成器

4.5、在 读取数据源时 添加水位线

5、水位线和窗口的关系?

6、水位线在各个算子间的传递

6.1、测试用例 - 不设置 withIdleness 超时时间

6.2、测试用例 - 设置 withIdleness 超时时间


0、版本说明

        开发语言:java1.8

        Flink版本:1.17

        官网链接:官网链接

1、什么是水位线?

        Flink中水位线是一条特殊的数据(long timestamp)

        它会以时间戳的形式作为一条标识数据插入到数据流中


2、水位线使用场景?

        使用事件时间(EventTime)做流式计算任务时,需要根据事件时间生成水位线(Watermark)

        通过水位线来触发窗口计算,水位线作为衡量事件时间(EventTime)进展的标识


3、设计水位线主要为了解决什么问题?

        设计水位线主要是为了解决实时流中数据乱序和迟到的问题

        思考:什么原因造成了数据流的乱序呢?

                如今数据采集、数据传输大多都在分布式系统中完成

                各个机器节点因为网络和自身性能的原因 导致了数据的乱序和迟到


4、怎样在flink中生成水位线?

        Flink中支持在 数据源和普通DataStream上添加水位线生成策略(WatermarkStrategy)

4.1、自定义标记 Watermark 生成器

标记 Watermark 生成器特点:

        每条数据到来后,都会为其生成一条 Watermark

适用场景:

        数据量小且数据有序

代码示例:        

Step1:自定义 标记水位线生成器 实现类

// 自定义 标记水位线生成器 实现类
public class PeriodWatermarkGenerator<T> implements WatermarkGenerator<T> {// 每进入一条数据,都会调用一次 onEvent 方法@Override/** 参数说明:*   @event : 进入到该方法的事件数据*   @eventTimestamp : 时间戳提取器提取的时间戳* */public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {//发射水位线output.emitWatermark(new Watermark(eventTimestamp));}// 不需要实现@Overridepublic void onPeriodicEmit(WatermarkOutput output) {}
}

Step2:自定义 标记性水位线生成策略 实现类

// TODO 自定义 标记性水位线生成策略
public class PeriodWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {// TODO 实例化一个 事件时间提取器@Overridepublic TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {return element.f1;}};return timestampAssigner;}// TODO 实例化一个 watermark 生成器@Overridepublic WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new PeriodWatermarkGenerator<>();}
}

Step3:使用 标记性水位线生成策略

// TODO 使用 自定义标记 Watermark 生成器
public class UserPeriodWatermarkStrategy {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2.将socket作为数据源(开启socket端口: nc -lk 9999)SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}});// 3.为 DataStream 添加水位线生成策略 (使用 自定义WatermarkStrategy 实现类)SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(new PeriodWatermarkStrategy());// 4.通过 processFunction实例 查看生成的水位线SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());process.print();// 5.触发程序执行env.execute();}
}

查看运行结果:


4.2、自定义周期性 Watermark 生成器

标记 Watermark 生成器特点:

        基于处理时间,周期性生成 Watermark

适用场景:

        数据量大且可能存在一定程度数据延迟(乱序)

代码示例:        

Step1:自定义 周期性水位线生成器 实现类

// 自定义 周期性水位线生成器
public class PunctuatedWatermarkGenerator<T> implements WatermarkGenerator<T> {// 设置变量,用来保存 当前最大的事件时间private long currentMaxTimestamp;// 设置变量,指定最大的乱序时间(等待时间)private final long maxOutOfOrderness = 0000; // 3 秒@Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {// 只更新当前最大时间戳,不再发生水位线if (currentMaxTimestamp < eventTimestamp) currentMaxTimestamp = eventTimestamp;}// 周期性 生成水位线// 每个 setAutoWatermarkInterval 时间,调用一次该方法@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发出的 watermark = 当前最大时间戳 - 最大乱序时间output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));}
}

Setp2:自定义 周期性水位线生成策略 实现类

// 自定义 周期性水位线生成策略
public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<String, Long>> {// TODO 实例化一个 事件时间提取器@Overridepublic TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {TimestampAssigner<Tuple2<String, Long>> timestampAssigner = new TimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {return element.f1;}};return timestampAssigner;}// TODO 实例化一个 watermark 生成器@Overridepublic WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new PunctuatedWatermarkGenerator<>();}}

Step3:周期性水位线生成策略

// TODO 使用 自定义周期性 Watermark 生成器
public class UserPunctuatedWatermarkStrategy {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)env.getConfig().setAutoWatermarkInterval(3 * 1000L);// 2.将socket作为数据源(开启socket端口: nc -lk 9999)SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}});// TODO 获取 WatermarkStrategy实例 (方式1:通过 WatermarkStrategy实现类获取)PunctuatedWatermarkStrategy punctuatedWatermarkStrategy = new PunctuatedWatermarkStrategy();// TODO 获取 WatermarkStrategy实例 (方式2:通过 WatermarkStrategy工具类获取) 推荐WatermarkStrategy<Tuple2<String, Long>> punctuatedWatermarkStrategyByUtil = WatermarkStrategy.<Tuple2<String, Long>>forGenerator(context -> new PunctuatedWatermarkGenerator<>()).withTimestampAssigner((event, timestamp) -> event.f1);// 3.使用 自定义水位线策略实例 来提取时间戳&生成水位线SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(punctuatedWatermarkStrategy);// 4.通过 processFunction实例 查看生成的水位线SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());process.print();// 3.触发程序执行env.execute();}
}

查看运行结果:


4.3、内置Watermark生成器 - 有序流水位线生成器

有序流水位线生成器特点:

        基于处理时间,周期性生成 Watermark,最大乱序时间为0

适用场景:

        大数量有序流

代码示例:

// TODO 内置Watermark生成器 - 有序流水位线生成器
public class UserForMonotonousTimestamps {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)env.getConfig().setAutoWatermarkInterval(3 * 1000L);// 2.将socket作为数据源(开启socket端口: nc -lk 9999)SingleOutputStreamOperator<Tuple2<String, Long>> sourceDataStream = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}});// TODO 创建 内置水位线生成策略WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner((element,recordTimestamp) -> element.f1);// 3.使用 内置水位线生成策略SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = sourceDataStream.assignTimestampsAndWatermarks(watermarkStrategy);// 4.通过 processFunction实例 查看生成的水位线SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());process.print();// 3.触发程序执行env.execute();}
}

查看运行结果:


4.4、内置Watermark生成器 - 乱序流水位线生成器

乱序流水位线生成器特点:

        基于处理时间,周期性生成 Watermark,可以这是最大乱序时间

适用场景:

        大数量乱序流

代码示例:

// TODO 内置Watermark生成器 - 乱序流水位线生成器
public class UserForBoundedOutOfOrderness {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// TODO 设置周期性生成水位线的时间间隔(默认为200毫秒)env.getConfig().setAutoWatermarkInterval(3 * 1000L);// 2.将socket作为数据源(开启socket端口: nc -lk 9999)SingleOutputStreamOperator<Tuple2<String, Long>> ds = env.socketTextStream("localhost", 9999).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}});// TODO 获取 WatermarkStrategy实例WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // 设置最大乱序时间为1s.withTimestampAssigner((element,recordTimestamp) -> element.f1);// 3.使用 内置水位线生成策略SingleOutputStreamOperator<Tuple2<String, Long>> assignTimestampsAndWatermarksDs = ds.assignTimestampsAndWatermarks(watermarkStrategy);// 4.通过 processFunction实例 查看生成的水位线SingleOutputStreamOperator<String> process = assignTimestampsAndWatermarksDs.process(new ShowProcessFunction());process.print();// 3.触发程序执行env.execute();}
}

查看运行结果:


4.5、在 读取数据源时 添加水位线

// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.创建 Source 对象
Source source = DataGeneratorSource、KafkaSource...// 3.读取 source时添加水位线
env.fromSource(source, WatermarkStrategy实例, "source name")   .print()
;// 4.触发程序执行
env.execute();

5、水位线和窗口的关系?

窗口什么时候创建?

        当窗口内的第一条数据到达时

窗口什么时候触发计算?

        当阈值水位线到达窗口时


6、水位线在各个算子间的传递

        下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值

测试代码:

// TODO 测试水位线的传递
public class TransmitWaterMark {public static void main(String[] args) throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3); // 2.将socket作为数据源(开启socket端口: nc -lk 9999)DataStreamSource<String> source = env.socketTextStream("localhost", 9999);source.partitionCustom(new Partitioner<String>() {@Overridepublic int partition(String key, int numPartitions) {if (key.equals("a")) {return 0;} else if (key.equals("b")) {return 1;} else {return 2;}}}, value -> value.split(",")[0]).map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2 map(String value) throws Exception {return new Tuple2(value.split(",")[0], Long.parseLong(value.split(",")[1]));}}).assignTimestampsAndWatermarks(WatermarkStrategy//.<Tuple2<String, Long>>forMonotonousTimestamps().<Tuple2<String, Long>>forGenerator(new PeriodWatermarkStrategy()).withTimestampAssigner((element,recordTimestamp) -> element.f1).withIdleness(Duration.ofSeconds(5))  //空闲等待5s).process(new ShowProcessFunction()).setParallelism(1).print();env.execute();}
}

6.1、测试用例 - 不设置 withIdleness 超时时间

现象:如果上游某一个子任务一直没有数据更新,下游算子的水位线一直不会变化


6.2、测试用例 - 设置 withIdleness 超时时间

现象:如果上游某一个子任务`在指定时间内`数据更新,下游算子的水位线将不受该子任务最小值的影响

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/115329.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件需求文档、设计文档、开发文档、运维文档大全

在软件开发过程中&#xff0c;文档扮演着至关重要的角色。它不仅记录了项目的需求、设计和开发过程&#xff0c;还为项目的维护和管理提供了便利。本文将详细介绍软件开发文档的重要性和作用&#xff0c;以及需求分析、软件设计、开发过程、运维管理和项目管理等方面的文档要求…

网页采集器-免费的网页采集器

在互联网上&#xff0c;蕴藏着无穷无尽的信息宝藏&#xff0c;无论您是一名学生、研究人员、市场分析师还是企业家&#xff0c;都需要从网络上搜集各种信息来支持您的工作和决策。然而&#xff0c;互联网上的信息千差万别&#xff0c;分散在不同的网站和页面上&#xff0c;如何…

网页游戏如何开发网页游戏类型有哪些?

随着互联网的普及和技术的发展&#xff0c;网页游戏已经成为娱乐和休闲活动的重要组成部分。无需安装任何应用程序&#xff0c;只需打开浏览器&#xff0c;您就可以畅玩各种类型的网页游戏。然而&#xff0c;开发网页游戏并不是一项容易的任务&#xff0c;因为不同类型的游戏需…

电脑桌面透明便签软件是哪个?

在现代快节奏的工作环境中&#xff0c;许多上班族都希望能够在电脑桌面上方便地记录工作资料、重要事项、工作流程等内容。为了解决这个问题&#xff0c;一款优秀的电脑桌面便签软件是必不可少的。在选择桌面便签软件时&#xff0c;许多用户也希望便签软件能够与电脑桌面壁纸相…

2023年中国研究生数学建模竞赛D题解题思路

为了更好的帮助大家第一天选题&#xff0c;这里首先为大家带来D题解题思路&#xff0c;分析对应赛题之后做题阶段可能会遇到的各种难点。 稍后会带来D题的详细解析思路&#xff0c;以及相关的其他版本解题思路 成品论文等资料。 赛题难度评估&#xff1a;A、B>C>E、F&g…

若依cloud -【 100 ~ 】

100 分布式日志介绍 | RuoYi 分布式日志就相当于把日志存储在不同的设备上面。比如若依项目中有ruoyi-modules-file、ruoyi-modules-gen、ruoyi-modules-job、ruoyi-modules-system四个应用&#xff0c;每个应用都部署在单独的一台机器里边&#xff0c;应用对应的日志的也单独存…

软考网络工程师IPSEC VPN配置考点总结

IPSEC VPN&#xff08;华为&#xff09;工作流程 配置安全ACL&#xff1a;配置哪些流量需要被保护配置安全提议&#xff1a;配置IPsec的参数配置IKE&#xff1a;预共享密钥&#xff0c;配置身份验证方法、加密算法等安全参数配置安全策略&#xff1a;1和2做关联在接口应用安全…

计算机网络常见面试题

目录 一、谈一谈对OSI七层模型和TCP/IP四层模型的理解&#xff1f; 答&#xff1a;OSI七层模型主要分为&#xff1a; TCP/IP四层协议&#xff1a; 二、谈谈TCP协议的3次握手过程&#xff1f; 三、TCP协议为什么要3次握手&#xff1f;2次&#xff0c;4次不行吗&#xff1f; …

腾讯面试题:无网络环境,如何部署Docker镜像?

亲爱的小伙伴们&#xff0c;大家好&#xff01;我是小米&#xff0c;很高兴再次和大家见面。今天&#xff0c;我要和大家聊聊一个特别有趣的话题——腾讯面试题&#xff1a;无网络环境&#xff0c;如何部署Docker镜像&#xff1f;这可是一个技术含量颇高的问题哦&#xff01;废…

大型集团借力泛微搭建语言汇率时区统一、业务协同的国际化OA系统

国际化、全球化集团&#xff0c;业务遍布全世界&#xff0c;下属公司众多&#xff0c;集团对管理方式和企业文化塑造有着很高的要求。不少大型集团以数字化方式助力全球统一办公&#xff0c;深化企业统一管理。 面对大型集团全球化的管理诉求&#xff0c;数字化办公系统作为集…

2101. 引爆最多的炸弹;752. 打开转盘锁;1234. 替换子串得到平衡字符串

2101. 引爆最多的炸弹 核心思想&#xff1a;枚举BFS。枚举每个炸弹最多引爆多少个炸弹&#xff0c;对每个炸弹进行dfs&#xff0c;一个炸弹能否引爆另一个炸弹是两个炸弹的圆心距离在第一个炸弹的半径之内。 752. 打开转盘锁 核心思想:典型BFS&#xff0c;就像水源扩散一样&a…

2023工博会强势回归!智微工业携八大系列重磅亮相

中国国际工业博览会&#xff08;简称"中国工博会"&#xff09;自1999年创办以来&#xff0c;历经二十余年发展创新&#xff0c;通过专业化、市场化、国际化、品牌化运作&#xff0c;已发展成为通过国际展览业协会&#xff08;UFI&#xff09;认证、中国工业领域规模最…