Flink的算子列表状态的使用

背景

算子的列表状态是平时比较常见的一种状态,本文通过官方的例子来看一下怎么使用算子列表状态

算子列表状态

算子列表状态支持应用的并行度扩缩容,如下所示:
在这里插入图片描述
使用方法参见官方示例,我加了几个注解:

public class BufferingSinkimplements SinkFunction<Tuple2<String, Integer>>,CheckpointedFunction {//要实现CheckpointedFunction接口private final int threshold;//算子操作状态对象--算子级别的private transient ListState<Tuple2<String, Integer>> checkpointedState;//本地变量,保存这个算子任务的本地变量--任务级别的 private List<Tuple2<String, Integer>> bufferedElements;public BufferingSink(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();}//invoke方法中一般都是操作本地变量bufferedElements,不会直接操作算子列表状态@Overridepublic void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {bufferedElements.add(value);if (bufferedElements.size() >= threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}bufferedElements.clear();}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();for (Tuple2<String, Integer> element : bufferedElements) {// 把本地变量的值设置到算子列表状态中,算子列表状态会自动会被持久化checkpointedState.add(element);}}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<Tuple2<String, Integer>> descriptor =new ListStateDescriptor<>("buffered-elements",TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));// 定义算子列表状态checkpointedState = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {// 算子列表状态的值设置到本地变量中for (Tuple2<String, Integer> element : checkpointedState.get()) {bufferedElements.add(element);}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/139299.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL的ACID和并发事务带来的问题简单总结

拓跋阿秀 ACID 原子性&#xff1a;一个事务&#xff08;transaction&#xff09;中的所有操作&#xff0c;要么全部完成&#xff0c;要么全部不完成&#xff0c;不会结束在中间某个环节。事务在执行过程中发生错误&#xff0c;会被恢复&#xff08;Rollback&#xff09;到事务…

动手实现H5仿原生app前进后退切换效果

动手实现H5仿原生app前进后退切换效果 前言 最近在优化H5页面&#xff0c;我注意到当开发完成的移动端H5页面嵌入到微信小程序或者原生app中时&#xff0c;当触发页面路由切换会与原生app看上去有点格格不入&#xff0c;因为H5页面<router-view>切换路由时是直接替换了…

【视觉大模型SAM系列】PerSAM:Personalize Segment Anything Model with One Shot

【版权声明】 本文为博主原创文章&#xff0c;未经博主允许严禁转载&#xff0c;我们会定期进行侵权检索。 更多算法总结请关注我的博客&#xff1a;https://blog.csdn.net/suiyingy&#xff0c;或”乐乐感知学堂“公众号。 本文章来自于专栏《大模型》的系列文章&#xff0c;专…

软件外包开发文档

编写软件开发文档是项目开发过程中的关键步骤&#xff0c;它有助于组织、记录和分享项目的信息和进展。以下是编写软件开发文档的一般步骤和建议&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 1.文档…

千兆光模块存在哪些局限性

千兆光模块是目前使用最广泛的光模块之一&#xff0c;可以实现1Gbps的传输速度。随着科技的进步和应用场景的多样性&#xff0c;千兆光模块也因其固有的局限性而面临越来越多的挑战。以下是千兆光模块的局限性和如何克服这些局限性的讨论&#xff1a; 千兆光模块可以实现最大…

7.继承与多态 对象村的优质生活

7.1 民法亲属篇&#xff1a;继承&#xff08;inheritance&#xff09; 了解继承 在设计继承时&#xff0c;你会把共同的程序代码放在某个类中&#xff0c;然后告诉其他的类说此类是它们的父类。当某个类继承另一个类的时候&#xff0c;也就是子类继承自父类。以Java的方式说&…

【JVM面试题】JVM分代年龄为何是15次?能设置为16吗?

系列文章目录 【JVM系列】第一章 运行时数据区 【JVM面试题】第二章 从JDK7 到 JDK8, JVM为啥用元空间替换永久代&#xff1f; 【JVM面试题】第三章 JVM分代年龄为何是15次&#xff1f;能设置为16吗&#xff1f; 大家好&#xff0c;我是青花。拥有多项发明专利&#xff08;都是…

【TES720D】青翼科技基于复旦微的FMQL20S400全国产化ARM核心模块

板卡概述 TES720D是一款基于上海复旦微电子FMQL20S400的全国产化核心模块。该核心模块将复旦微的FMQL20S400&#xff08;兼容FMQL10S400&#xff09;的最小系统集成在了一个50*70mm的核心板上&#xff0c;可以作为一个核心模块&#xff0c;进行功能性扩展&#xff0c;特别是用…

CSS的美化(文字、背景) Day02

一、文字控制属性 分为&#xff1a;字体样式属性 、文本样式属性 1.1 CSS字体样式属性 1.color定义元素内文字颜色2.font-size 字号大小3 font-family 字体4 font-weight 字体粗细5.font-style 字体风格6.font 字体综合属性 1.1.1 > 文字颜色 color 属性名: color color …

海量小文件数据传输如何确保安全性

在当今的信息化社会&#xff0c;企业需要处理和传输的文件越来越多&#xff0c;越来越大。其中&#xff0c;海量小文件数据是一种特殊的数据类型&#xff0c;它由数亿级别的小文件&#xff08;通常小于1MB&#xff09;组成&#xff0c;它在图片网站、物联网设备、日志分析等场景…

【Eclipse】取消按空格自动补全,以及出现没有src的解决办法

【Eclipse】设置自动提示 教程 根据上方链接&#xff0c;我们已经知道如何设置Eclipse的自动补全功能了&#xff0c;但是有时候敲变量名的时候按空格&#xff0c;本意是操作习惯&#xff0c;不需要自动补全&#xff0c;但是它却给我们自动补全了&#xff0c;这就造成了困扰&…

Unity游戏开发中ngui和ugui区别与优缺点详解

Unity3D是一款跨平台的游戏开发引擎&#xff0c;它支持多种平台&#xff0c;包括PC、移动设备和主机。在Unity3D中&#xff0c;UI系统是游戏开发中非常重要的一部分&#xff0c;它负责游戏中的用户界面的显示和交互。 对惹&#xff0c;这里有一个游戏开发交流小组&#xff0c;…