深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析

Flink Window 常见需求背景

需求描述

每隔 5 秒,计算最近 10 秒单词出现的次数 —— 滑动窗口
每隔 5 秒,计算最近 5 秒单词出现的次数 —— 滚动窗口
在这里插入图片描述

关于 Flink time 种类 TimeCharacteristic

在这里插入图片描述

  • ProcessingTime
  • IngestionTime
  • EventTime

WindowAssigner 的子类

  • SlidingProcessingTimeWindows
  • SlidingEventTimeWindows
  • TumblingEventTimeWindows
  • TumblingProcessingTimeWindows

使用 EventTime + WaterMark 处理乱序数据

示意图:
在这里插入图片描述

  • 使用 onPeriodicEmit 方法发送 watermark,默认每 200ms 发一次。
  • 窗口起始时间默认按各个时区的整点时间,支持自定义 offset。

Flink Watermark 机制定义

有序的流的 Watermarks

在这里插入图片描述

无序的流的 Watermarks

在这里插入图片描述

多并行度流的 Watermarks

在这里插入图片描述

深入理解 Flink Watermark

Flink Window 触发的条件:

  1. watermark 时间 >= window_end_time
  2. 在 [window_start_time, window_end_time) 区间中有数据存在(注意是左闭右开的区间),而且是以 event time 来计算的

Flink 处理太过延迟数据

Flink 丢弃延迟太多的数据

企业生产中一般不用。

Flink 指定允许再次迟到的时间

治标不治本,企业生产中一般不用。

Flink 收集迟到的数据单独处理

企业生产中应用较为广泛。

Flink 多并行度 Watermark

一个 window 可能会接受到多个 waterMark,我们以最小的为准。
在这里插入图片描述

Flink Window 概述

官网介绍

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
在这里插入图片描述

Flink Window 分类

Flink 的 window 分为两种类型的 Window,分别是:Keyed Windows 和 Non-Keyed Windows,他们的使用方式不同:

// Keyed Windows 
stream.keyBy(...) <- keyed versus non-keyed windows.window(...) <- required: "assigner"[.trigger(...)] <- optional: "trigger" (else default trigger)[.evictor(...)] <- optional: "evictor" (else no evictor)[.allowedLateness(...)] <- optional: "lateness" (else zero)[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data).reduce/aggregate/apply() <- required: "function"[.getSideOutput(...)] <- optional: "output tag"
// Non-Keyed Windows
stream.windowAll(...) <- required: "assigner"[.trigger(...)] <- optional: "trigger" (else default trigger)[.evictor(...)] <- optional: "evictor" (else no evictor)[.allowedLateness(...)] <- optional: "lateness" (else zero)[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data).reduce/aggregate/apply() <- required: "function"[.getSideOutput(...)] <- optional: "output tag"

Window 的生命周期

  1. 当属于某个窗口的第一个元素到达的时候,就会创建一个窗口。
  2. 当时间(event or processing time)超过 window 的结束时间戳加上用户指定的允许延迟(Allowed Lateness)时,窗口将被完全删除。
  3. 每个 Window 之上,都绑定有一个 Trigger 或者一个 Function(ProcessWindowFunction, ReduceFunction, or AggregateFunction)用来执行窗口内数据的计算。
  4. 可以给 Window 指定一个 Evictor,它能够在 after the trigger fires 以及 before and/or after the function is applied 从窗口中删除元素。

Flink Window 类型

Flink 流批同一前后的 Window 分类:
在这里插入图片描述

tumblingwindows —— 滚动窗口

在这里插入图片描述

slidingwindows —— 滑动窗口

在这里插入图片描述

session windows —— 会话窗口

在这里插入图片描述

global windows —— 全局窗口

在这里插入图片描述

Flink Window 操作使用

高级玩法:自定义 Trigger、自定义 Evictor,读者可自行搜索相关文章与代码。

Flink Window 增量聚合

  • reduce(ReduceFunction)
  • aggregate(AggregateFunction)
  • sum()
  • min()
  • max()
  • sum()

Flink Window 全量聚合

  • apply(WindowFunction)
  • process(ProcessWindowFunction)

Flink Window Join

// 在 Flink 中对两个 DataStream 做 Join
// 1、指定两张表
// 2、指定这两张表的链接字段
stream.join(otherStream) // 两个流进行关联.where(<KeySelector>) // 选择第一个流的key作为关联字段.equalTo(<KeySelector>) // 选择第二个流的key作为关联字段.window(<WindowAssigner>) // 设置窗口的类型.apply(<JoinFunction>) // 对结果做操作 process apply = foreach

Tumbling Window Join

在这里插入图片描述

Sliding Window Join

在这里插入图片描述

Session Window Join

在这里插入图片描述

Interval Join

在这里插入图片描述
核心代码示例:

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;
orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(first + "," + second);}});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/338658.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uniapp 字母索引列表插件(组件版) Ba-SortList

简介&#xff08;下载地址&#xff09; Ba-SortList 是一款字母索引列表组件版插件&#xff0c;可自定义样式&#xff0c;支持首字母字母检索、首字检索、搜索等等&#xff1b;支持点击事件。 支持首字母字母检索支持首字检索支持搜索支持点击事件支持长按事件支持在uniapp界…

20240110从官网下载7-zip

20240110从官网下载7-zip 2024/1/10 15:17 百度搜索&#xff1a;7-zip 官网 https://sparanoid.com/lab/7z/ 欢迎来到 7-Zip 官方中文网站&#xff01; 7-Zip 是一款拥有极高压缩比的开源压缩软件。 下载 7-Zip 23.01 稳定版适用于 Windows 操作系统&#xff08;2023-06-30&a…

6.1.2捕捉图像(3)

6&#xff0e;文字捕捉 除了可以捕捉图像外&#xff0c;HyperSnap6还有一个非常神奇、非常实用的功能——文字捕捉。利用文字捕捉&#xff0c;可以把一段不可复制的文字捕捉下来&#xff0c;以便于重新编辑。 (1)右单击桌面上的“我的电脑”&#xff0c;在弹出的快捷菜单中选…

TS 36.321 V12.0.0-MAC过程

​本文的内容主要涉及TS 36.321&#xff0c;版本是C00&#xff0c;也就是V12.0.0。

Flink中的状态管理

一.Flink中的状态 1.1 概述 在Flink中&#xff0c;算子任务可以分为有状态和无状态两种状态。 无状态的算子任务只需要观察每个独立事件&#xff0c;根据当前输入的数据直接转换输出结果。例如Map、Filter、FlatMap都是属于无状态算子。 而有状态的算子任务&#xff0c;就…

labelstudio镜像构建提示 Problem executing scripts APT::Update::Post-Invoke ‘rm

构建镜像真难&#xff0c;害惨了我。报如下的错误&#xff1a; #12 54.36 E: Problem executing scripts APT::Update::Post-Invoke rm -f /var/cache/apt/archives/*.deb /var/cache/apt/archives/partial/*.deb /var/cache/apt/*.bin || true #12 54.36 E: Sub-process retur…

k8s的存储卷

存储卷------数据卷 把容器内的目录&#xff0c;和宿主机的目录进行挂载。 容器在系统上的生命周期是短暂的&#xff0c;delete&#xff0c;k8s用控制&#xff08;deployment&#xff09;创建的pod&#xff0c;delete相当于重启&#xff0c;容器的状态也会回复到初始状态。 …

基于传统机器学习的项目开发过程——@挑大梁

1 场景分析 1.1 项目背景 描述开发项目模型的一系列情境和因素&#xff0c;包括问题、需求、机会、市场环境、竞争情况等 1.2. 解决问题 传统机器学习在解决实际问题中主要分为两类&#xff1a; 有监督学习&#xff1a;已知输入、输出之间的关系而进行的学习&#xff0c;从而…

kubeSphere DevOps自定义容器环境JDK11

kubeSphere DevOps自定义容器环境JDK11 &#x1f342;前言&#x1f342;增加JDK11容器环境&#x1f341;检查是否成功 &#x1f342;不生效的原因排查&#x1f341;按步骤执行如下命令 &#x1f342;前言 kubeSphere 版本v3.1.1 遇到问题:kubeSphere默认支持容器只有JDK8,目前…

首次落地零担快运!商用车自动驾驶跑出交付加速度

即将迈入2024年&#xff0c;还活着的自动驾驶玩家&#xff0c;身上有两个显著标签&#xff1a;选对了细分赛道、会玩。 10月以来&#xff0c;Cruise宣布在美国德州奥斯汀、休斯顿、亚利桑那州凤凰城和加州旧金山全面停止所有自动驾驶出租车队运营服务&#xff0c;通用汽车计划…

Vulnhub-VULNCMS: 1渗透

文章目录 一、前言1、靶机ip配置2、渗透目标3、渗透概括 开始实战一、信息获取二、获取shell三、获取密码文件四、提权 一、前言 由于在做靶机的时候&#xff0c;涉及到的渗透思路是非常的广泛&#xff0c;所以在写文章的时候都是挑重点来写&#xff0c;尽量的不饶弯路。具体有…

Laravel 使用rdkafka_laravel详细教程(实操避坑)

一、选择rdkafka 首先要看版本兼容问题&#xff0c;我的是Laravel5.6&#xff0c;PHP是7.3.13&#xff0c;所以需要下载兼容此的rdkafka&#xff0c;去 Packagist 搜索 kafka &#xff0c;我用的是 Packagist选择里面0.10.5版本&#xff0c; 二、安装rdkafka 在 Laravel 项目…