Flink中StateBackend(工作状态)与Checkpoint(状态快照)的关系

State Backends

由 Flink 管理的 keyed state 是一种分片的键/值存储,每个 keyed state 的工作副本都保存在负责该键的 taskmanager 本地中。另外,Operator state 也保存在机器节点本地。Flink 定期获取所有状态的快照,并将这些快照复制到持久化的位置,例如分布式文件系统。

如果发生故障,Flink 可以恢复应用程序的完整状态并继续处理,就如同没有出现过异常。

Flink 管理的状态存储在 state backend 中。Flink 有两种 state backend 的实现:

  • 一种基于 RocksDB 内嵌 key/value 存储将其工作状态保存在磁盘上的,将其状态快照持久化到(分布式)文件系统;
  • 另一种基于堆的 state backend,将其工作状态保存在 Java 的堆内存中。这种基于堆的 state backend 有两种类型:
    • FsStateBackend,将其状态快照持久化到(分布式)文件系统;
    • MemoryStateBackend,它使用 JobManager 的堆保存状态快照。

在这里插入图片描述

当使用基于堆的 state backend 保存状态时,访问和更新涉及在堆上读写对象。但是对于保存在 RocksDBStateBackend 中的对象,访问和更新涉及序列化和反序列化,所以会有更大的开销。但 RocksDB 的状态量仅受本地磁盘大小的限制。还要注意,只有 RocksDBStateBackend 能够进行增量快照,这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。

所有这些 state backends 都能够异步执行快照,这意味着它们可以在不妨碍正在进行的流处理的情况下执行快照。

Checkpoint

Flink 定期对每个算子的所有状态进行持久化快照,并将这些快照复制到更持久的地方,例如分布式文件系统。 如果发生故障,Flink 可以恢复应用程序的完整状态并恢复处理,就好像没有出现任何问题一样。

这些快照的存储位置是通过作业_checkpoint storage_定义的。 有两种可用检查点存储实现:一种持久保存其状态快照 到一个分布式文件系统,另一种是使用 JobManager 的堆。

在这里插入图片描述

Flink不同版本StateBackend(状态)与Checkpoint Storage(快照) 关系

在Flink1.14之前StateBackend与Checkpoint Storage 耦合在一起,但在Flink1.14之后把StateBackend与Checkpoint Storage 实现了解耦,使逻辑更加清晰。

Flink1.14之前

  • 基于 RocksDB state backend,状态快照持久化到(分布式)文件系统;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/data/rocksdb/ck", true));//true: 增量checkpoint; false:全量checkpoint
env.setStateBackend(new RocksDBStateBackend("file:///data/rocksdb/ck", true));//本地文件系统
  • 基于heap state backend,状态快照持久化到(分布式)文件系统;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/data/fs/ck"));//远程分布式文件系统
env.setStateBackend(new FsStateBackend("file:///data/fs/ck"));//本地文件系统
  • 基于heap state backend,使用 JobManager 的堆保存状态快照。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new MemoryStateBackend());

Flink1.14之后(推荐使用)

  • 基于 RocksDB state backend,状态快照持久化到(分布式)文件系统;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));//true: 增量checkpoint; false:全量checkpoint
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/data/rocksdb/ck");//远程分布式文件系统
env.getCheckpointConfig().setCheckpointStorage("file:///data/rocksdb/ck");//本地文件系统

flink-conf.yaml配置:

 state.backend: rocksdbstate.checkpoints.dir: hdfs:///checkpoints/
  • 基于heap state backend,状态快照持久化到(分布式)文件系统;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/data/fs/ck");//远程分布式文件系统
env.getCheckpointConfig().setCheckpointStorage("file:///data/fs/ck");//本地文件系统

flink-conf.yaml配置:

 state.backend: hashmapstate.checkpoints.dir: hdfs:///checkpoints/
  • 基于heap state backend,使用 JobManager 的堆保存状态快照。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());

flink-conf.yaml配置:

 state.backend: hashmapstate.checkpoint-storage: jobmanager

总结

  • 只有基于 RocksDB state backend的状态快照才支持增量checkpoint,基于heap的并不支持
  • 默认情况下 checkpoint 是禁用的,需要手动开启:

    env.enableCheckpointing(long interval, CheckpointingMode mode)

  • Flink状态分为Keyed State和非keyed State:
    • Keyed State,可以使用RocksDB state backend和heap state backend。 所有支持的状态类型如下所示:

      • ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。

      • ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。

      • ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。

      • AggregatingState<IN, OUT>: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

      • MapState<UK, UV>: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。

    • 非keyed State,不使用 RocksDB state backend,需要保存在内存中,包括:

      • 算子状态 (Operator State);
      • 广播状态 (Broadcast State),尤其需要考虑保证充足的内存;
      • 自定义 Operator State:CheckpointedFunction 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:

        void snapshotState(FunctionSnapshotContext context) throws Exception;
        void initializeState(FunctionInitializationContext context) throws Exception;

参考:

Fault Tolerance via State Snapshots

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/441254.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Adobe Photoshop 2024 v25.4.0 - 专业的图片设计软件

Adobe Photoshop 2024 v25.4.0更新了&#xff0c;从照片编辑和合成到数字绘画、动画和图形设计&#xff0c;任何您能想象到的内容都能通过PS2024轻松实现。 利用人工智能技术进行快速编辑。学习新技能并与社区分享您的工作。借助我们的最新版本&#xff0c;做令人惊叹的事情从未…

持续集成 CI/CD

CI和CD代表持续集成和持续交付/持续部署。简而言之&#xff0c;CI 是一种现代软件开发实践&#xff0c;其中频繁且可靠地进行增量代码更改。由 CI 触发的自动构建和测试步骤确保合并到存储库中的代码更改是可靠的。然后&#xff0c;作为 CD 流程的一部分&#xff0c;快速、无缝…

Vue打包Webpack源码及物理路径泄漏问题解决

修复前&#xff1a; 找到vue.config.js文件&#xff0c;在其中增加配置 module.exports {productionSourceMap: false,// webpack 配置configureWebpack: {devtool: false,}}修复后&#xff1a;

Docker核心教程

1. 概述 官网&#xff1a;https://docs.docker.com/ Docker Hub 网站&#xff1a;https://hub.docker.com/ 容器较为官方的解释&#xff1a; 一句话概括容器&#xff1a;容器就是将软件打包成标准化单元&#xff0c;以用于开发、交付和部署。 容器镜像是轻量的、可执行的独立…

大数据-Spark-关于Json数据格式的数据的处理与练习

上一篇&#xff1a; 大数据-MapReduce-关于Json数据格式的数据的处理与练习-CSDN博客 16.7 Json在Spark中的引用 依旧利用上篇的数据去获取每部电影的平均分 {"mid":1,"rate":6,"uid":"u001","ts":15632433243} {"m…

【Linux】命名管道

文章目录 命名管道一、命名管道的原理二、命名管道的创建命令行中创建程序中创建 - mkfifo函数&#xff1a; 三、命名管道的使用命名管道实现server&client通信 四、匿名管道与命名管道的区别和联系 命名管道 如果涉及到在文件系统中创建一个有名的管道&#xff0c;那么就…

HarmonyOS模拟器启动失败,电脑蓝屏解决办法

1、在Tool->Device Manager管理界面中&#xff0c;通过Wipe User Data清理模拟器用户数据&#xff0c;然后重启模拟器&#xff1b;如果该方法无效&#xff0c;需要Delete删除已创建的Local Emulater。 2、在Tool->SDK Manager管理界面的PlatForm选项卡中&#xff0c;取消…

C++11—— lambda表达式与包装器

C11—— lambda表达式与包装器 文章目录 C11—— lambda表达式与包装器一、 lambda表达式lambda表达式产生的意义lambda表达式语法函数对象与lambda表达式 二、 包装器functionfunction产生的意义function的用法function使用的例子 bind调整参数顺序固定绑定参数 一、 lambda表…

Redis常用数据结构与应用场景

常用数据结构 StringHashListSetZset String常用操作 String应用场景 Hash常用操作 hash应用场景 Hash结构优缺点 优点 同类数据归类整合存储,方便数据管理相比String操作消耗内存与spu更小相比string更节省空间 缺点 过期功能不能使用在field上,只用用在key上Redis集群…

TypeScript实战系列之合理运用类型

目录 介绍any 和 unknownerve 的用途断言type 和 interfacedeclare 关键字的作用联合类型 和 类型守卫交叉类型 介绍 这篇主要介绍下ts 常用的基本类型和一些常用的技巧性技能 any 和 unknow any 和 unknown 是两个类型关键字&#xff0c;它们用于处理类型不确定或未知的情况…

yolov8数据标注、模型训练到模型部署全过程

文章目录 一、数据标注&#xff08;x-anylabeling&#xff09;1. 安装方式1.1 直接通过Releases安装1.2 clone源码后采用终端运行 2. 如何使用 二、模型训练三、模型部署3.1 onnx转engine3.2 c调用engine模型3.2.1 main_tensorRT.cpp3.2.2 segmentationModel.cpp 一、数据标注&…

爱可声助听器参与南湖区价值百万公益助残捐赠活动成功举行

“声音大小合适吗&#xff1f;能听清楚吗&#xff1f;”今天下午&#xff0c;一场助残捐赠活动在南湖区凤桥镇悄然举行&#xff0c;杭州爱听科技有限公司带着验配团队和听力检测设备来到活动现场&#xff0c;为南湖区听障残疾人和老人适配助听器。 家住余新镇的75岁的周奶奶身体…

1.迭代与递归 - JS

迭代与递归是函数进阶的第一个门槛。迭代就是对已知变量反复赋值变换&#xff1b;递归就是函数体内调用自身。 迭代 一个迭代是就是一个循环&#xff0c;根据迭代式对变量反复赋值。 求近似根&#xff08;切线法&#xff09;&#xff1b; 迭代描述&#xff1a; x 0 x_0 x0…

C语言KR圣经笔记 6.6 表查询 6.7 typedef

6.6 表查询 为了说明结构体的更多方面&#xff0c;本节我们来写一个表查询功能包的内部代码。在宏处理器或编译器的符号表管理例程中&#xff0c;这个代码是很典型的。例如&#xff0c;考虑 #define 语句&#xff0c;当遇到如下行 #define IN 1 时&#xff0c;名称 IN 与其对…

微信小程序如何实现点击上传图片功能

如下所示,实际需求中常常存在需要点击上传图片的功能,上传前显示边框表面图片显示大小,上传后将图形缩放到边框大小。 实现如下: .wxml <view class="{{img_src==?blank-area:}}" style="width:100%;height:40%;display:flex;align-items: center;jus…

spring框架(一)

1、Spring框架&#xff1a;IoC和AOP 服务端三层开发&#xff1a;表现层、业务层、持久层 ssm, springboot, springcloud(微服务&#xff0c;治理组件) Spring框架是一个流行的Java应用程序框架&#xff0c;它提供了许多功能来简化企业级应用程序的开发。其中&#xff0c;控制反…

Selenium 隐藏浏览器指纹特征的几种方式

我们使用 Selenium 对网页进行爬虫时&#xff0c;如果不做任何处理直接进行爬取&#xff0c;会导致很多特征是暴露的 对一些做了反爬的网站&#xff0c;做了特征检测&#xff0c;用来阻止一些恶意爬虫 本篇文章将介绍几种常用的隐藏浏览器指纹特征的方式 1. 直接爬取 目标对…

面试题 02.07. 链表相交(力扣LeetCode)

文章目录 面试题 02.07. 链表相交题目描述解题思路c代码优化后c代码 面试题 02.07. 链表相交 题目描述 给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表没有交点&#xff0c;返回 null 。 图示两个链表在节点 c1 …

shell

目录 一.运行方式 二.编程习惯 三.变量 3.1变量的命名 3.3普通变量(局部变量) 3.4特殊变量 3.5变量子串 3.6变量赋值 四.运算方式 4.1$(( )) 4.2let 4.3expr 4.4bc(小数运算) 4.5$[ ] 4.6awk 4.7总结运算方式 五.条件测试语句 5.1文件 5.2条件测试表达式…

IDEA:git 回滚本地提交-git 选择 Reset Current Branch to

前言 回滚提交到本地但是还没有 Push 上去的提交 选择我们要回滚的节点&#xff0c;然后点击 git 选择 Reset Current Branch to… 再选择 Hard 。当我们点击 Reset 的时候&#xff0c;代码就会回滚到单前选中的这个版本