Flink处理函数(3)—— 窗口处理函数

窗口处理函数包括:ProcessWindowFunction 和 ProcessAllWindowFunction

基础用法
stream.keyBy( t -> t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction())

这里的MyProcessWindowFunction就是ProcessWindowFunction的一个实现类;

ProcessWindowFunction是一个典型的全窗口函数,把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理

源码解析
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>extends AbstractRichFunction {private static final long serialVersionUID = 1L;/*** Evaluates the window and outputs none or several elements.** @param key The key for which this window is evaluated.* @param context The context in which the window is being evaluated.* @param elements The elements in the window being evaluated.* @param out A collector for emitting elements.* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;/*** Deletes any state in the {@code Context} when the Window expires (the watermark passes its* {@code maxTimestamp} + {@code allowedLateness}).** @param context The context to which the window is being evaluated* @throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public void clear(Context context) throws Exception {}/** The context holding window metadata. */public abstract class Context implements java.io.Serializable {/** Returns the window that is being evaluated. */public abstract W window();/** Returns the current processing time. */public abstract long currentProcessingTime();/** Returns the current event-time watermark. */public abstract long currentWatermark();/*** State accessor for per-key and per-window state.** <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up by* implementing {@link ProcessWindowFunction#clear(Context)}.*/public abstract KeyedStateStore windowState();/** State accessor for per-key global state. */public abstract KeyedStateStore globalState();/*** Emits a record to the side output identified by the {@link OutputTag}.** @param outputTag the {@code OutputTag} that identifies the side output to emit to.* @param value The record to emit.*/public abstract <X> void output(OutputTag<X> outputTag, X value);}
}

类型参数如下:

  • IN:input,数据流中窗口任务的输入数据类型
  • OUT:output,窗口任务进行计算之后的输出数据类型
  • KEY:数据中键 key 的类型
  • W:窗口的类型,是 Window 的子类型。一般情况下我们定义时间窗口,W就是 TimeWindow

定义方法如下:

process(窗口处理函数不是逐个处理数据)

  • key:窗口做统计计算基于的键,也就是之前 keyBy 用来分区的字段
  • context:当前窗口进行计算的上下文
  • elements:窗口收集到用来计算的所有数据,这是一个可迭代的集合类型
  • out:用来发送数据输出计算结果的收集器,类型为 Collector

可以明显看出,这里的参数不再是一个输入数据,而是窗口中所有数据的集合。而上下文context 所包含的内容也跟其他处理函数有所差别:

①不再提供设置定时器的方法

②由于当前不是只处理一个数据,所以也不再提供.timestamp()方法

③可以通过.window()直接获取到当前的窗口对象

④可以通过.windowState().globalState()获取到当前自定义的窗口状态和全局状态

clear()

进行窗口的清理工作:如果我们自定义了窗口状态,那么必须在.clear()方法中进行显式地清除,避免内存溢出

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/419048.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

什么是中间件?

文章目录 为什么需要中间件&#xff1f;中间件生态漫谈数据库中间件读写分离分库分表引进数据库中间件MyCat 服务端代理模式ShardingJDBC 客户端代理模式 总结 IT 系统从单体应用逐渐向分布式架构演变&#xff0c;高并发、高可用、高性能、分布式等话题变得异常火热&#xff0c…

第十三章 MySQL

第十三章 MySQL 下面是创建数据库操作 删除数据库 右上角选择要操作的数据库 如果关闭了这个控制台&#xff0c;下次如何找到它呢 也可以对其改名

基于YOLOv8的学生课堂行为检测,引入BRA注意力和Shape IoU改进提升检测能力

&#x1f4a1;&#x1f4a1;&#x1f4a1;本文摘要&#xff1a;介绍了学生课堂行为检测&#xff0c;并使用YOLOv8进行训练模型&#xff0c;以及引入BRA注意力和最新的Shape IoU提升检测能力 1.SCB介绍 摘要&#xff1a;利用深度学习方法自动检测学生的课堂行为是分析学生课堂表…

IDEA在重启springboot项目时没有自动重新build

IDEA在重启springboot项目时没有自动重新build 问题描述 当项目里面某些依赖或者插件更新了&#xff0c;target的class文件没有找到&#xff0c;导致不是我们需要的效果。 只能手动的清理target文件&#xff0c;麻烦得很 &#xff0c; 单体项目还好说&#xff0c;一次清理就…

如何系统地自学 Python?【附:Python基础入门教程】

如何系统地自学 Python&#xff1f;【附&#xff1a;Python基础入门教程】 一、确定学习目标 在开始学习Python之前&#xff0c;首先需要明确自己的学习目标。是为了入门编程、转行程序员、提升编程能力&#xff0c;还是为了解决特定领域的问题&#xff1f;只有明确了学习目标&…

SpringBoot:详解Bean生命周期和作用域

&#x1f3e1;浩泽学编程&#xff1a;个人主页 &#x1f525; 推荐专栏&#xff1a;《深入浅出SpringBoot》《java项目分享》 《RabbitMQ》《Spring》《SpringMVC》 &#x1f6f8;学无止境&#xff0c;不骄不躁&#xff0c;知行合一 文章目录 前言一、生命周期二…

react中数据不可变

先看官网 一、不可变数据的概念 不可变数据意味着数据一旦创建&#xff0c;就不能被更改。在React中&#xff0c;每次对数据的修改都会返回一个新的数据副本&#xff0c;而不会改变原始数据。这种方式确保了数据的稳定性和一致性。 二、Props中的不可变数据 在React中&#xf…

机器视觉技术与应用实战(平均、高斯、水平prewitt、垂直prewitt、水平Sobel、垂直Sobel、拉普拉斯算子、锐化、中值滤波)

扯一点题外话&#xff0c;这一个月经历了太多&#xff0c;接连感染了甲流、乙流&#xff0c;人都快烧没了&#xff0c;乙流最为严重&#xff0c;烧了一个星期的38-39度&#xff0c;咳嗽咳到虚脱。还是需要保护好身体&#xff0c;感觉身体扛不住几次连续发烧&#xff01;&#x…

gin中间件篇

1. 全局中间件 所有请求都经过此中间件 package mainimport ("fmt""time""github.com/gin-gonic/gin" )// 定义中间 func MiddleWare() gin.HandlerFunc {return func(c *gin.Context) {t : time.Now()fmt.Println("中间件开始执行了&quo…

设计模式-资源库模式

设计模式专栏 模式介绍模式特点应用场景资源库模式与关系型数据库的区别代码示例Java实现资源库模式Python实现资源库模式 资源库模式在spring中的应用 模式介绍 资源库模式是一种架构模式&#xff0c;介于领域层与数据映射层&#xff08;数据访问层&#xff09;之间。它的存在…

Linux中关于head命令详解

head的作用 head用于查看文件的开头部分的内容。 head的参数 -q隐藏文件名-v 显示文件名-c<数目>显示的字节数-n<数目>显示的行数 head的案例 # 查看yum.log前五行内容 head -5 yum.log

Datawhale 强化学习笔记(三)基于策略梯度(policy-based)的算法

文章目录 参考基于价值函数的缺点策略梯度算法REINFORCE 算法策略梯度推导进阶策略函数的设计离散动作的策略函数连续动作的策略函数 参考 第九章 策略梯度 之前介绍的 DQN 算法属于基于价值(value-based)的算法&#xff0c;基于策略梯度的算法直接对策略本身进行优化。 将策…