Flink中时间和窗口

文章目录

    • 一、时间定义
    • 二、水位线(Watermark)
      • 1、概念
      • 2、水位线特征
      • 3、生成水位线
        • 3.1 水位线生成策略(Watermark Strategies)
        • 3.2 Flink 内置水位线生成器
        • 3.3 自定义水位线策略
      • 4、水位线的传递
    • 三、窗口(Window)
      • 1、概念
      • 2、窗口分类
        • 2.1 驱动类型分类
        • 2.2 窗口分配数据规则
      • 3、API概述
        • 3.1 按键分区(Keyed)和非按键分区(Non-Keyed)
        • 3.2 代码中窗口 API 的调用
      • 4、窗口分配器((Window Assigners)
        • 4.1 时间窗口
        • 4.2 计数窗口
        • 4.3 全局窗口
      • 5、窗口函数(Window Functions)
        • 5.1 增量聚合函数(incremental aggregation functions)
          • 5.1.1 归约函数(ReduceFunction)
        • 5.2 全窗口函数


一、时间定义

如图所示,在事件发生之后,生成的数据被收集起来,首先进入分布式消息队列,然后被 Flink 系统中的 Source 算子读取消费,进而向下游的转换算子(窗口算子)传递,最终由窗口算子进行计算处理。

​ 有两个非常重要的时间点:一个是数据产生的时间,我们把它叫作“事件时间”(Event Time);另一个是数据真正被处理的时刻,叫作“处理时间”(Processing Time)。我们所定义的窗口操作,到底是以那种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有所滞后。

  • 处理时间(Processing Time): 是指执行处理操作的机器的系统时间。
  • 事件时间(Event Time): 指每个事件在对应的设备上发生的时间,也就是数据生成的时间。

在这里插入图片描述

二、水位线(Watermark)

1、概念

在实际应用中,一般会采用事件时间语义。而水位线,就是基于事件时间提出的概念。

​ 在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数据的时间戳来驱动的。

在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。

​ 水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

在这里插入图片描述

如图所示,每个事件产生的数据,都包含了一个时间戳,我们直接用一个整数表示。当产生于2 秒的数据到来之后,当前的事件时间就是 2 秒;在后面插入一个时间戳也为 2 秒的水位线,随着数据一起向下游流动。而当 5 秒产生的数据到来之后,同样在后面插入一个水位线,时间戳也为 5,当前的时钟就推进到了 5 秒。这样,如果出现下游有多个并行子任务的情形,我们只要将水位线广播出去,就可以通知到所有下游任务当前的时间进度了。

1.有序流中水位线

在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;如图 所示。所以这时的水位线,其实就是有序流中的一个周期性出现的时间标记。

在这里插入图片描述

2.乱序流中水位线

道在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是所谓的“乱序数据”。

在这里插入图片描述

​ 如图所示,第一个水位线时间戳为 7,它表示当前事件时间是 7 秒,7 秒之前的数据都已经到齐,之后再也不会有了;同样,第二个、第三个水位线时间戳分别为 12 和 20,表示11 秒、20 秒之前的数据都已经到齐,如果有对应的窗口就可以直接关闭了,统计的结果一定是正确的。这里由于水位线是周期性生成的,所以插入的位置不一定是在时间戳最大的数据后面。
​ 另外需要注意的是,这里一个窗口所收集的数据,并不是之前所有已经到达的数据。因为数据属于哪个窗口,是由数据本身的时间戳决定的,一个窗口只会收集真正属于它的那些数据。也就是说,上图中尽管水位线 W(20)之前有时间戳为 22 的数据到来,10~20 秒的窗口中也不会收集这个数据,进行计算依然可以得到正确的结果。

2、水位线特征

现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。

水位线是 Flink 流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据。
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展。
  • 水位线是基于数据的时间戳生成的。
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进。
  • 水位线可以通过设置延迟,来保证正确处理乱序数据。
  • 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据。

3、生成水位线

所以 Flink 中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

3.1 水位线生成策略(Watermark Strategies)

1. 水位线配置API(assignTimestampsAndWatermarks)

在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( WatermarkStrategy<T> watermarkStrategy )

直接用 DataStream 调用该方法即可,与普通的 transform 方法完全一样。

stream06.assignTimestampsAndWatermarks(new CustomWatermarkStrategy());

2. 水位线生成策略(WatermarkGenerator)

.assignTimestampsAndWatermarks()方法需要传入一个WatermarkStrategy作为参数,这就是所谓的水位线生成策略

WatermarkStrategy中包含了一个时间戳分配器TimestampAssigner和一个水位线生成器WatermarkGenerator

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
  • TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
  • WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。
  • onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作.
  • onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms。
env.getConfig().setAutoWatermarkInterval(60 * 1000L);

3.2 Flink 内置水位线生成器

WatermarkStrategy 这个接口是一个生成水位线策略的抽象,让我们可以灵活地实现自己的需求;Flink提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程,而且也为我们自定义水位线策略提供了模板。

1.有序流

对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。

简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。

时间戳和水位线的单位,必须都是毫秒。

stream06.assignTimestampsAndWatermarks(WatermarkStrategy//  有序流(时间戳单调递增).<Event>forMonotonousTimestamps()//  时间戳抽取逻辑.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timstamp;}})
);

上面代码中我们调用.withTimestampAssigner()方法,将数据中的 timestamp 字段提取出来,作为时间戳分配给数据元素;然后用内置的有序流水位线生成器构造出了生成策略。

2.乱序流

由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。

这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;

//  插入水位线逻辑
stream06.assignTimestampsAndWatermarks(WatermarkStrategy//  乱序流(针对乱序流插入水位线,延迟时间设置为5s).<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))//  时间戳抽取逻辑.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timstamp;}})
);

事实上,有序流的水位线生成器本质上和乱序流是一样的,相当于延迟设为 0 的乱序流水位线生成器,两者完全等同:

WatermarkStrategy.forMonotonousTimestamps()
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))

乱序流中生成的水位线真正的时间戳,其实是 当前最大时间戳 – 延迟时间 – 1。

3.3 自定义水位线策略

1. 周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水位线。

import com.lydms.flink.domain.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;// 自定义水位线的产生
public class CustomWatermarkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.addSource(new ClickSource()).assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();env.execute();}public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomPeriodicGenerator();}}public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutputoutput) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认 200ms 调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}}
}

2. 断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。

public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {@Overridepublic void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {// 只有在遇到特定的 itemId 时,才发出水位线if (r.user.equals("Mary")) {output.emitWatermark(new Watermark(r.timestamp - 1));}}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线}
}

我们在 onEvent()中判断当前事件的 user 字段,只有遇到“Mary”这个特殊的值时,才调用output.emitWatermark()发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一定在某个数据到来之后。

4、水位线的传递

如图所示,当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三个并行子任务,所以会向三个分区发出水位线。具体过程如下:

在这里插入图片描述

  1. 上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线” (Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。
  2. 当有一个新的水位线(第一分区的 4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的 3,于是当前任务时钟就推进到了 3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务。
  3. 再次收到新的水位线(第二分区的 7)后,执行同样的处理流程。首先将第二个分区时钟更新为 7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。
  4. 同样道理,当又一次收到新的水位线(第三分区的 6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的 4,所以当前任务的时钟推进到 4,并发出时间戳为 4 的水位线,广播到下游各个分区任务。

水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果总是正确的。

三、窗口(Window)

1、概念

把无界流进行切分,每一段数据分别进行聚合,结果只输出一次。这就相当于将无界流的聚合转化为了有界数据集的聚合,这就是所谓的“窗口”(Window)聚合操作。窗口聚合其实是对实时性和处理效率的一个权衡。

Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。窗口就是用来处理无界流的核心。

在这里插入图片描述

Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开。

  1. 第一个数据时间戳为 2,判断之后创建第一个窗口[0, 10),并将 2 秒数据保存进去;
  2. 后续数据依次到来,时间戳均在 [0, 10)范围内,所以全部保存进第一个窗口;
  3. 11 秒数据到来,判断它不属于[0, 10)窗口,所以创建第二个窗口[10, 20),并将 11秒的数据保存进去。由于水位线设置延迟时间为 2 秒,所以现在的时钟是 9 秒,第一个窗口也没有到关闭时间;
  4. 之后又有 9 秒数据到来,同样进入[0, 10)窗口中;
  5. 12 秒数据到来,判断属于[10, 20)窗口,保存进去。这时产生的水位线推进到了 10秒,所以 [0, 10)窗口应该关闭了。第一个窗口收集到了所有的 7 个数据,进行处理计算后输出结果,并将窗口关闭销毁;
  6. 同样的,之后的数据依次进入第二个窗口,遇到 20 秒的数据时会创建第三个窗口[20, 30)并将数据保存进去;遇到 22 秒数据时,水位线达到了 20 秒,第二个窗口触发计算,输出结果并关闭。

2、窗口分类

在这里插入图片描述

2.1 驱动类型分类

  • 时间窗口(Time Window)
  • 计数窗口(Count Window)

窗口本身是截取有界数据的一种方式,所以窗口一个非常重要的信息其实就是“怎样截取数据”。换句话说,就是以什么标准来开始和结束数据的截取,我们把它叫作窗口的“驱动类型”。

时间窗口(Time Window):按照时间段去截取数据。

计数窗口(Count Window):按照固定的个数,来截取一段数据集。

在这里插入图片描述

时间窗口(Time Window)

时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。所以可以说基本思路就是“定点发车”。

窗口中的数据,最大允许的时间戳就是 end - 1,这也就代表了我们定义的窗口时间范围都是左闭右开的区间[start,end)。

Flink 中有一个专门的类来表示时间窗口,名称就叫作 TimeWindow。这个类只有两个私有属性:start 和 end,表示窗口的开始和结束的时间戳,单位为毫秒。

public class TimeWindow extends Window {private final long start;private final long end;public TimeWindow(long start, long end) {this.start = start;this.end = end;}//	获取开始时间戳public long getStart() {return start;}//	获取结束时间戳public long getEnd() {return end;}
}

计数窗口(Count Window)

计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。

计数窗口相比时间窗口就更加简单,我们只需指定窗口大小,就可以把数据分配到对应的窗口中了。在 Flink 内部也并没有对应的类来表示计数窗口,底层是通过“全局窗口”(Global Window)来实现的。

2.2 窗口分配数据规则

  • 滚动窗口(Tumbling Window)
  • 滑动窗口(Sliding Window)
  • 会话窗口(Session Window)
  • 全局窗口(Global Window)

滚动窗口(Tumbling Window)

  • 滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。
  • 窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。
  • 每个数据都会被分配到一个窗口,而且只会属于一个窗口。
  • 需要的参数只有一个,就是窗口的大小(window size)。
  • 滚动窗口可以基于时间定义,也可以基于数据个数定义;

如图所示,小圆点表示流中的数据,我们对数据按照 userId 做了分区。当固定了窗口大小之后,所有分区的窗口划分都是一致的;窗口没有重叠,每个数据只属于一个窗口。

在这里插入图片描述

滑动窗口(Sliding Window)

与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。

  • 参数有两个:窗口大小(window size)之外,还有一个“滑动步长”(window slide)。
  • 滚动窗口也可以看作是一种特殊的滑动窗口——窗口大小等于滑动步长(size = slide)。

我们可以看到,当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据也可能会被同时分配到多个窗口中。而具体的个数,就由窗口大小和滑动步长的比值(size/slide)来决定。如图所示,滑动步长刚好是窗口大小的一半,那么每个数据都会被分配到 2 个窗口里。

在这里插入图片描述

会话窗口(Session Window)

是基于“会话”(session)来来对数据进行分组的,借用会话超时失效的机制来描述窗口。

  • 如果接下来还有数据陆续到来,那么就一直保持会话;
  • 如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。
  • 参数就是这段时间的长度(size),它表示会话的超时时间。

与前两种窗口不同,会话窗口的长度不固定,起始和结束时间也是不确定的,各个分区之间窗口没有任何关联。如图 6-19 所示,会话窗口之间一定是不会重叠的,而且会留有至少为 size 的间隔(session gap)。

在这里插入图片描述

全局窗口(Global Window)

全局窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。

可以看到,全局窗口没有结束的时间点,所以一般在希望做更加灵活的窗口处理时自定义使用。Flink 中的计数窗口(Count Window),底层就是用全局窗口实现的。

在这里插入图片描述

3、API概述

3.1 按键分区(Keyed)和非按键分区(Non-Keyed)

在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流 KeyedStream来开窗,还是直接在没有按键分区的 DataStream 上开窗。也就是说,在调用窗口算子之前,是否有 keyBy 操作。

按键分区窗口(Keyed Windows)

​ 经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。

stream.keyBy(...).window(...)

非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。

在代码中,直接基于 DataStream 调用.windowAll()定义窗口。

stream.windowAll(...)

3.2 代码中窗口 API 的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

stream.keyBy(<key selector>).window(<window assigner>).aggregate(<window function>)
  • .window()方法需要传入一个窗口分配器,它指明了窗口的类型;

  • .aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。

4、窗口分配器((Window Assigners)

在这里插入图片描述

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。

4.1 时间窗口

时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。

1-1 滚动处理时间窗口

窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()。

这里.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,我们这里创建了一个长度为 5 秒的滚动窗口。

定义 1 天的窗口,默认就从 0 点开始;如果定义 1 小时的窗口,默认就从整点开始。

stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...)

可以增加起始时间偏移量,来确定什么时间开始任务的执行。

//	早上8点开始执行(增加偏移量)
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))

1-2. 滑动处理时间窗口

窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()。

stream.keyBy(...).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(...)

这里.of()方法需要传入两个 Time 类型的参数:size 和 slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗口。

滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

1-3. 会话时间窗口

窗口分配器由类ProcessingTimeSessionWindows提供,需要调用它的静态方法.withGap()或者.withDynamicGap()

这里.withGap()方法需要传入一个 Time 类型的参数 size,表示会话的超时时间,也就是最小间隔 session gap。

stream.keyBy(...).window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)

案例:

.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {@Overridepublic long extract(Tuple2<String, Long> element) { // 提取 session gap 值返回, 单位毫秒return element.f0.length() * 1000;}
}))

2-1. 滚动事件时间窗口

窗口分配器由类 TumblingEventTimeWindows 提供,用法与滚动处理事件窗口完全一致。

stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))) .aggregate(...)

2-2 滑动事件时间窗口

窗口分配器由类 SlidingEventTimeWindows 提供,用法与滑动处理事件窗口完全一致。

stream.keyBy(...).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(...)

2-3 事件时间会话窗口

窗口分配器由类 EventTimeSessionWindows 提供,用法与处理事件会话窗口完全一致。

stream.keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)

4.2 计数窗口

计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink 为我们提供了非常方便的接口:直接调用.countWindow()方法。

根据分配规则的不同,又可以分为滚动计数窗口和滑动计数窗口两类。

1. 滚动计数窗口

滚动计数窗口只需要传入一个长整型的参数 size,表示窗口的大小。

定义一个长度为 10 的滚动计数窗口,当窗口中元素数量达到 10 的时候,就会触发计算执行并关闭窗口。

stream.keyBy(...).countWindow(10)

2. 滑动计数窗口

与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size 和 slide,前者表示窗口大小,后者表示滑动步长。

stream.keyBy(...).countWindow(103)

定义一个长度为 10、滑动步长为 3 的滑动计数窗口。每个窗口统计 10 个数据,每隔 3 个数据就统计输出一次结果。

4.3 全局窗口

全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用.window(),分配器由 GlobalWindows 类提供。

stream.keyBy(...).window(GlobalWindows.create());

需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

5、窗口函数(Window Functions)

经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是 DataStream,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到 DataStream,如图所示。

在这里插入图片描述

处理的方式可以分为两类:增量聚合函数和全窗口函数。

5.1 增量聚合函数(incremental aggregation functions)

为了提高实时性,可以像DataStream简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了。区别在于不立即输出结果,而是等到窗口结束时间,拿出之前聚合的状态直接输出。

典型的增量聚合函数有两个:ReduceFunction 和 AggregateFunction。

5.1.1 归约函数(ReduceFunction)

最基本的聚合方式就是归约(reduce)。

5.2 全窗口函数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/4602.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

高级SQL语句

目录 MySQL 高级(进阶) SQL 语句函数数学函数&#xff1a;聚合函数字符串函数&#xff1a; 连接查询inner join(内连接)&#xff1a;left join(左连接)&#xff1a;right join(右连接)&#xff1a; CREATE VIEW&#xff08;视图&#xff09;UNION&#xff08;联集&#xff09;C…

字符串转字典类型时出现的NameError: name ‘false‘ is not defined

报错的原因 eval&#xff08;&#xff09;函数不能够处理flase&#xff0c;null&#xff0c;true这个几个值 解决方法&#xff1a; 我的理解就是给false, null, true 赋一个eval可以处理的值然后在使用eval函数进行处理 global false, null, true false null true 完美解…

Kafka可视化平台EFAK搭建及使用

文章目录 1.EFAK可视化平台介绍2.搭建EFAK可视化平台2.1.安装JDK环境2.2.安装MySQL数据库2.3.下载EAK二进制安装包并部署2.4.配置EFAK连接Zookeeper集群2.5.调整Eagle启动文件中的变量信息2.6.启动EFAK可视化平台 3.使用EFAK可视化平台3.1.登陆EFAK可视化平台3.2.EFAK仪表盘展示…

MySQL:多表查询(全面详解)

MySQL&#xff1a;多表查询 前言附录&#xff1a;常用的 SQL 标准有哪些一、一个案例引发的多表连接1、案例说明2、笛卡尔积&#xff08;或交叉连接&#xff09;的理解3、案例分析与问题解决 二、多表查询分类讲解1、等值连接 vs 非等值连接1.1 等值连接1.2 非等值连接 2、自连…

代码随想录算法训练营第17期第4天(5休息) | 24. 两两交换链表中的节点、

目录 24. 两两交换链表中的节点 19. 删除链表的倒数第 N 个结点 面试题 02.07. 链表相交 ​​​​​​142. 环形链表 II 这题不是很难&#xff0c;目前除了从【.】变成了【->】之外&#xff0c;python和C也没啥区别 另外就是对虚拟头结点的掌握了 /*** Definition for …

一步一步学OAK之四:实现如何在低延迟下使用高分辨率视频

目录 Setup 1: 创建文件Setup 2: 安装依赖Setup 3: 导入需要的包Setup 4: 创建pipelineSetup 5: 创建节点Setup 6: 设置节点的属性和参数。Setup 7: 建立链接关系Setup 8: 连接设备并启动管道Setup 9: 创建与DepthAI设备通信的输入队列和输出队列Setup 10: 主循环获取视频帧显示…

Mabatis(CRUD)

Mybatis CRUD(数据和配置使用Mybatis快速入门) select标签 选择&#xff0c;查询语句: <select id"getUserById" resultType"com.louis.pojo.User" parameterType"int"></select>id:就是对应namespace中的方法名(就相当于重写了…

【MATLAB第49期】基于MATLAB的深度学习ResNet-18网络不平衡图像数据分类识别模型

【MATLAB第49期】基于MATLAB的深度学习ResNet-18网络不平衡图像数据分类识别模型 一、基本介绍 这篇文章展示了如何使用不平衡训练数据集对图像进行分类&#xff0c;其中每个类的图像数量在类之间不同。两种最流行的解决方案是down-sampling降采样和over-sampling过采样。 在…

一元函数微分学中导数--定义--意义--基本公式--运算法则

目录 导数的定义 左导数和右导数 导数的几何意义和物理意义 几何意义 导数的几何意义--切线的斜率 物理意义 导数的物理意义——瞬时速度 基本初等函数导数公式 基本初等函数 常用基本初等函数导数公式 导数求解的四则运算法则 函数的求导法则 复合函数求导法则 导…

FullGC调优100倍,掌握这3招,吊打JVM调优

前言&#xff1a; 在40岁老架构师尼恩的读者社区&#xff08;50&#xff09;中&#xff0c;很多小伙伴拿不到offer&#xff0c;或者拿不到好的offer。 尼恩经常给大家 优化项目&#xff0c;优化简历&#xff0c;挖掘技术亮点。 在指导简历的过程中&#xff0c; 线上问题排查…

C++引用计数

文章目录 1. 什么是引用计数2. 引用计数的实现3. 示例代码 1. 什么是引用计数 引用计数&#xff08;reference count&#xff09;的核心思想是使用一个计数器来标识当前指针指向的对象被多少类的对象所使用&#xff08;即记录指针指向对象被引用的次数&#xff09;。它允许有多…

MySQL数据库——主从复制

目录 前言一、读写分离概述1. 什么是读写分离&#xff1f;2. 为什么要读写分离呢&#xff1f;3. 什么时候要读写分离&#xff1f;4. 主从复制与读写分离5. mysq支持的复制类型6. 主从复制的工作过程7. MySQL主从复制延迟 二、主从复制配置方法主服务器配置从服务器配置 前言 在…