flink watermark 生成机制与总结

flink watermark 生成机制与总结

    • watermark 介绍
    • watermark生成方式
      • watermark 的生成值算法策略
      • watermark策略设置代码
    • watermark源码分析
      • watermark源码调用流程debug(重要)
      • 测试思路
    • 迟到时间处理
    • FlinkSql 中的watermark
    • 引出问题与源码分析

watermark 介绍

本质上watermark是flink为了处理eventTime窗口计算提出的一种机制,本质上也是一种时间戳,由flink souce或者自定义的watermark生成器按照需求定期或者按条件生成一种系统event,与普通数据流event一样流转到对应的下游operations,接收到watermark数据的operator以此不断调整自己管理的window event time clock。

首先,eventTime计算意味着flink必须有一个地方用于抽取每条消息中自带的时间戳,所以TimestampAssigner的实现类都要具体实现
long (T element, long previousElementTimestamp);方法用来抽取当前元素的eventTime,这个eventTime会用来决定元素落到下游的哪个或者哪几个window中进行计算。

其次,在数据进入window前,需要有一个Watermarker生成当前的event time对应的水位线,flink支持两种后置的Watermarker:Periodic和Punctuated,一种是定期产生watermark(即使没有消息产生),一种是在满足特定情况的前提下触发。两种Watermark分别需要实现接口为
Watermark getCurrentWatermark()和Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);

如果一个下游算子实例消费者多个上游算子实例,则选择上游最小的watermark作为自己的watermark发往下游,这也是为什么要对齐在这里插入图片描述
另外
在这里插入图片描述

watermark生成方式

在 Flink 中,数据处理中需要通过调⽤ DataStream 中的 assignTimestampsAndWatermarks ⽅法来分配时间和⽔印,该⽅法可以传⼊两种参数,⼀个是 AssignerWithPeriodicWatermarks(周期性生成watermark),另⼀个是 AssignerWithPunctuatedWatermarks(已过期,按指定标记性事件生成 watermark),通常建议在数据源(source)之后就进⾏⽣成⽔印,或者做些简单操作⽐如 filter/map/flatMap 之 后再⽣成⽔印,越早⽣成⽔印的效果会更好,也可以直接在数据源头就做⽣成⽔印。

  • With Periodic Watermarks(常用):周期性(一定时间间隔或者达到一定的记录条数)生成watermark

    • 需要实现AssignerWithPeriodicWatermarks接口
    • 默认周期是200ms,可通过env.getConfig.setAutoWatermarkInterval进行修改
    • 实际生产环境用得多,但必须结合时间或者累计条数两个维度,否则在极端情况下会有很大的延时
  • With Punctuated Watermarks(不常用):在满足自定义条件时生成watermark,每一个元素都有机会判断是否生成一个watermark。

    • 需要实现AssignerWithPunctuatedWatermarks接口
    • 在TPS很高的生产环境下会产生大量的 Watermark,可能在一定程度上对下游算子造成一定的压力,只有在实时性很高的场景才会选择这种方式来进行生成水印
    • 新版 Flink 源码中已经标记为 @Deprecated

watermark 的生成值算法策略

  • 紧跟最大事件时间的 watermark 生成策略(完全不容忍乱序)
    WatermarkStrategy.forMonotonousTimestamps();
  • 允许乱序的 watermark 生成策略(最大事件时间-容错时间)
    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)); // 根据实际数据的最大乱序情况来设置
  • 自定义 watermark 生成策略
    WatermarkStrategy.forGenerator(new WatermarkGenerator(){ … } );

watermark策略设置代码


import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;public class sinkFunction {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> stream = env.socketTextStream("local", 9999);stream.assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks()); //禁用时间时间的推进机制stream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()); //紧跟最大时间时间stream.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator()); //自定义watermark生成算法stream.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String s, long l) {return Long.parseLong(s.split(",")[0]);}}));}
}

watermark源码分析

背景代码: source.map(s->bean).assignWatermarkAndTimestamps( ).process().print();

watermark源码调用流程debug(重要)

想要知道代码是如何调用的,我们通过debug的方式来查看数据调用:
我们测试的代码是这样的:

package Launcher;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;
//nc -lk 9999
public class sinkFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);
//        stream.assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks()); //禁用时间时间的推进机制
//        stream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()); //紧跟最大时间时间
//        stream.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator()); //自定义watermark生成算法stream.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String s, long l) {return Long.parseLong(s.split(",")[0]);}}));stream.print();env.execute();}
}

代码很简单,什么都没有,就一个设置watermark
我们先打了两个断点在源码的BoundedOutOfOrdernessWatermarks.class中:
在这里插入图片描述

然后开启nc窗口:
在这里插入图片描述

当我们debug后,没有任何输入就会在output.emitWatermark(new Watermark(this.maxTimestamp - this.outOfOrdernessMillis - 1L));处 断点停止,因为这个函数是有定时器触发的,只要程序跑起来,每200ms就会触发一次:
在这里插入图片描述

这个两百是怎么来的呢?点这里可以看到:
在这里插入图片描述
这是task类封装的,一个底层算子类TimestampsAndWatermarksOperator,看这个类的open方法中的这里:
在这里插入图片描述
这里注册了一个计时器,时间是当前时间加watermarkInterval 后触发,这个watermarkInterval 就是200ms。

在这里插入图片描述而触发方法,这里在调用了watermarkGenerator.onPeriodicEmit,并且有重新注册了一个200ms后的定时,实现了 每隔200ms触发一次的效果。

如果想修改源码,看一些效果,可以完全按照源码的包名,类名,写一个一模一样的java类,类加载器会优先加载自己写的代码,而不会去加载引入的源码中代码。

在这里插入图片描述
这里可以看到,数据来了,他是先数据 collect,后触发水印的onevent方法,也就是后更新watermark值。

测试思路

构造两条单并行度流,合并成一个单并行度流,来 watermark,及重点观察“接收多个上游分区”的 算子的 watermark 推进规律;
在这里插入图片描述

/*** 作者:深海 "deep as the sea"* 日期:2022/4/10* 联系方式:qq:657270652 wx:doitedu2018* 网站:多易教育 www.51doit.cn* 描述:watermark 推进测试观察**/
public class WatermarkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 构造一个无乱序延迟的 watermark 生成策略WatermarkStrategy<String> stringWatermarkStrategy = WatermarkStrategy.<String>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {return Long.parseLong(element.split(",")[1]);}});// 构造单并行度流 1DataStreamSource<String> source1 = env.socketTextStream("localhost", 9988);SingleOutputStreamOperator<String> s1 = source1.assignTimestampsAndWatermarks(stringWatermarkStrategy);// 构造单并行度流 2DataStreamSource<String> source2 = env.socketTextStream("localhost", 9999);SingleOutputStreamOperator<String> s2 = source2.assignTimestampsAndWatermarks(stringWatermarkStrategy);// 两条单并行度流,合并到一条单并行度流DataStream<String> s = s1.union(s2);// 打印 watermark 信息,观察 watermark 推进情况s.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String value, Context ctx, Collector<String> out) throws Exception {// 获取当前的 watermark(注意:此处所谓当前 watermark 是指处理当前数据前的 watermark)long currentWatermark = ctx.timerService().currentWatermark();Long timestamp = ctx.timestamp();System.out.println("s" + " : " + timestamp + " : " + currentWatermark + " => " + value);out.collect(value);}}).print();env.execute();}
}

测试结果

在 9988 端口输入数据: a,1000
在 9999 端口输入数据: b,1000 b,4000 b,5000
输出结果:
s: 1000 : -9223372036854775808 => a,1000
s: 1000 : -9223372036854775808 => b,1000
s: 4000 : 999 => b,4000
s: 5000 : 999 => b,5000

迟到时间处理

  • 直接丢弃:将迟到事件视为错误消息并丢弃(flink默认处理方式)。
  • Side Output机制:可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。
  • Allowed Lateness机制:允许用户设置一个允许的最大迟到时长。Flink 会再窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。

FlinkSql 中的watermark

在创建表的 DDL 中定义
事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 Watermark 生成表达式,同时标记这个已有字段为时间属性字段。

CREATE TABLE user_actions (
user_name STRING,data STRING,
user_action_time TIMESTAMP(3),-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

如果源中的时间戳数据表示为一个 epoch time,通常是一个长值,例如 1618989564564,建议将事件时间属性定义为 TIMESTAMP_LTZ 列

CREATE TABLE user_actions (
user_name STRING,data STRING,
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);

引出问题与源码分析

我们这里来引出一个问题,不能排除有同学就是想用ProcessTime。

那么问题来了,EventTime 情况的watermark 很好理解,
可是ProcessTime的watermark到底做了什么,也不需要用它来过滤数据,本来就没有用数据内的时间,根本就不知道数据的顺序,更谈不上乱序了,那ProcessTime起了什么用呢?

我们首先来看下env.setStreamTimeCharacteristic() 这个方法

/*** Sets the time characteristic for all streams create from this environment, e.g., processing* time, event time, or ingestion time.** <p>If you set the characteristic to IngestionTime of EventTime this will set a default* watermark update interval of 200 ms. If this is not applicable for your application* you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.** @param characteristic The time characteristic.*/@PublicEvolvingpublic void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {this.timeCharacteristic = Preconditions.checkNotNull(characteristic);if (characteristic == TimeCharacteristic.ProcessingTime) {getConfig().setAutoWatermarkInterval(0);} else {getConfig().setAutoWatermarkInterval(200);}}

这个方法设置用户使用的是eventtime还是processtime

由源码可以看到 ,如果设置的是ProcessingTime ,会把autoWatermarkInterval这个属性值设为0,如果是EventTime,会设置为 200,我们追踪这个值发现,用户自定义的watermark类,需要注册在assignTimestampsAndWatermarks中,而在assignTimestampsAndWatermarks类中能够找到TimestampsAndPeriodicWatermarksOperator,
TimestampsAndPeriodicWatermarksOperator的open方法中有autoWatermarkInterval这个属性值。

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {// match parallelism to input, otherwise dop=1 sources could lead to some strange// behaviour: the watermark will creep along very slowly because the elements// from the source go to each extraction operator round robin.final int inputParallelism = getTransformation().getParallelism();final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);TimestampsAndPeriodicWatermarksOperator<T> operator =new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator).setParallelism(inputParallelism);}
@Overridepublic void open() throws Exception {super.open();currentWatermark = Long.MIN_VALUE;watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();if (watermarkInterval > 0) {long now = getProcessingTimeService().getCurrentProcessingTime();getProcessingTimeService().registerTimer(now + watermarkInterval, this);}}

我们来看这个open的初始方法,这个open是TimestampsAndPeriodicWatermarksOperator的初始方法,其实也是assignTimestampsAndWatermarks启动的条件,这个open给定了watermark的初始值。

这里初始化了两个值,

  • 一个是其实的watermark的初始值,最小的long值,-9223372036854775808
  • 另一个是初始的 watermark的间隔 如果是 EventTime就是当前时间加200ms,如果是ProcessTime就是当前时间。

再来看下面的定时任务

	@Overridepublic void onProcessingTime(long timestamp) throws Exception {// register next timerWatermark newWatermark = userFunction.getCurrentWatermark();if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {currentWatermark = newWatermark.getTimestamp();// emit watermarkoutput.emitWatermark(newWatermark);}long now = getProcessingTimeService().getCurrentProcessingTime();getProcessingTimeService().registerTimer(now + watermarkInterval, this);}

再来看下我们自定义注册的watermark方法

     @Overridepublic Watermark getCurrentWatermark() {// return the watermark as current highest timestamp minus the out-of-orderness boundreturn new Watermark(currentMaxTimestamp - maxOutOfOrderness);}

autoWatermarkInterval为0的话 super.open() 不会被调用

这里面now,就是System.currentTimeMillis(); 所以如果时间间隔不为0,那么下一次调用的时间就是 当前时间 + 方法运行的时间 + 时间间隔,由于方法运行的时间约等于0ms,所以基本就是每个时间间隔(默认200ms),运行一次获取wakermark的方法。

所以如果是ProcessingTime,那么默认时间间隔是0,所以matermarks时间就是一直-9223372036854775808,所以就一直不会过滤时间。
所以想要启动ProcessingTime 来做 时间戳 ,就一定要设置
env.getConfig().setAutoWatermarkInterval(200);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/488594.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++初阶:容器适配器priority_queue常用接口详解及模拟实现、仿函数介绍

介绍完了stack和queue的介绍以及模拟的相关内容后&#xff1a;C初阶&#xff1a;容器适配器介绍、stack和queue常用接口详解及模拟实现 接下来进行priority_queue的介绍以及模拟&#xff1a; 文章目录 1.priority_queue的介绍和使用1.1priority_queue的初步介绍1.2priority_que…

计算机毕业设计 | SSM 学生信息管理 教务管理系统(附源码)

1&#xff0c;绪论 随着我国高等教育的发展&#xff0c;数字化校园将成为一种必然的趋势&#xff0c;国内高校迫切需要提高教育工作的质量与效率&#xff0c;学生成绩管理工作是高校信息管理工作的重要组成部分&#xff0c;与国外高校不同&#xff0c;他们一般具有较大规模的稳…

【Docker】初学者 Docker 基础操作指南:从拉取镜像到运行、停止、删除容器

在现代软件开发和部署中&#xff0c;容器化技术已经成为一种常见的方式&#xff0c;它能够提供一种轻量级、可移植和可扩展的应用程序打包和部署解决方案。Docker 是目前最流行的容器化平台之一&#xff0c;它提供了一整套工具和技术&#xff0c;使得容器的创建、运行和管理变得…

Nginx知识笔记

一、前言 首先&#xff0c;我们来看一张关于正向代理和反向代理的图片 简单理解正向代理和反向代理的概念&#xff1a; 正向代理&#xff1a;在客户端配置代理服务器(和跳板机功能类似&#xff0c;比如公司很多机器需要通过跳板机才允许登录&#xff0c;正向代理的典型用途是…

ChatGPT/GPT4科研应用与AI绘图及论文写作

2023年随着OpenAI开发者大会的召开&#xff0c;最重磅更新当属GPTs&#xff0c;多模态API&#xff0c;未来自定义专属的GPT。微软创始人比尔盖茨称ChatGPT的出现有着重大历史意义&#xff0c;不亚于互联网和个人电脑的问世。360创始人周鸿祎认为未来各行各业如果不能搭上这班车…

AMRT3D数字孪生引擎详解

AMRT 3D数字孪生引擎介绍 AMRT3D引擎是一款融合了眸瑞科技的AMRT格式与轻量化处理技术为基础&#xff0c;以降本增效为目标&#xff0c;支持多端发布的一站式纯国产自研的CS架构项目开发引擎。 引擎包括场景搭建、UI拼搭、零代码交互事件、光影特效组件、GIS/BIM组件、实时数据…

信息矩阵、hessian矩阵与协方差矩阵

文章目录 协方差矩阵联合概率密度hessian矩阵marginalize 本节探讨信息矩阵、hessian矩阵与协方差矩阵的关系&#xff0c;阐明边缘化的原理。 一个简单的示例&#xff0c;如下&#xff1a; 来自 David Mackay. “The humble Gaussian distribution”. In: (2006). 以及手写vio第…

新手想要做好抖音小店,在开店前你需要知道这五点注意事项!

大家好&#xff0c;我是电商小布。 开抖店你说难吗&#xff0c;其实也不难&#xff0c;把需要的材料准备好就可以着手开店。 难的呢&#xff0c;是在小店的运营上边。 所以新手开店想要少出错&#xff0c;少踩坑&#xff0c;一定要提前把店铺的相关注意事项搞清楚。 今天&a…

bisect_left 和 bisect_right 的源码实现及区别解析

哈喽大家好&#xff0c;我是chowley&#xff0c;最近再练二分查找的题&#xff0c;也顺便看了看Python官方的bisect库&#xff0c;这次做一个总结博客。 在 Python 中&#xff0c;bisect_left 和 bisect_right 是两个常用的二分查找函数&#xff0c;用于在已排序的序列中查找元…

鸿蒙LiteOS-M 内核初始化

目录 一、LiteOS-M 初始化内核二、LOS_KernelInit代码分析三、LOS_Start代码解析坚持就有收获 一、LiteOS-M 初始化内核 在LiteOS-M应用程序中&#xff0c;系统初始化如下&#xff1a; /*** brief This is the ohos entry, and you could call this in your main funciton af…

Linux--自定义shell

shell shell就是操作系统提供给用户与操作系统进行交互的命令行界面。它可以理解为一个用户与操作系统之间的接口&#xff0c;用户可以通过输入命令来执行各种操作&#xff0c;如文件管理、进程控制、软件安装等。Shell还可以通过脚本编程实现自动化任务。 常见的Unix系统中使…

【群智能算法转让】一种新的群智能算法||一种超越隐喻的元启发算法||一种基于数学的群智能算法

1、简介 本次全新出推出一个新的基于种群的元启发算法&#xff0c;基于数学中求解非线性方程组的基本思想而开发的&#xff0c;性能不错 新算法转让Q1 top级&#xff08;一种基于数学的超越隐喻的元启发式算法&#xff09; 新的群智能算法转让&#xff0c;新的元启发式算法转…