Flink理论—容错之状态

Flink理论—容错之状态

在 Flink 的框架中,进行有状态的计算是 Flink 最重要的特性之一。所谓的状态,其实指的是 Flink 程序的中间计算结果。Flink 支持了不同类型的状态,并且针对状态的持久化还提供了专门的机制和状态管理器。

Flink 使用流重放检查点的组合来实现容错。检查点标记每个输入流中的特定点以及每个运算符的相应状态。通过恢复运算符的状态并从检查点点重放记录,可以从检查点恢复流数据流,

为了使该机制实现其完全保证,数据流源(例如消息队列或代理)需要能够将流倒回到定义的最近点。Apache Kafka具有这种能力,Flink 的 Kafka 连接器利用了这一点。

如果发生程序故障(由于机器、网络或软件故障),Flink 会停止分布式流数据流。然后系统重新启动算子并将其重置为最新的成功检查点。输入流重置为状态快照点。作为重新启动的并行数据流的一部分进行处理的任何记录都保证不会影响之前的检查点状态。

状态

我们在 Flink 的官方博客中找到这样一段话,可以认为这是对状态的定义:

When working with state, it might also be useful to read about Flink’s state backends. Flink provides different state backends that specify how and where state is stored. State can be located on Java’s heap or off-heap. Depending on your state backend, Flink can also manage the state for the application, meaning Flink deals with the memory management (possibly spilling to disk if necessary) to allow applications to hold very large state. State backends can be configured without changing your application logic.

这段话告诉我们,所谓的状态指的是,在流处理过程中那些需要记住的数据,而这些数据既可以包括业务数据,也可以包括元数据。Flink 本身提供了不同的状态管理器来管理状态,并且这个状态可以非常大。

有状态操作的一些示例:

  • 当应用程序搜索某些事件模式时,状态将存储迄今为止遇到的事件序列。
  • 当每分钟/小时/天聚合事件时,状态保存待处理的聚合。
  • 当通过数据点流训练机器学习模型时,状态保存模型参数的当前版本。
  • 当需要管理历史数据时,状态允许有效访问过去发生的事件。

Flink 状态分类

我们在之前的课时中提到过 KeyedStream 的概念,并且介绍过 KeyBy 这个算子的使用。在 Flink 中,根据数据集是否按照某一个 Key 进行分区,将状态分为 Keyed StateOperator State(Non-Keyed State)两种类型。

image (4).png

如上图所示,Keyed State 是经过分区后的流上状态,每个 Key 都有自己的状态,图中的八边形、圆形和三角形分别管理各自的状态,并且只有指定的 key 才能访问和更新自己对应的状态。

这里还需要说明这个 Key 是隐含的,也就是不需要我们手动去操作,我们只要定义一个描述符,然后去操作状态即可,也就是说 Key 本身也是托管的

上面那张图还是简化了很多,真实的状态是这样的,我们的一个算子实例还是托管了很多Key 的,其实为了方便理解你可将其理解为一个大的Map ,

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

对 Keyed State 的访问只能在 Keyed Stream上进行,即在键控/分区数据交换之后,并且仅限于与当前事件的键关联的值。

Keyed State 进一步组织成所谓的键组。Key Groups 是 Flink 可以重新分配 Keyed State 的原子单元;键组的数量与定义的最大并行度完全相同。在执行期间,键控运算符的每个并行实例都使用一个或多个键组的键

与 Keyed State 不同的是,Operator State 可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。每个算子子任务上的数据共享自己的状态。

Operator State 的实际应用场景不如 Keyed State 多,一般来说它会被用在 Source 或 Sink 等算子上,用来保存流入数据的偏移量或对输出数据做缓存,以保证 Flink 应用的 Exactly-Once 语义。

但是有一点需要说明的是,无论是 Keyed State 还是 Operator State,Flink 的状态都是基于本地的,即每个算子子任务维护着这个算子子任务对应的状态存储,算子子任务之间的状态不能相互访问。

image (5).png

我们可以看一下 State 的类图,对于 Keyed State,Flink 提供了几种现成的数据结构供我们使用,State 主要有四种实现,分别为 ValueState、MapState、AppendingState 和 ReadOnlyBrodcastState ,其中 AppendingState 又可以细分为ReducingState、AggregatingState 和 ListState。

那么我们怎么访问这些状态呢?Flink 提供了 StateDesciptor 方法专门用来访问不同的 state,类图如下:

image (6).png

状态的使用

状态的基本使用

下面演示一下如何使用 StateDesciptor 和 ValueState,代码如下:

public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 5L), Tuple2.of(1L, 2L)).keyBy(0).flatMap(new CountWindowAverage()).printToErr();env.execute("submit job");}public static class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {private transient ValueState<Tuple2<Long, Long>> sum;public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {Tuple2<Long, Long> currentSum;// 访问ValueStateif(sum.value()==null){currentSum = Tuple2.of(0L, 0L);}else {currentSum = sum.value();}// 更新currentSum.f0 += 1;// 第二个元素加1currentSum.f1 += input.f1;// 更新statesum.update(currentSum);// 如果count的值大于等于2,求知道并清空stateif (currentSum.f0 >= 2) {out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));sum.clear();}}public void open(Configuration config) {ValueStateDescriptor<Tuple2<Long, Long>> descriptor =new ValueStateDescriptor<>("average", // state的名字TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // 设置默认值StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();descriptor.enableTimeToLive(ttlConfig);sum = getRuntimeContext().getState(descriptor);}
}

在上述例子中,我们通过继承 RichFlatMapFunction 来访问 State,通过 getRuntimeContext().getState(descriptor) 来获取状态的句柄。而真正的访问和更新状态则在 Map 函数中实现。

我们这里的输出条件为,每当第一个元素的和达到二,就把第二个元素的和与第一个元素的和相除,最后输出。我们直接运行,在控制台可以看到结果:

image (7).png

状态管理配置

同样,我们对于任何状态数据还可以设置它们的过期时间。如果一个状态设置了 TTL,并且已经过期,那么我们之前保存的值就会被清理。

想要使用 TTL,我们需要首先构建一个 StateTtlConfig 配置对象;然后,可以通过传递配置在任何状态描述符中启用 TTL 功能。

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
descriptor.enableTimeToLive(ttlConfig);

image (8).png

StateTtlConfig 这个类中有一些配置需要我们注意:

image (9).png

UpdateType 表明了过期时间什么时候更新,而对于那些过期的状态,是否还能被访问则取决于 StateVisibility 的配置。

总结

主要Flink 中的状态分类和使用,并且用实际案例演示了用法;关于状态后端我们可以参考下一节

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/469369.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MPLAB V8.92 printf

Compile error “A heap is required, but has not been specified” Set printf function #if 0 //for UART1 int fputc(int ch, FILE *f) { IFS1bits.U2TXIF 0; // if (runConfig.printOn 1) { // usart_data_transmit(USART0, (uint8_t)ch); U2TXREG ch; // while (RESE…

MATLAB实现朴素贝叶斯分类

朴素贝叶斯&#xff08;Naive Bayes&#xff09;是一种基于贝叶斯定理的分类算法&#xff0c;它假设特征之间相互独立&#xff0c;从而简化了计算复杂性。该算法常用于文本分类、垃圾邮件过滤、情感分析等应用场景。 MATLAB实现鸢尾花数据集分类代码如下&#xff1a; clear lo…

代码随想录算法训练营第三十一天 |基础知识,455.分发饼干,376.摆动序列,53.最大子序和(已补充)

基础知识&#xff1a; 题目分类大纲如下&#xff1a; #算法公开课 《代码随想录》算法视频公开课(opens new window)&#xff1a;贪心算法理论基础&#xff01;(opens new window),相信结合视频再看本篇题解&#xff0c;更有助于大家对本题的理解。 #什么是贪心 贪心的本质…

一周学会Django5 Python Web开发-Django5 Hello World编写

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计14条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…

ubuntu22.04@laptop OpenCV Get Started: 009_image_thresholding

ubuntu22.04laptop OpenCV Get Started: 009_image_thresholding 1. 源由2. image_thresholding应用Demo2.1 C应用Demo2.2 Python应用Demo 3. 重点分析3.1 Binary Thresholding ( THRESH_BINARY )3.2 Inverse-Binary Thresholding ( THRESH_BINARY_INV )3.3 Truncate Threshold…

春节结束后如何收心工作?

一、春节结束后的工作准备 春节假期结束后&#xff0c;迎来了新的工作季。在开始新的工作之前&#xff0c;首先需要对即将展开的工作进行充分的准备。整理和清理工作区域&#xff0c;给自己一个干净整洁的工作环境。检查和更新工作日程&#xff0c;确保未来一段时间的工作规划…

kafka如何保证消息不丢?

概述 我们知道Kafka架构如下&#xff0c;主要由 Producer、Broker、Consumer 三部分组成。一条消息从生产到消费完成这个过程&#xff0c;可以划分三个阶段&#xff0c;生产阶段、存储阶段、消费阶段。 产阶段: 在这个阶段&#xff0c;从消息在 Producer 创建出来&#xff0c;…

【JavaEE】_JavaScript(Web API)

目录 1. DOM 1.1 DOM基本概念 1.2 DOM树 2. 选中页面元素 2.1 querySelector 2.2 querySelectorAll 3. 事件 3.1 基本概念 3.2 事件的三要素 3.3 示例 4.操作元素 4.1 获取/修改元素内容 4.2 获取/修改元素属性 4.3 获取/修改表单元素属性 4.3.1 value&#xf…

MES系统有哪些厂家?万界星空科技是您不容错过的选择

为什么要选择MES系统&#xff1f; MES系统可以帮助企业实现生产计划的有效执行和优化&#xff0c;提高生产效率和质量&#xff0c;并实现产能的最大化利用。它可以实现对生产过程的全面监控和调度&#xff0c;使企业能够及时获得生产数据和指标&#xff0c;并做出相应的决策。…

Linux操作系统基础(十三):Linux安装、卸载MySQL

文章目录 Linux安装、卸载MySQL 一、卸载系统自带的mariadb-lib 二、上传安装包并解压 三、按顺序安装 错误1: 错误2: 错误3: 错误4: 四、初始化数据库 五、目录授权&#xff0c;否则启动失败 六、启动msyql服务 七、查看msyql服务的状态 八、在/var/log/mysqld.l…

一周学会Django5 Python Web开发-项目配置settings.py文件-基本配置

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计17条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…

安卓价值2-Macrodroid在其它app下执行两步就停

Macrodroid 是一款适用于 Android 平台的自动化应用程序。它允许用户创建个性化的自动化工作流程,以简化日常任务并增强手机的功能。 但使用下来会发现一些奇怪的问题,比如在其它app处于前台状态下它执行了两步任务就停止了,但切换回macrodroid就又继续执行了,这就像是程序…