背景
本文我们实现一个周期性触发的自定义触发器,顺便看下实现自定义触发器的一些要点
周期性触发器实现
实现一个每分钟触发一次的自定义事件时间触发器,实现代码和注意事项如下所示
package wikiedits.trigger;import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;public class OneMinuteIntervalTrigger<W extends Window> extends Trigger<Object, W> {private static final long serialVersionUID = 1L;private final long interval;// 触发时间的状态对象private final ValueStateDescriptor<Long> stateDesc =new ValueStateDescriptor<>("fire-time", TypeInformation.of(Long.class));private OneMinuteIntervalTrigger(long interval) {this.interval = interval;}@Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// 这里其实不是必要的,取决于窗口结束时间到之后是否要触发一次计算// if the watermark is already past the window fire immediatelyreturn TriggerResult.FIRE;} else {// 多次注册也没事,反正是同一个计时器,这表明窗口结束时想要触发一次计算,此外注意getEnd和maxTimestamp方法的区别ctx.registerEventTimeTimer(window.maxTimestamp());}// 仅仅在第一次未注册时注册一次,后续由ontimer触发ValueState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);if (fireTimestamp.value() == null) {long start = timestamp - (timestamp % interval);long nextFireTimestamp = start + interval;ctx.registerEventTimeTimer(nextFireTimestamp);fireTimestamp.update(nextFireTimestamp);}return TriggerResult.CONTINUE;}// 计时器触发的函数@Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {// 这里窗口结束时触发不是必要的,取决于是否想要在窗口结束是触发一次计算,并且这里如果不处理延迟的消息,可以返回FIRE_AND_PURGE清理窗口状态(但是注意即使返回PURGE,也不会清理触发器的状态)if (time == window.maxTimestamp()) {return TriggerResult.FIRE;}ValueState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);Long fireTimestamp = fireTimestampState.value();// 继续注册计时器if (fireTimestamp != null && fireTimestamp == time) {fireTimestampState.update(time + interval);ctx.registerEventTimeTimer(time + interval);return TriggerResult.FIRE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}@Overridepublic void clear(W window, TriggerContext ctx) throws Exception {// 清理触发器状态ValueState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);Long timestamp = fireTimestamp.value();if (timestamp != null) {ctx.deleteEventTimeTimer(timestamp);fireTimestamp.clear();}}}
代码里面注解已经比较详细的说明了注意事项,此外对于状态的清理,我们需要看的是WindowOperator,如下