AggregateFunction结合自定义触发器实现点击率计算

背景:

接上一篇文章,ProcessWindowFunction 结合自定义触发器会有状态过大的问题,本文就使用AggregateFunction结合自定义触发器来实现,这样就不会导致状态过大的问题了

AggregateFunction结合自定义触发器实现

在这里插入图片描述
flink对于每个窗口只需要维护一个状态:不像ProcessWindowFunction那样需要把窗口内收到的所有消息都作为状态存储起来

在这里插入图片描述
完整代码参见:

package wikiedits.func;import java.text.SimpleDateFormat;
import java.util.Date;import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;public class AggregateFunctionAndTiggerDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);env.setStateBackend(new FsStateBackend("file:///D:/tmp/flink/checkpoint/aggregatetrigger"));// 并行度为1env.setParallelism(1);// 设置数据源,一共三个元素DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {@Overridepublic void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {int xxxNum = 0;int yyyNum = 0;for (int i = 1; i < Integer.MAX_VALUE; i++) {// 只有XXX和YYY两种nameString name = (0 == i % 2) ? "XXX" : "YYY";// 更新aaa和bbb元素的总数if (0 == i % 2) {xxxNum++;} else {yyyNum++;}// 使用当前时间作为时间戳long timeStamp = System.currentTimeMillis();// 将数据和时间戳打印出来,用来验证数据if (xxxNum % 2000 == 0) {System.out.println(String.format("source,%s, %s,    XXX total : %d,    YYY total : %d\n", name,time(timeStamp), xxxNum, yyyNum));}// 发射一个元素,并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1);}}@Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分,再用ProcessWindowFunctionSingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream = dataStream// 以Tuple2的f0字段作为key,本例中实际上key只有aaa和bbb两种.keyBy(value -> value.f0)// 5秒一次的滚动窗口.timeWindow(Time.minutes(5))// 10s触发一次计算,更新统计结果.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))// 统计每个key当前窗口内的元素数量,然后把key、数量、窗口起止时间整理成字符串发送给下游算子.aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {// 1、初始值// 定义累加器初始值@Overridepublic Tuple2<String, Integer> createAccumulator() {return new Tuple2<String, Integer>("", 0);}// 2、累加// 定义累加器如何基于输入数据进行累加@Overridepublic Tuple2<String, Integer> add(Tuple2<String, Integer> value,Tuple2<String, Integer> accumulator) {accumulator.f0 = value.f0;accumulator.f1 += value.f1;return accumulator;}// 3、合并// 定义累加器如何和State中的累加器进行合并@Overridepublic Tuple2<String, Integer> merge(Tuple2<String, Integer> acc1,Tuple2<String, Integer> acc2) {acc1.f1 += acc2.f1;return acc1;}// 4、输出// 定义如何输出数据@Overridepublic Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {return accumulator;}});// 打印结果,通过分析打印信息,检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute("processfunction demo : processwindowfunction");}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}}

通过这种方式我们就可以做到统计某个页面一天内至今为止的点击率,每10s输出一次点击率的结果,并且不会引起状态膨胀的问题

参考文献:
https://www.cnblogs.com/Springmoon-venn/p/13667023.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/106030.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何实现MongoDB数据的快速迁移?

作为一种Schema Free文档数据库&#xff0c;MongoDB因其灵活的数据模型&#xff0c;支撑业务快速迭代研发&#xff0c;广受开发者欢迎并被广泛使用。在企业使用MongoDB承载应用的过程中&#xff0c;会因为业务上云/跨云/下云/跨机房迁移/跨地域迁移、或数据库版本升级、数据库整…

【DockerCE】Docker-CE 24.0.6正式版发布

官网下载地址&#xff08;For RHEL/CentOS 7.9&#xff09;&#xff1a; https://download.docker.com/linux/centos/7/x86_64/stable/Packages/ 相对于24.0.5版本&#xff0c;本次24.0.6版本更新的rpm包有 5 个&#xff0c;使用目录对比软件对比的结果如下&#xff1a; 在Lin…

(Note)中文EI检索期刊目录

ei和sci、ssci一样是国际知名的期刊数据库&#xff0c;ei不仅收录国际知名的刊物&#xff0c;也收录了一些国内期刊&#xff0c;为方便投稿选刊&#xff0c;Elsevier官网更新了的EI Compendex期刊目录&#xff0c;那么 国内ei期刊有哪些? 经查询共有250余种期刊&#xff0c;新…

什么是Docker和Docker-Compose?

Docker的构成 Docker仓库&#xff1a;https://hub.docker.com Docker自身组件 Docker Client&#xff1a;Docker的客户端 Docker Server&#xff1a;Docker daemon的主要组成部分&#xff0c;接受用户通过Docker Client发出的请求&#xff0c;并按照相应的路由规则实现路由分发…

Qt包含文件不存在问题解决 QNetworkAccessManager

这里用到了Qt的网络模块&#xff0c;在.pro中添加了 QT network 但是添加 #include <QNetworkAccessManager> 会报错说找不到&#xff0c;可以通过在项目上右键执行qmake后&#xff0c;直接#include <QNetworkAccessManager>就不会报错了&#xff1a;

20230911 Shell指令数组以及函数值传递,值返回

实现一个对数组求和的函数&#xff0c;数组通过实参传递给函数 #!/bin/bashfunction fun() {sum0for ((i0;i<$var;i))do(( sumarr[i] ))doneecho $sum } read -p "输入该数组个数: " var for((j0;j<$var;j)) doread -p "输入数组第$j个值: " arr[j] …

【ARM CoreLink 系列 2 -- CCI-400 控制器简介】

文章目录 CCI-400 介绍DVM 机制介绍DVM 消息传输过程TOKEN 机制介绍 下篇文章&#xff1a;ARM CoreLink 系列 3 – CCI-550 控制器介绍 CCI-400 介绍 CCI&#xff08;Cache Coherent Interconnect&#xff09;是ARM 中 的Cache一致性控制器。 CCI-400 将 Interconnect 和coh…

Ubuntu下Python3与Python2相互切换

参考文章&#xff1a;https://blog.csdn.net/Nicolas_shen/article/details/124144931 设置优先级 sudo update-alternatives --install /usr/bin/python python /usr/bin/python2 100 sudo update-alternatives --install /usr/bin/python python /usr/bin/python3 200

npm版本升级报错

解决方法&#xff1a; 执行npm install --legacy-peer-deps依赖对等 npm install xxx --legacy-peer-deps命令用于绕过peerDependency里依赖的自动安装&#xff1b;它告诉npm忽略项目中引入的各个依赖模块之间依赖相同但版本不同的问题&#xff0c;以npm v4-v6的方式去继续执行…

技术解码 | GB28181/SIP/SDP 协议--EasyGBS国标GB28181平台国标视频技术SIP解析

EasyGBS国标视频云服务是基于国标GB/T28181协议的视频能力平台&#xff0c;可实现的视频功能包括&#xff1a;实时监控直播、录像、检索与回看、语音对讲、云存储、告警、平台级联等功能。平台部署简单、可拓展性强&#xff0c;支持将接入的视频流进行全终端、全平台分发&#…

分类预测 | MATLAB实现PCA-GRU(主成分门控循环单元)分类预测

分类预测 | MATLAB实现PCA-GRU(主成分门控循环单元)分类预测 目录 分类预测 | MATLAB实现PCA-GRU(主成分门控循环单元)分类预测预测效果基本介绍程序设计参考资料致谢 预测效果 基本介绍 Matlab实现基于PCA-GRU主成分分析-门控循环单元多输入分类预测&#xff08;完整程序和数据…

正则表达式使用总结

一、字符匹配 普通字符&#xff1a;普通字符按照字面意义进行匹配&#xff0c;例如匹配字母 "a" 将匹配到文本中的 "a" 字符。 元字符&#xff1a;元字符具有特殊的含义&#xff0c;例如 \d 匹配任意数字字符&#xff0c;\w 匹配任意字母数字字符&#xf…