窗口处理函数包括:ProcessWindowFunction 和 ProcessAllWindowFunction
基础用法
stream.keyBy( t -> t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction())
这里的MyProcessWindowFunction
就是ProcessWindowFunction
的一个实现类;
ProcessWindowFunction
是一个典型的全窗口函数,把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理
源码解析
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>extends AbstractRichFunction {private static final long serialVersionUID = 1L;/*** Evaluates the window and outputs none or several elements.** @param key The key for which this window is evaluated.* @param context The context in which the window is being evaluated.* @param elements The elements in the window being evaluated.* @param out A collector for emitting elements.* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;/*** Deletes any state in the {@code Context} when the Window expires (the watermark passes its* {@code maxTimestamp} + {@code allowedLateness}).** @param context The context to which the window is being evaluated* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public void clear(Context context) throws Exception {}/** The context holding window metadata. */public abstract class Context implements java.io.Serializable {/** Returns the window that is being evaluated. */public abstract W window();/** Returns the current processing time. */public abstract long currentProcessingTime();/** Returns the current event-time watermark. */public abstract long currentWatermark();/*** State accessor for per-key and per-window state.** <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up by* implementing {@link ProcessWindowFunction#clear(Context)}.*/public abstract KeyedStateStore windowState();/** State accessor for per-key global state. */public abstract KeyedStateStore globalState();/*** Emits a record to the side output identified by the {@link OutputTag}.** @param outputTag the {@code OutputTag} that identifies the side output to emit to.* @param value The record to emit.*/public abstract <X> void output(OutputTag<X> outputTag, X value);}
}
类型参数如下:
- IN:input,数据流中窗口任务的输入数据类型
- OUT:output,窗口任务进行计算之后的输出数据类型
- KEY:数据中键 key 的类型
- W:窗口的类型,是 Window 的子类型。一般情况下我们定义时间窗口,W就是
TimeWindow
定义方法如下:
process
(窗口处理函数不是逐个处理数据)
- key:窗口做统计计算基于的键,也就是之前 keyBy 用来分区的字段
- context:当前窗口进行计算的上下文
- elements:窗口收集到用来计算的所有数据,这是一个可迭代的集合类型
- out:用来发送数据输出计算结果的收集器,类型为 Collector
可以明显看出,这里的参数不再是一个输入数据,而是窗口中所有数据的集合。而上下文context 所包含的内容也跟其他处理函数有所差别:
①不再提供设置定时器的方法
②由于当前不是只处理一个数据,所以也不再提供
.timestamp()
方法③可以通过
.window()
直接获取到当前的窗口对象④可以通过
.windowState()
和.globalState()
获取到当前自定义的窗口状态和全局状态
clear()
:
进行窗口的清理工作:如果我们自定义了窗口状态,那么必须在.clear()方法中进行显式地清除,避免内存溢出
学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili