Flink Window中典型的增量聚合(ReduceFunction / AggregateFunction)

一、什么是增量聚合函数

在Flink Window中定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,其实还完全没有头绪,这也就是窗口函数所需要做的事情。所以在窗口分配器之后,我们还要再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。
窗口可以将数据收集起来,最基本的处理操作当然就是基于窗口内的数据进行聚合。
我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。
典型的增量聚合函数有两个:ReduceFunction 和 AggregateFunction。
在这里插入图片描述

二、ReduceFunction

源码解析

@FunctionalInterface
@Public
public interface ReduceFunction<T> extends Function, Serializable {T reduce(T var1, T var2) throws Exception;
}

实际案例
在Flink中,使用socket模拟实时的数据流DataStream,通过定义一个滚动窗口,窗口的大小为10s,按照id分区,使用reduce聚合函数实现value的累加统计

package com.flink.DataStream.WindowFunctions;import com.flink.POJOs.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class FlinkWindowReduceFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);DataStreamSource<String> streamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);// 注意这里为什么返回的是KeyedStream(建控流/分区流),而不是DataStreamKeyedStream<WaterSensor, String> keyedStream = streamSource// 使用map函数将输入的string转为一个WaterSensor类.map(new MapFunction<String, WaterSensor>() {@Overridepublic WaterSensor map(String s) throws Exception {// 这里写的很详细,如何把string转为的WaterSensor类String[] strings = s.split(",");String id = strings[0];Long ts = Long.valueOf(strings[1]);Integer vc = Integer.valueOf(strings[2]);WaterSensor waterSensor = new WaterSensor();waterSensor.setId(id);waterSensor.setTs(ts);waterSensor.setVc(vc);return waterSensor;//return new WaterSensor(strings[0],Long.valueOf(strings[1]),Integer.valueOf(strings[2])}})// 按照id做keyBy分区(提问:KeyBy是如何实现分区的?).keyBy(new KeySelector<WaterSensor, String>() {// 也可以直接使用lamda表达式更简单@Overridepublic String getKey(WaterSensor waterSensor) throws Exception {// getId()方法就是return的waterSensor.idreturn waterSensor.getId();}});/*** 窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(WindowFunctions)* .window()方法需要传入一个窗口分配器,它指明了窗口的类型* */SingleOutputStreamOperator<WaterSensor> outputStreamOperator = keyedStream// 设置滚动窗口的大小(10秒).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))// 使用匿名函数实现增量聚合函数ReduceFunction.reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor waterSensor1, WaterSensor waterSensor2) throws Exception {System.out.println("调用reduce方法,之前的结果:" + waterSensor1 + ",现在来的数据:" + waterSensor2);return new WaterSensor(waterSensor1.getId(), System.currentTimeMillis(), waterSensor1.getVc() + waterSensor2.getVc());}});outputStreamOperator.print();streamExecutionEnvironment.execute();}
}

启动Flink程序,启动nc,模拟输入

nc -lk 8888
# 00-10秒输入
a,11111,1
# 11-20秒输入
a,11111,2
a,22222,3
# 21-30秒输入
a,11111,4

查看控制台打印结果

WaterSensor{id='a', ts=11111, vc=1}
调用reduce方法,之前的结果:WaterSensor{id='a', ts=11111, vc=2},现在来的数据:WaterSensor{id='a', ts=22222, vc=3}
WaterSensor{id='a', ts=1702022598011, vc=5}
WaterSensor{id='a', ts=11111, vc=4}

在这里插入图片描述

三、AggregateFunction

虽然ReduceFunction 可以解决大多数归约聚合的问题,但是我们通过上述案例可以发现:这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
Flink Window API 中的 aggregate 就突破了这个限制,可以定义更加灵活的窗口聚合操作。这个方法需要传入一个 AggregateFunction 的实现类作为参数。AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {ACC createAccumulator();ACC add(IN var1, ACC var2);OUT getResult(ACC var1);ACC merge(ACC var1, ACC var2);
}

接口中有四个方法:
1.createAccumulator()
创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
2.add()
将输入的元素添加到累加器中。
3.getResult()
从累加器中提取聚合的输出结果。
4.merge()
合并两个累加器,并将合并后的状态作为一个累加器返回。
所以可以看到,AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/259801.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Echarts的使用 笔记

1.数据可视化前言 1.1.什么是数据可视化 数据可视化&#xff1a; 就是把数据以更加直观的方式进行呈现. 1.2.数据可视化的好处 清晰有效地传达与沟通信息更容易洞察隐藏在数据中的信息 2.ECharts的基本使用 2.1.ECharts官网 ECharts是百度公司开源的一个使用 JavaScript 实…

力扣题:公共前缀/单词-11.18

力扣题-11.18 [力扣刷题攻略] Re&#xff1a;从零开始的力扣刷题生活 力扣题1&#xff1a;14.最长公共前缀 解题思想&#xff1a;先找到最小的字符串长度&#xff0c;然后进行字符串的遍历即可 class Solution(object):def longestCommonPrefix(self, strs):""&qu…

DevOps搭建(五)-JDK安装详细步骤

1、官网下载 官方网站下载JDK&#xff0c;这里我们安装JDK8 https://docs.oracle.com/javase/8/docs/technotes/guides/install/install_overview.html 点击上图中的Java SE Downloads项目&#xff0c;也可直接点击下面链接进入&#xff1a; Java Downloads | Oracle 往下滚…

BGP综合

1、使用PreVal策略&#xff0c;确保R4通过R2到达192.168.10.0/24。 2、使用AS_Path策略&#xff0c;确保R4迪过R3到达192.168.11.0/24。 3、配置MED策略&#xff0c;确保R4通过R3到达192.168.12.0/24。 4、使用Local Preference策略&#xff0c;确保R1通过R2到达192.168.1.0…

【iOS】数据持久化(三)之SQLite3数据库

目录 数据库简介什么是SQLite&#xff1f;在Xcode引入SQLite APISQL语句的种类存储字段类型 SQLite的使用创建数据库创建表和删表数据表操作增&#xff08;插入数据INSERT&#xff09;删&#xff08;删除数据DELETE&#xff09;改&#xff08;更新数据UPDATE&#xff09;查&…

Gateway

网关的作用&#xff1a; 可以对访问的用户进行身份认证和权限校验还可以服务路由&#xff0c;负载均衡还可以进行请求限流 网关本身也是微服务的一部分&#xff0c;所以需要使用nacos进行服务注册和发现 网关路由的配置 路由id&#xff1a;路由唯一标识uri&#xff1a;路由…

[足式机器人]Part2 Dr. CAN学习笔记-数学基础Ch0-8Matlab/Simulink传递函数Transfer Function

本文仅供学习使用 本文参考&#xff1a; B站&#xff1a;DR_CAN Dr. CAN学习笔记-数学基础Ch0-8Matlab/Simulink传递函数Transfer Function L − 1 [ a 0 Y ( s ) s Y ( s ) ] L − 1 [ b 0 U ( s ) b 1 s U ( s ) ] ⇒ a 0 y ( t ) y ˙ ( t ) b 0 u ( t ) b 1 u ˙ ( t…

拆解大语言模型 RLHF 中的PPO算法

为什么大多数介绍大语言模型 RLHF 的文章&#xff0c;一讲到 PPO 算法的细节就戛然而止了呢&#xff1f;要么直接略过&#xff0c;要么就只扔出一个 PPO 的链接。然而 LLM x PPO 跟传统的 PPO 还是有些不同的呀。 其实在 ChatGPT 推出后的相当一段时间内&#xff0c;我一直在等…

el-table 表格多选(后端接口搜索分页)实现已选中的记忆功能。实现表格数据和已选数据(前端分页)动态同步更新。

实现效果&#xff1a;&#xff08;可拉代码下来看&#xff1a;vue-demo: vueDemo&#xff09; 左侧表格为点击查询调用接口查询出来的数据&#xff0c;右侧表格为左侧表格所有选择的数据&#xff0c;由前端实现分页。 两个el-table勾选数据联动更新 实现逻辑&#xff1a; el-…

javascript实现Stack(栈)数据结构

上一篇文章我们理解了List这种数据结构&#xff0c;知道了它的特点和一些使用场景&#xff0c;这篇文章我们就来看一下栈这种数据结构&#xff0c;这里的栈可不是客栈哦&#xff0c;哈哈 栈其实和List非常像&#xff0c;使用javascript实现都是基于数组来实现 尝试理解Stack …

arm平台编译so文件回顾

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、几个点二、回顾过程 1.上来就执行Makefile2.编译第三方开源库.a文件 2.1 build.sh脚本2.2 Makefile3.最终编译三、其它知识点总结 前言 提示&#xff1a;这…

AI大规模专题报告:大规模语言模型从理论到实践

今天分享的AI系列深度研究报告&#xff1a;《AI大规模专题报告&#xff1a;大规模语言模型从理论到实践》。 &#xff08;报告出品方&#xff1a;光大证券&#xff09; 报告共计&#xff1a;25页 大规模语言模型基本概念 语言是人类与其他动物最重要的区别&#xff0c;而人类…