flink学习之水位线

什么是水位线

在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,
用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数
据的时间戳来驱动的。
我们可以把时钟也以数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟
的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标
记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以
更新自己的时钟了。在 Flink 中,数据流中用来做时间标记的记号就叫做水位线。
水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,
主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个
数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

水位线的分类

有序的水位线

在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;而在实际应用中,
如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间
戳、插入水位线就做了大量的无用功。所以为了提高效率,一般会每隔一段时间生成一个水位
线,这个水位线的时间戳,就是当前最新数据的时间戳,所以这时的水位线,其实就是有序流中的一个周期性出现的时间标记。
在这里插入图片描述

无序的水位线

在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改
变,这就是所谓的“乱序数据”。
在这里插入图片描述
对于连续数据流,我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就
不再生成新的水位线,也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。
在这里插入图片描述

如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需
要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新
的水位线,。所以我们可以试着多等几秒,也就是把时钟调得更慢一些。最终的目的,就是要让窗口能够把所有迟到数据都收进来,得到正确的计算结果。对应到水位线上,其实就是要保证,当前时间已经进展到了这个时间戳,在这之后不可能再有迟到数据来了(延迟设的足够长)。
在这里插入图片描述

如何生成水位线

1.水位线的生成时机

水位线生产的最佳位置是在尽可能靠近数据源的地方,因为水位线生成时会做出一些有关元素顺序相对时间戳的假设。由于数据源读取过程是并行的,一切引起Flink跨行数据流分区进行重新分发的操作(比如:改变并行度,keyby等)都会导致元素时间戳乱序。但是如果是某些初始化的filter、map等不会引起元素重新分发的操作,可以考虑在生成水位线之前使用。

2.水位线生成策略

在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方 法:assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指
示事件时间。

val stream: DataStream[ClickEvent] = env.addSource(new ClickSource())  
val withTimestampsAndWatermarks: DataStream[ClickEvent] = stream.assignTimestampsAndWatermarks(watermarkStrategy)

assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就是
所谓的“水位线生成策略”。WatermarkStrategy 中包含了一个“时间戳分配器”TimestampAssigner
和一个“水位线生成器”WatermarkGenerator。

trait WatermarkStrategy[T] extends TimestampAssignerSupplier[T] with WatermarkGeneratorSupplier[T] {  def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[T]  def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[T]  
}

TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给
元素。时间戳的分配是生成水位线的基础。
WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在
WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。
onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,
以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作
onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间
为处理时间,可以调用环境配置的 setAutoWatermarkInterval()方法来设置,默认为
200ms。

env.getConfig.setAutoWatermarkInterval(60 * 1000L)
3. flink内置水位线生成器
  1. 有序流
val stream: DataStream[Event] = env.addSource(new ClickSource())  
val withTimestampsAndWatermarks: DataStream[Event] = stream.assignTimestampsAndWatermarks(  WatermarkStrategy  .forMonotonousTimestamps[Event]()  .withTimestampAssigner { (event, timestamp) => event.timestamp }  
)
  1. 无序流
import java.time.Duration  
import org.apache.flink.streaming.api.scala._  
import org.apache.flink.streaming.api.windowing.time.Time  
import org.apache.flink.util.Collector  object OutOfOrdernessTest {  def main(args: Array[String]): Unit = {  val env = StreamExecutionEnvironment.getExecutionEnvironment  val clickSource = new ClickSource()  val stream = env.addSource(clickSource)  // 插入水位线的逻辑  val watermarkedStream = stream  .assignTimestampsAndWatermarks(  WatermarkStrategy  .forBoundedOutOfOrderness(Time.seconds(5))  .withTimestampAssigner(new SerializableTimestampAssigner[Event] {  override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp  })  )  watermarkedStream.print()  env.execute("OutOfOrdernessTest")  }  
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/415178.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL 基于创建时间进行RANGE分区

MySQL是一款广泛使用的关系型数据库。在MySQL中,大量数据场景提高查询效率是非常关键的,所以,对数据表进行分区是一个很好的选择。 在创建分区表之前,需要了解一下MySQL分区的基本概念。MySQL分区可以将一个大表分成多个小表&…

Skydel 23.8新版本发布!GNSS模拟器完成首项实地路测项目

奥本大学自动驾驶团队运用GNSS模拟器完成首项实地路测项目 奥本大学与最近与阿拉巴马州伯明翰的巴伯赛车运动公园合作进行道路测试,该车在没有任何人工干预的情况下成功绕赛道完成了一圈,这也是印地自动驾驶挑战赛中车辆首次在美国专业赛道上完成一圈。…

Docker(二)安装指南:主要介绍在 Linux 、Windows 10 和 macOS 上的安装

作者主页: 正函数的个人主页 文章收录专栏: Docker 欢迎大家点赞 👍 收藏 ⭐ 加关注哦! 安装 Docker Docker 分为 stable test 和 nightly 三个更新频道。 官方网站上有各种环境下的 安装指南,这里主要介绍 Docker 在…

Java项目:11 Springboot的垃圾回收管理系统

作者主页:舒克日记 简介:Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 功能介绍 本系统通过利用系统的垃圾回收流程,提高垃圾回收效率,通过垃圾回收的申请,增删改查,垃圾运输申…

docker部署项目,/var/lib/docker/overlay2目录满了如何清理?

docker部署项目,/var/lib/docker/overlay2目录满了如何清理? 一、问题二、解决1、查看 /var/lib/docker 目录(1)、containers 目录(2)、volumes 目录(3)、overlay2 目录 2、清理&…

橘子学K8S04之重新认识Docker容器

我们之前分别从 Linux Namespace 的隔离能力、Linux Cgroups 的限制能力,以及基于 rootfs 的文件系统三个角度来理解了一下关于容器的核心实现原理。 这里一定注意说的是Linux环境,因为Linux Docker (namespaces cgroups rootfs) ! Docker on Mac (bas…

管理文件名称技巧:如何用最后修改时间命名文件名的方法

当我们需要管理大量的文件时,一个好的文件命名策略是非常重要的。一个清晰、有组织的文件名可以让我们更容易地找到和组织文件,从而提高工作效率。其中一种有效的文件命名技巧是使用最后修改时间来命名文件。下面将介绍云炫文件管理器如何使用这种方法。…

【编码魔法师系列_构建型4】原型模式(Prototype Pattern)

👉直达编码魔法师系列其他文章👈 学会设计模式,你就可以像拥有魔法一样,在开发过程中解决一些复杂的问题。设计模式是由经验丰富的开发者们(GoF)凝聚出来的最佳实践,可以提高代码的可读性、可维…

[一]ffmpeg音视频解码

[一]ffmpeg音视频解码 一.编译ffmpeg1.安装vmware虚拟机2.vmware虚拟机安装linux操作系统3.安装ftp和fshell软件4.在Ubuntu(Linux)中编译Android平台的FFmpeg( arm和x86 )5.解压FFmpeg6.Android编译脚本(1)…

电子签名实名认证的必要性解析

电子签名是确保电子文件真实性和完整性的重要手段,而实名认证则是保证电子签名有效性的必要条件。在电子签名过程中,实名认证的作用主要体现在以下几个方面: 确认身份:实名认证能够确认签署者的真实身份,防止冒签、代…

【Effective Objective - C】—— 对象,消息,运行期

【Effective Objective - C】—— 对象,消息,运行期 理解“属性”这一概念定义变量dynamic关键字属性特质原子性读/写权限内存管理语义方法名要点 在对象内部尽量直接访问实例变量直接访问与属性访问的区别:惰性初始化要点 理解“对象等同性”…

CVer从0入门NLP(二)———LSTM、ELMO、Transformer模型

🍊作者简介:秃头小苏,致力于用最通俗的语言描述问题 🍊专栏推荐:深度学习网络原理与实战 🍊近期目标:写好专栏的每一篇文章 🍊支持小苏:点赞👍🏼、…