【Flink】窗口(Window)

窗口理解

窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。 本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。

对窗口的正确理解
我们将窗口理解为一个一个的水桶,数据流(stream)就像水流,每个数据都会分发到对应的桶中,当达到结束时间时,对每个桶中收集的数据进行计算处理
在这里插入图片描述

Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口

窗口的分类

按照驱动类型分

时间窗口(Time Window)

以时间来定义窗口的开始和结束,获取某一段时间内的数据(类比于我们的定时发车

计数窗口(Count Window)

计数窗口是基于元素的个数来获取窗口,达到固定个数时就计算并关闭窗口。(类比于我们的人齐才发车

按照窗口分配数据的规则分类

滚动窗口(Tumbling Window)

窗口之间没有重叠,也不会有间隔的首尾相撞状态,这样,每个数据都会被分到一个窗口,而且只会属于一个窗口。
滚动窗口的应用非常广泛,它可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。
在这里插入图片描述

DataStream<T> input = ...;// 滚动 event-time 窗口
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 滚动 processing-time 窗口
input.keyBy(<key selector>).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).<windowed transformation>(<window function>);// 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。
input.keyBy(<key selector>).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).<windowed transformation>(<window function>);

滑动窗口(Sliding Windows)

滑动窗口大小也是固定的,但是窗口之间并不是首尾相接的,而是重叠的。
在这里插入图片描述

DataStream<T> input = ...;// 滑动 event-time 窗口
input.keyBy(<key selector>).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).<windowed transformation>(<window function>);// 滑动 processing-time 窗口,偏移量为 -8 小时
input.keyBy(<key selector>).window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))).<windowed transformation>(<window function>);

会话窗口(Session Windows)

会话窗口,是基于“会话”(session)来对数据进行分组的,会话窗口只能基于时间来定义。
在这里插入图片描述

DataStream<T> input = ...;// 设置了固定间隔的 event-time 会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 event-time 会话窗口
input.keyBy(<key selector>).window(EventTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);// 设置了固定间隔的 processing-time session 窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).<windowed transformation>(<window function>);// 设置了动态间隔的 processing-time 会话窗口
input.keyBy(<key selector>).window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {// 决定并返回会话间隔})).<windowed transformation>(<window function>);

全局窗口

这种窗口对全局有效,会把相同的key的所有数据分配到同一个窗口中,这种窗口没有结束时间,默认不会触发计算,如果希望对数据进行处理,需要自定义“触发器”。
在这里插入图片描述

DataStream<T> input = ...;input.keyBy(<key selector>).window(GlobalWindows.create()).<windowed transformation>(<window function>);

计数窗口

计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink为我们提供了非常方便的接口:直接调用.countWindow()方法

滚动计数窗口

滚动计数窗口只需要传入一个长整型的参数size,表示窗口的大小。

stream.keyBy(...).countWindow(10)
滑动计数窗口

与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size和slide,前者表示窗口大小,后者表示滑动步长。

stream.keyBy(...).countWindow(103)

窗口函数(Window Functions)

定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据, 这就是 window function 的职责了
窗口函数有三种:ReduceFunction、AggregateFunction 或 ProcessWindowFunction。

ReduceFunction

ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。 Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。

DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).reduce(new ReduceFunction<Tuple2<String, Long>>() {//v1 和v2是 2个相同类型的输入参数public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {return new Tuple2<>(v1.f0, v1.f1 + v2.f1);}});

AggregateFunction

ReduceFunction 是 AggregateFunction 的特殊情况。 AggregateFunction 接收三个类型:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。

/*** The accumulator is used to keep a running sum and a count. The {@code getResult} method* computes the average.*/
private static class AverageAggregateimplements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {@Overridepublic Tuple2<Long, Long> createAccumulator() {return new Tuple2<>(0L, 0L);}@Overridepublic Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);}@Overridepublic Double getResult(Tuple2<Long, Long> accumulator) {return ((double) accumulator.f0) / accumulator.f1;}@Overridepublic Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
}DataStream<Tuple2<String, Long>> input = ...;input.keyBy(<key selector>).window(<window assigner>).aggregate(new AverageAggregate());

接口中有四个方法:

  • createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add():将输入的元素添加到累加器中。
  • getResult():从累加器中提取聚合的输出结果。
  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。

可以看到,AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

ProcessWindowFunction

ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable, 以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。 ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据

public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("127.0.0.1", 7777).map(new WaterSensorMapFunction());KeyedStream<WaterSensor, String> keyedStream = sensorDS.keyBy(WaterSensor::getId);WindowedStream<WaterSensor, String, TimeWindow> sensorWS = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));sensorWS.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) {// 上下文可以拿到window对象,还有其他东西:侧输出流 等等long startTs = context.window().getStart();long endTs = context.window().getEnd();String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");long count = elements.spliterator().estimateSize();out.collect("key=" + key + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements);}}).print();env.execute();}
}

增量聚合和全窗口函数的结合使用

在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。
我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。

// ReduceFunction与WindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<TRKW> function) // ReduceFunction与ProcessWindowFunction结合
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)// AggregateFunction与WindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)// AggregateFunction与ProcessWindowFunction结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,ProcessWindowFunction<VRKW> windowFunction)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/195759.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LRU最近最少使用算法

LRU(LeastRecentlyUsed)“最近最少使用”算法&#xff1a; 1.当缓存空间已满耗用时&#xff0c;淘汰最近最少使用数据的缓存对象以释放更多的缓存空间(用于历史缓存对象的维护)。 2. 哈希表:快速查找缓存对象&#xff1b;双向链表:维护 历史数据所在的节点顺序。 步骤&#xff…

深入理解注意力机制(下)——缩放点积注意力及示例

一、介绍 在这篇文章中&#xff0c;我们将重点介绍 Transformer 背后的 Scaled Dot-Product Attention&#xff0c;并详细解释其计算逻辑和设计原理。 在文章的最后&#xff0c;我们还会提供一个Attention的使用示例&#xff0c;希望读者看完后能够对Attention有更全面的了解。…

某60区块链安全之不安全的随机数实战一

区块链安全 文章目录 区块链安全不安全的随机数实战一实验目的实验环境实验工具实验原理实验内容攻击过程分析合约源代码漏洞EXP利用 不安全的随机数实战一 实验目的 学会使用python3的web3模块 学会以太坊不安全的随机数漏洞分析及利用 实验环境 Ubuntu18.04操作机 实验工…

黑马程序员 计算机网络(笔记)

参考文章&#xff1a;【黑马程序员】计算机网络概述~ 程序员必须掌握的入门基础知识&#xff01; 参考文章&#xff1a;黑马程序员Linux运维工作场景解决方案零基础到就业 参考文章&#xff1a;网络安全运维进阶教程&#xff0c;运维工程师深度学习教程 文章目录 linux入门到…

GIT无效的源路径/URL

ssh-add /Users/haijunyan/.ssh/id_rsa ssh-add -K /Users/haijunyan/.ssh/id_rsa

nodejs+vue慢性胃炎健康管理系统的设计与实现-微信小程序-安卓-python-PHP-计算机毕业设计

随着科学技术的飞速发展&#xff0c;各行各业都在努力与现代先进技术接轨&#xff0c;通过科技手段提高自身的优势&#xff1b;对于慢性胃炎健康管理系统当然也不能排除在外&#xff0c;随着网络技术的不断成熟&#xff0c;带动了慢性胃炎健康管理系统&#xff0c; 系统首页、个…

JAVA小游戏拼图

第一步是创建项目 项目名自拟 第二部创建个包名 来规范class 然后是创建类 创建一个代码类 和一个运行类 代码如下&#xff1a; package heima; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.awt.event.KeyEvent; import …

SpringCloud 微服务全栈体系(十四)

第十一章 分布式搜索引擎 elasticsearch 四、RestAPI ES 官方提供了各种不同语言的客户端&#xff0c;用来操作 ES。这些客户端的本质就是组装 DSL 语句&#xff0c;通过 http 请求发送给 ES。官方文档地址&#xff1a;https://www.elastic.co/guide/en/elasticsearch/client/…

nodejs微信小程序-实验室上机管理系统的设计与实现-安卓-python-PHP-计算机毕业设计

用户&#xff1a;管理员、教师、学生 基础功能&#xff1a;管理课表、管理机房情况、预约机房预约&#xff1b;权限不同&#xff0c;预约类型不同&#xff0c;教师可选课堂预约和个人&#xff1b;课堂预约。 目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 …

nodejs+vue实验室上机管理系统的设计与实现-微信小程序-安卓-python-PHP-计算机毕业设计

用户&#xff1a;管理员、教师、学生 基础功能&#xff1a;管理课表、管理机房情况、预约机房预约&#xff1b;权限不同&#xff0c;预约类型不同&#xff0c;教师可选课堂预约和个人&#xff1b;课堂预约。 在实验室上机前&#xff0c;实验室管理员需要对教务处发来的上机课表…

SpringBoot常见注解

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a;每天一个知识点 ✨特色专栏&#xff1a…

leetcode算法之分治-快排

目录 1.颜色分类2.排序数组3.数组中的第k个最大元素4.最小的k个数 1.颜色分类 颜色分类 class Solution { public:void sortColors(vector<int>& nums) {int n nums.size();int left -1,rightn,i0;while(i<right){if(nums[i] 0) swap(nums[left],nums[i]);e…