周期性触发的自定义触发器

背景

本文我们实现一个周期性触发的自定义触发器,顺便看下实现自定义触发器的一些要点

周期性触发器实现

实现一个每分钟触发一次的自定义事件时间触发器,实现代码和注意事项如下所示

package wikiedits.trigger;import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;public class OneMinuteIntervalTrigger<W extends Window> extends Trigger<Object, W> {private static final long serialVersionUID = 1L;private final long interval;// 触发时间的状态对象private final ValueStateDescriptor<Long> stateDesc =new ValueStateDescriptor<>("fire-time", TypeInformation.of(Long.class));private OneMinuteIntervalTrigger(long interval) {this.interval = interval;}@Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// 这里其实不是必要的,取决于窗口结束时间到之后是否要触发一次计算// if the watermark is already past the window fire immediatelyreturn TriggerResult.FIRE;} else {// 多次注册也没事,反正是同一个计时器,这表明窗口结束时想要触发一次计算,此外注意getEnd和maxTimestamp方法的区别ctx.registerEventTimeTimer(window.maxTimestamp());}// 仅仅在第一次未注册时注册一次,后续由ontimer触发ValueState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);if (fireTimestamp.value() == null) {long start = timestamp - (timestamp % interval);long nextFireTimestamp = start + interval;ctx.registerEventTimeTimer(nextFireTimestamp);fireTimestamp.update(nextFireTimestamp);}return TriggerResult.CONTINUE;}// 计时器触发的函数@Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {// 这里窗口结束时触发不是必要的,取决于是否想要在窗口结束是触发一次计算,并且这里如果不处理延迟的消息,可以返回FIRE_AND_PURGE清理窗口状态(但是注意即使返回PURGE,也不会清理触发器的状态)if (time == window.maxTimestamp()) {return TriggerResult.FIRE;}ValueState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);Long fireTimestamp = fireTimestampState.value();// 继续注册计时器if (fireTimestamp != null && fireTimestamp == time) {fireTimestampState.update(time + interval);ctx.registerEventTimeTimer(time + interval);return TriggerResult.FIRE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}@Overridepublic void clear(W window, TriggerContext ctx) throws Exception {// 清理触发器状态ValueState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);Long timestamp = fireTimestamp.value();if (timestamp != null) {ctx.deleteEventTimeTimer(timestamp);fireTimestamp.clear();}}}

代码里面注解已经比较详细的说明了注意事项,此外对于状态的清理,我们需要看的是WindowOperator,如下

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/124316.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自然语言处理的分类

动动发财的小手&#xff0c;点个赞吧&#xff01; 简介 作为理解、生成和处理自然语言文本的有效方法&#xff0c;自然语言处理&#xff08;NLP&#xff09;的研究近年来呈现出快速传播和广泛采用。鉴于 NLP 的快速发展&#xff0c;获得该领域的概述并对其进行维护是很困难的。…

【算法训练-二分查找 三】【特殊二分】寻找峰值

废话不多说&#xff0c;喊一句号子鼓励自己&#xff1a;程序员永不失业&#xff0c;程序员走向架构&#xff01;本篇Blog的主题是【数组的二分查找】&#xff0c;使用【数组】这个基本的数据结构来实现&#xff0c;这个高频题的站点是&#xff1a;CodeTop&#xff0c;筛选条件为…

【Linux】RPM包使用详解

&#x1f341; 博主 "开着拖拉机回家"带您 Go to New World.✨&#x1f341; &#x1f984; 个人主页——&#x1f390;开着拖拉机回家_大数据运维-CSDN博客 &#x1f390;✨&#x1f341; &#x1fa81;&#x1f341; 希望本文能够给您带来一定的帮助&#x1f338;文…

Python爬虫解决中文乱码

目录 一、中文乱码 二、chardet.detect()解决 三、在页面查找编码格式解决 一、中文乱码 问题在于文本的编码格式不正确 import requestsurlhttps://www.shicimingju.com/book/sanguoyanyi.html headers{User-Agent:Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKi…

燃气安全如何保障?万宾燃气管网监测系统时刻感知管网运行态势

近年来随着我国城镇化建设的加快&#xff0c;燃气已经成为每个家庭的必需品。然而&#xff0c;每年夏季频繁发生的燃气爆炸事故&#xff0c;已经严重危害人民生命财产安全危害社会公共安全和公共利益。为了保障燃气安全运行&#xff0c;近日&#xff0c;许多城市都在大力推进燃…

SpringCloudGateway网关中各个过滤器的作用与介绍

文章目录 RemoveCachedBodyFilterAdaptCachedBodyGlobalFilterNettyWriteResponseFilterForwardPathFilterRouteToRequestUrlFilterWebSocketRoutingFilterNettyRoutingFilterForwardRoutingFilterDispatcherHandler 是什么&#xff1f;⭐如何确定最后的路由路径&#xff1f;下…

Vue3+TS+ECharts5实现中国地图数据信息显示

1.引言 最近在做一个管理系统&#xff0c;主要技术栈使用的是Vue3TSViteElementPlus&#xff0c;主要参考项目是yudao-ui-admin-vue3&#xff0c;其中用到ECharts5做数字大屏&#xff0c;展示中国地图相关信息&#xff0c;以此基础做一个分享&#xff0c;写下这篇文章。 &quo…

28270-2012 智能型阀门电动装置 学习笔记

声明 本文是学习GB-T 28270-2012 智能型阀门电动装置. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本标准规定了智能型阀门电动装置(以下简称智能电装)的术语、技术要求、试验方法、检验规则、标 志、包装、运输和贮存条件等。 本标准适…

2023/10/4 -- ARM

今日任务&#xff1a;QT实现TCP服务器客户端搭建的代码&#xff0c;现象 ser&#xff1a; #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);server new QTcpSe…

Ubantu 20.04 卸载与安装 MySQL 5.7 详细教程

文章目录 卸载 MySQL安装 MySQL 5.71.获取安装包2.解压并安装依赖包3.安装 MySQL4.启动 MySQL 扩展开启 gtid 与 binlog 卸载 MySQL 执行以下命令即可一键卸载&#xff0c;包括配置文件目录等。 # 安装sudo软件 apt-get install sudo -y # 卸载所有以"mysql-"开头的…

微信小程序WebSocket实现stream流式聊天对话功能

要在微信小程序实现聊天对话功能&#xff0c;回话是流式应答&#xff0c;这里使用了WebSocket技术。WebSocket大家应该都很熟悉&#xff0c;使用wx.connectSocket就可以了。这里可能需要注意下的是流式应答&#xff0c;后端如何发送&#xff0c;前端如何接收。直接上代码&#…

uboot启动流程-涉及s_init汇编函数

一. uboot启动涉及函数 本文简单分析uboot启动流程中&#xff0c;涉及的汇编函数&#xff1a; lowlevel_init函数调用的函数&#xff1a;s_init 函数 save_boot_params_ret函数调用的函数&#xff1a; _main 函数 本文继上一篇文章的学习&#xff0c;地址如下&#xff1a;…