从Flink的Kafka消费者看算子联合列表状态的使用

背景

算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态

算子联合列表状态

首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况
在这里插入图片描述
算子联合列表状态主要由这两个方法处理:
1初始化方法

public final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore();// 在初始化方法中获取联合列表状态this.unionOffsetStates =stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,createStateSerializer(getRuntimeContext().getExecutionConfig())));if (context.isRestored()) {restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// 把联合列表状态的数据都恢复成类的本地变量中// populate actual holder for restored statefor (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {restoredState.put(kafkaOffset.f0, kafkaOffset.f1);}LOG.info("Consumer subtask {} restored state: {}.",getRuntimeContext().getIndexOfThisSubtask(),restoredState);} else {LOG.info("Consumer subtask {} has no restore state.",getRuntimeContext().getIndexOfThisSubtask());}}

2.开始通知检查点开始的方法:

public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug("snapshotState() called on closed source");} else {unionOffsetStates.clear();final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {// the fetcher has not yet been initialized, which means we need to return the// originally restored offsets or the assigned partitionsfor (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :subscribedPartitionsToStartOffsets.entrySet()) {// 进行checkpoint时,把数据保存到联合列表状态中进行保存unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// truncate the map of pending offsets to commit, to prevent infinite growthwhile (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/139762.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为数通方向HCIP-DataCom H12-831题库(单选题:261-280)

第261题 某网络通过部署1S-IS实现全网与通,若在一台IS-IS路由器的某接口下配置命令isis timer holding multiplier 5 level-2,则以下关于该场景的描述,正确的是哪一项? A、该接口Level-2邻居保持时间为5秒 B、该接口Level-1邻居保持时间为30秒 C、该接口为点对点链路接口 …

2022年全网最全最细最流行的自动化测试工具有哪些?

一&#xff1a;前言 随着测试工程师技能和工资待遇的提升&#xff0c;甚至有一部分的开发人员开始转入测试岗位&#xff0c;跨入自动化领域的测试攻城狮越来越多。在自动化测试领域&#xff0c;自动化工具肯定占据了核心的位置。 本文总结了常用的测试自动化工具和框架&#x…

Spring framework Day 23:容器事件

前言 容器事件是 Spring Framework 中的一个重要概念&#xff0c;它提供了一种机制&#xff0c;使我们能够更好地了解和响应 Spring 容器中发生的各种事件。通过容器事件&#xff0c;我们可以在特定的时间点监听和处理容器中的各种状态变化、操作和事件触发&#xff0c;以实现…

超实用!了解github的热门趋势和star排行是必须得!

在当今的技术领域中&#xff0c;GitHub 已经成为了开发者们分享和探索代码的重要平台。作为全球最大的开源社区&#xff0c;GitHub上托管了数以亿计的项目&#xff0c;其中包括了各种各样的技术栈和应用。对于开发者来说&#xff0c;了解GitHub上的热门趋势和star排行是非常重要…

Unity3D 基础——使用 Mathf.SmoothDamp 函数制作相机的缓冲跟踪效果

使用 Mathf.SmoothDamp 函数制作相机的缓冲跟踪效果&#xff0c;让物体的移动不是那么僵硬&#xff0c;而是做减速的缓冲效果。将以下的脚本绑定在相机上&#xff0c;然后设定好 target 目标对象&#xff0c;即可看到相机的缓动效果。通过设定 smoothTime 的值&#xff0c;可以…

卷麻了,00后测试用例写的比我还好,简直无地自容......

经常看到无论是刚入职场的新人&#xff0c;还是工作了一段时间的老人&#xff0c;都会对编写测试用例感到困扰&#xff1f;例如&#xff1a; 如何编写测试用例&#xff1f; 作为一个测试新人&#xff0c;刚开始接触测试&#xff0c;对于怎么写测试用例很是头疼&#xff0c;无法…

OpenCV模板匹配实现银行卡数字识别

目录 1&#xff0c;项目流程 2&#xff0c;代码流程解读 2.1 导入工具包 2.2 设置参数 2.3 指定信用卡类型 2.4 展示图像 ​编辑 2.5 读取一个模板图像 2.6 转化为灰度图--------->再转化为二值图像 2.7 计算轮廓 ​编辑 2.8 导入我们要识别的图像&…

JavaSE入门---认识Java数组

文章目录 一. 数组的基本概念1.1 为什么要使用数组&#xff1f;1.2 什么是数组&#xff1f;1.3 数组的使用 二. 数组是引用类型三. 数组的应用场景四. 数组中的常用方法五. 二维数组 一. 数组的基本概念 1.1 为什么要使用数组&#xff1f; 想象这样的一个场景&#xff1a;期末…

【解决】运行vue项目,启动报错 in ./node_modules/@intlify/core-base/dist/core-base.cjs

我的处理方式: 一开始查了好多方法&#xff0c;删除node_modules&#xff0c;重新安装&#xff0c;切换node版本等&#xff0c;但是发现并没有用 之后来发现是安装依赖包的时候有些包安装失败导致的&#xff0c;只要有针对性的重新安装依赖就可以了 例如&#xff1a; in ./n…

会议OA小程序【首页布局】

目录 一. Flex布局介绍 1.1 什么是Flex布局 1.2 基本概念 1.3 Flex属性 二. 会议OA首页轮播图的实现 配置 Mock工具 swiper 效果展示 三. 会议OA首页会议信息布局 index.js index.wxml index.wxss 首页整体效果展示 一. Flex布局介绍 布局的传统解决方案&#x…

服务器中了locked勒索病毒怎么办,勒索病毒解密,数据恢复

最近一段时间内&#xff0c;相信很多使用金蝶或用友的办公软件的企业&#xff0c;有很多都经历了locked勒索病毒的攻击&#xff0c;导致企业服务器被加密无法正常使用&#xff0c;严重影响了企业的正常工作。通过云天数据恢复中心的解密恢复发现&#xff0c;在今年locked勒索病…

亲,手撸图文博文太累了?试试这个神器!

这一篇博客有关如何使用[InternLM-XComposer]来写图文并茂的博文。InternLM-XComposer是一个基于人工智能的创作工具&#xff0c;它可以根据你的输入生成不同类型的内容&#xff0c;例如文章、诗歌、歌词、代码等。你可以使用它来创作有趣和有创意的博客&#xff0c;同时也可以…