Flink的KeyedProcessFunction基于Event Time和Process Time的定时器用法实例分析

FLink处理函数简介

在Flink底层,我们可以不定义任何具体的算子(比如 map,filter,或者 window),而只是提炼出一个统一的【处理】(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作【处理函数】(process function)。在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权。处理函数比较抽象,没有具体的操作,所以对于一些常见的简单应用(比如求和、开窗口)会显得有些麻烦;不过正是因为它不限定具体做什么,所以理论上我们可以做任何事情,实现所有需求。

Flink几种处理函数简介

  1. ProcessFunction是用于处理数据流的通用函数。它是一个抽象类,定义了处理数据流的常用方法,如processElement,onTimer等。您可以扩展ProcessFunction类并重写这些方法,以便在Flink程序中执行复杂的数据流处理逻辑。
  2. KeyedProcessFunction是ProcessFunction的特殊类型,用于处理带有键的数据流。它定义了额外的方法,如getKey,context.timerService()等,用于访问数据流中每个元素的键以及在处理函数中安排定时器。
  3. ProcessWindowFunction和ProcessAllWindowFunction是用于处理时间窗口的特殊函数。它们提供了一个process方法,用于在每个窗口中对数据进行处理。ProcessWindowFunction接受带有键的数据流,并且每个窗口都对应于一个键,而ProcessAllWindowFunction接受不带键的数据流,并且每个窗口都包含整个数据流。

这里重点介绍KeyedProcessFunction,KeyedProcessFunction是用来处理KeyedStream的。每有一个数据进入算子,则会触发一次processElement()的处理。它还提供了定时器的功能,在在预警、监控等场景特定场景下,非常适合。
KeyedProcessFunction定时器包分为两种:基于事件时间、基于处理时间。下面以统计计数的方式展示这两种定时器的用法,并附上详细的分析思路。以下用例基于Flink1.14

实例分析

KeyedProcessFunction基于事件时间的定时器

代码:


import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;/*** @description:** @author pony* @date 2024/1/17 20:55* @version 1.0* nc -l 9999*/
public class KeyedProcessFunctionOnTimerEventTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(60)).withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {return Long.valueOf(element.split(",")[1]);}}).withIdleness(Duration.ofSeconds(1));DataStream<Tuple2<String, Long>> stream0 = env.socketTextStream("x.x.x.x", 9999).assignTimestampsAndWatermarks(watermarkStrategy) //必须在数据源上指定watermark.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {return new Tuple2<String, Long>(value.split(",")[0], Long.valueOf(value.split(",")[1]));}});// apply the process function onto a keyed streamDataStream<Tuple2<String, Long>> result = stream0.keyBy(value -> value.f0).process(new CountEventTimeWithTimeoutFunction());result.print();env.execute("KeyedProcessFunction wordCount");}/*** The implementation of the ProcessFunction that maintains the count and timeouts*/static class CountEventTimeWithTimeoutFunctionextends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {private ValueState<Long> state;private static final Integer DELAY = 1000; //1s@Overridepublic void open(Configuration parameters) throws Exception {state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));}@Overridepublic void processElement(Tuple2<String, Long> value,Context ctx,Collector<Tuple2<String, Long>> out) throws Exception {Long current = state.value();if (current == null) {current = 0L;}current++;state.update(current);//获取当前数据流的水位线long currentWatermark = ctx.timerService().currentWatermark();//            long timer = ctx.timestamp() + DELAY;//设置定时器的时间为当前event time+DELAYlong timer = currentWatermark + DELAY;//设置定时器的时间为当前水位线+DELAY//注册事件时间定时器,与watermark绑定,必须满足条件: watermark >= timer 来触发特定event的定时器ctx.timerService().registerEventTimeTimer(timer);//删除事件时间定时器if (currentWatermark < 0) {ctx.timerService().deleteEventTimeTimer(timer);}System.out.println("last Watermark: " + currentWatermark + ", format: " + time(currentWatermark));// 打印信息,用于核对数据System.out.println(String.format("processElement: %s, %d, ctx.timestamp() : %d (%s), timer : %d (%s)\n",ctx.getCurrentKey(),current,ctx.timestamp(),time(ctx.timestamp()),timer,time(timer)));}@Overridepublic void onTimer(long timestamp, //定时器触发时间,等于以上的timerOnTimerContext ctx,Collector<Tuple2<String, Long>> out) throws Exception {// 取得当前单词String currentKey = ctx.getCurrentKey();// get the state for the key that scheduled the timerLong result = state.value();// 打印数据,用于核对是否符合预期System.out.println(String.format("onTimer: %s, %d, ctx.timestamp() : %d (%s), timestamp : %d (%s)\n",currentKey,result,ctx.timestamp(),time(ctx.timestamp()),timestamp,time(timestamp)));System.out.println("current Watermark: " + ctx.timerService().currentWatermark() + ", format: " + time(ctx.timerService().currentWatermark()));out.collect(new Tuple2<String, Long>(currentKey, result));}@Overridepublic void close() throws Exception {super.close();state.clear();}}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(timeStamp));}
}

测试数据:

nc -l 9999
a1,1704038400000
a1,1704038401000
a1,1704038403000

运行结果:
在这里插入图片描述

KeyedProcessFunction基于处理时间的定时器

代码:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;/*** @description:** @author pony* @date 2024/1/17 20:55* @version 1.0* nc -l 9999*/
public class KeyedProcessFunctionOnTimerProcessTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(60)).withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {
//                        return System.currentTimeMillis();return Long.valueOf(element.split(",")[1]);}}).withIdleness(Duration.ofSeconds(1));DataStream<Tuple2<String, Long>> stream0 = env.socketTextStream("x.x.x.x", 9999).assignTimestampsAndWatermarks(watermarkStrategy) //必须在数据源上指定watermark.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {return new Tuple2<String, Long>(value.split(",")[0], Long.valueOf(value.split(",")[1]));}});// apply the process function onto a keyed streamDataStream<Tuple2<String, Long>> result = stream0.keyBy(value -> value.f0).process(new CountProcessTimeWithTimeoutFunction());result.print();env.execute("KeyedProcessFunction wordCount");}static class CountProcessTimeWithTimeoutFunctionextends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {private ValueState<Long> state;private static final Integer DELAY = 60 * 1000; //1s@Overridepublic void open(Configuration parameters) throws Exception {state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));}@Overridepublic void processElement(Tuple2<String, Long> value,Context ctx,Collector<Tuple2<String, Long>> out) throws Exception {Long current = state.value();if (current == null) {current = 0L;}current++;state.update(current);long timer = ctx.timestamp() + DELAY;//设置定时器的时间为当前event time+DELAY//注册处理时间定时器, 与watermark无关,定时器触发条件:当前系统时间>timerctx.timerService().registerProcessingTimeTimer(timer);//删除处理时间定时器
//            ctx.timerService().deleteProcessingTimeTimer(timer);System.out.println("processElement currentProcessingTime: " + ctx.timerService().currentProcessingTime() + ", format: " + time(ctx.timerService().currentProcessingTime()));// 打印所有信息,用于核对数据System.out.println(String.format("processElement: %s, %d, ctx.timestamp() : %d (%s), timer : %d (%s)\n",ctx.getCurrentKey(),current,ctx.timestamp(),time(ctx.timestamp()),timer,time(timer)));}@Overridepublic void onTimer(long timestamp,OnTimerContext ctx,Collector<Tuple2<String, Long>> out) throws Exception {// 取得当前单词String currentKey = ctx.getCurrentKey();// get the state for the key that scheduled the timerLong result = state.value();System.out.println("onTimer currentProcessingTime: " + ctx.timerService().currentProcessingTime() + ", format: " + time(ctx.timerService().currentProcessingTime()));// 打印数据,用于核对是否符合预期System.out.println(String.format("onTimer: %s, %d, ctx.timestamp() : %d (%s), timestamp : %d (%s)\n",currentKey,result,ctx.timestamp(),time(ctx.timestamp()),timestamp,time(timestamp)));//另外还支持侧流OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("single"){};if (result < 2) {ctx.output(outputTag, new Tuple2<>(currentKey, result));} else {out.collect(new Tuple2<String, Long>(currentKey, result));}}@Overridepublic void close() throws Exception {super.close();state.clear();}}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(timeStamp));}
}

测试数据:

nc -l 9999
a,1705568024000    
a,1705568024000

运行结果:
在这里插入图片描述

总结

在真实业务场景中【 KeyedProcessFunction基于处理时间的定时器】用的比较多,比较符合业务场景,即根据事件的时间来指定处理时间去定时触发定时器。因此在此场景中,可以不指定watermarkStrategy,可以获取传输参数的时间时间来定时触发定时器。

参考:
Process Function
Generating Watermarks

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/412443.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker:Java通过nginx获取客户端的真实ip地址

问题现象 我们的平台使用Spring Cloud微服务架构&#xff0c;使用Spring Boot构建Java服务&#xff0c;使用google的jib插件打成docker镜像包我们使用docker虚拟化部署&#xff0c;使用docker-compose统一管理所有服务&#xff0c;包括Java服务和nginx等组件我们前后端分离&am…

ONLYOFFICE:开源、免费、安全,打造定制化办公平台的最佳选择

文章目录 写在前面ONLYOFFICE是什么&#xff1f;ONLYOFFICE的惊艳之处齐全的插件&#xff0c;助你锦上添花部署一款自己的安全可靠的办公平台写在最后 写在前面 说起 Office 办公软件&#xff0c;我想大家最常用的应该就是微软的 Microsoft Office 以及国产的 WPS Office。两款…

关于微信小程序 “扫普通链接二维码打开小程序”动态传递多个参数开发过程记录与总结

前言&#xff1a;项目中需要线下 扫描二维码 进入到小程序指定的页面&#xff0c;二维码中 要动态传递多个参数&#xff0c;接下来看看具体怎么实现&#xff0c;整个过程还比较顺利&#xff0c;特把整个过程中遇到的坑点做以总结。 快速跳转官网文档介绍&#xff1a;扫码打开小…

基于深度学习的实例分割的Web应用

基于深度学习的实例分割的Web应用 1. 项目简介1.1 模型部署1.2 Web应用 2. Web前端开发3. Web后端开发4. 总结 1. 项目简介 这是一个基于深度学习的实例分割Web应用的项目介绍。该项目使用PaddlePaddle框架&#xff0c;并以PaddleSeg训练的图像分割模型为例。 1.1 模型部署 …

shopee选品案例分析:如何在Shopee平台上进行选品并取得成功

在Shopee平台上进行选品是卖家们开设店铺的重要步骤之一。通过分析成功案例&#xff0c;卖家们可以获取灵感和策略&#xff0c;从而更好地进行选品。本文将以一个女装店铺为例&#xff0c;介绍如何在Shopee平台上进行选品并取得成功。 先给大家推荐一款shopee知虾数据运营工具…

Oracle21C + PLSQL Developer 15 + Oracle客户端21安装配置完整图文版

一、Oracle21C PLSQL Developer 15 Oracle客户端文件下载 1、Oracl21C下载地址&#xff1a;Database Software Downloads | Oracle 中国 2、 PLSQL Developer 15下载地址&#xff1a;Registered download PL/SQL Developer - Allround Automations 3、 Oracle 客户端下载地址…

使用composer生成的DMG和PKG格式软件包有何区别

在使用Composer从包源构建软件包时候&#xff0c;有两种不同类型的包&#xff1a;PKG和DMG。你知道两者之间的区别吗? 以及如何选取吗&#xff1f; 每种格式都有各自的优势具体取决于软件包的预期用途以及用于部署软件包的工具。下面我们来了解一下PKG和DMG格式的区别和用途。…

vscode连接远程服务器(傻瓜式教学)

**如何在远程服务器上进行编码呢&#xff1f;vscode&#xff01;&#xff01;&#xff01;**当然&#xff0c;还有很多其他选择&#xff0c;例如sublime、ultraedit等等&#xff0c;这里我们用非常流行的vscode来连接ubuntu服务器做讲解&#xff01;1、下载vscode 百度搜索vsc…

怎样实现安全便捷的网间数据安全交换?

数据安全交换是指在数据传输过程中采取一系列措施来保护数据的完整性、机密性和可用性。网间数据安全交换&#xff0c;则是需要进行跨网络、跨网段甚至跨组织地进行数据交互&#xff0c;对于数据的传输要求会更高。 大部分企业都是通过网闸、DMZ区、VLAN、双网云桌面等方式实现…

Oracle 实战手册 工作实战经验总结

目录 一、基本的数据库管理 1、数据库的启动和关闭 ​编辑2、如何确定Oracle的版本&#xff1f; 3、如何修改数据库的内存参数 4、修改用户名密码 5、如何查看最大会话数 6、如何修改oracle数据库的用户连接数 7、解锁用户 8、如何获取被锁定的数据库对象 9、如何确定…

如何使用Synology Drive作为文件同步服务器实现云同步Obsidian笔记

文章目录 一、简介软件特色演示&#xff1a; 二、使用免费群晖虚拟机搭建群晖Synology Drive服务&#xff0c;实现局域网同步1 安装并设置Synology Drive套件2 局域网内同步文件测试 三、内网穿透群晖Synology Drive&#xff0c;实现异地多端同步Windows 安装 Cpolar步骤&#…

项目经理进阶之路:如何应对不同阶段的挑战?

最近看到一个帖子&#xff0c;有网友提问&#xff0c;“项目经理的职业发展会经历哪几个阶段&#xff1f;不同阶段需要关注什么&#xff1f;又分别会遇到哪些挑战&#xff1f;“这个帖子引发了广大项目经理们的热议&#xff0c;大家纷纷吐槽&#xff0c;自己遇到了职业瓶颈、询…