Checkpoint机制和生产配置

1.前提

在将Checkpoint之前,先回顾一下flink处理数据的流程:

在这里插入图片描述

2. 概述

Checkpoint机制,又叫容错机制,可以保证流式任务中,不会因为异常时等原因,造成任务异常退出。可以保证任务正常运行。
(1)能在集群异常时,保持已计算的数据,下次恢复时能在已保存数据的基础上,继续计算(类似于快照);
(2)避免数据丢失(通过Barrier实现)

3.机制运行流程

在这里插入图片描述
解释:

(1)主节点上的检查点协调器(CheckpointCoordinator)会周期性地发送一个个地Barrier(栅栏,前面说的 偏移量做标识),Barrier会混在数据里,随着数据流,流向source算子;

(2)source算子在摄入数据的时候,如果碰到Barrier栅栏,不会去处理,Barrier就会让先算子去汇报当前的状态

(3)处理完之后,Barrier就会随着数据流,流向下一个算子;

(4)下一个算子收到Barrier,同样会停下手里的工作,也会向检查点协调器汇报当前的状态,把状态往主节点传递一份(备份,防止算子出错,状态丢失)
(5)上一步处理完之后,Barrier又会随着数据流向下一个算子,以此类推。
(6)等Barrier流经所有的算子之后,这一轮的快照就算制作完成

4. 状态后端

状态后端,StateBackend,就是Flink存储状态的介质(存储状态的地方)。Flink提供了三种状态后端的存储方式:

  • MemoryStateBackend(内存,使用HashMapStateBackend实现,生产一般不用)
  • FsStateBackend(文件系统,比如说HDFS,生产常用)
  • RocksDBStateBackend(RocksDB数据库,生产常用)
  • 同时也可以把状态外置到 Hbase和Redis,解决大状态存储问题
MemoryStateBackend

内存,掉电易失。不安全。基本不用。
在这里插入图片描述
配置如下:

state.backend: hashmap
# 可选,当不指定 checkpoint 路径时,默认自动使用 JobManagerCheckpointStorage
state.checkpoint-storage: jobmanager
FsStateBackend

FsStateBackend,文件系统的状态后端,就是把状态保存在文件系统中,常用来保存状态的文件系统有HDFS;
工作中常用;
在这里插入图片描述
配置如下:

state.backend: hashmap 
state.checkpoints.dir: file:///checkpoint-dir/ # 默认为FileSystemCheckpointStorage 
state.checkpoint-storage: filesystem
RocksDBStateBackend

RocksDBStateBackend,把状态保存在RocksDB数据库中。

RocksDB,是一个小型文件系统的数据库。

配置如下:

state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

特点:可以保持巨大的状态,且支持增量状态保存。

5.重启策略

5.1 重启策略概述

Flink流式任务,需要长期运行,就算遇到一些数据异常问题等,也不能随便退出。

Flink为了让任务能够在遇到异常退出时,能够重新启动,正常运行,Flink提出了重启策略的概念。

5.2 Flink的重启策略

Flink支持四种类型的重启策略:

  • none:没有重启。任务一旦遇到异常,就退出。

  • fixed-delay:固定延迟重启策略。也就是说,可以配置一个重启的次数。超过次数后,才会退出。

  • failure-rate:失败率重启策略。也就是说,任务的失败频率。超过该频率后才退出。在设定的频率之内,不会退出。

  • exponential-delay:指数延迟重启策略。也就是说,任务在失败后,下一次的延迟时间是随着指数增长的。

5.3案例演示
模拟异常的代码
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** Flink 代码实现流处理,进行单词统计。数据源来自于socket数据。* todo 演示Flink遇到异常重启。*/
public class RestartStrategy {public static void main(String[] args) throws Exception {//1.构建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.STREAMING);env.setParallelism(1);//2.数据输入(数据源)//从socket读取数据,socket = hostname + portDataStreamSource<String> source = env.socketTextStream("node1", 9999);//3.数据处理//3.1 使用flatMap进行扁平化处理SingleOutputStreamOperator<String> flatMapStream = source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(" ");for (String word : words) {if (word.equals("evil")) {//evil:恶魔,魔鬼,程序如果碰到魔鬼就退出。throw new Exception("魔鬼来了,程序退出");}out.collect(word);}}});//3.2 使用map进行转换,转换成(单词,1)SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});//3.3使用keyBy进行单词分组KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});//3.4 使用reduce(sum)进行聚合操作,sum:就是根据第一个元素(Integer)进行sum操作SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedStream.sum(1);//4.数据输出result.print();//5.启动流式任务env.execute();}
}
5.4Checkpoint配置

修改flink-conf.yaml文件

execution.checkpointing.interval: 5000
#设置有且仅有一次模式 目前支持 EXACTLY_ONCE、AT_LEAST_ONCE        
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: hashmap
#设置checkpoint的存储方式
state.checkpoint-storage: filesystem
#设置checkpoint的存储位置
state.checkpoints.dir: hdfs://node1:8020/checkpoints
#设置savepoint的存储位置
state.savepoints.dir: hdfs://node1:8020/checkpoints
#设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃
execution.checkpointing.timeout: 600000
#设置两次checkpoint之间的最小时间间隔
execution.checkpointing.min-pause: 500
#设置并发checkpoint的数目
execution.checkpointing.max-concurrent-checkpoints: 1
#开启checkpoints的外部持久化这里设置了清除job时保留checkpoint,默认值时保留一个 假如要保留3个
state.checkpoints.num-retained: 3
#默认情况下,checkpoint不是持久化的,只用于从故障中恢复作业。当程序被取消时,它们会被删除。但是你可以配置checkpoint被周期性持久化到外部,类似于savepoints。这些外部的checkpoints将它们的元数据输出到外#部持久化存储并且当作业失败时不会自动
清除。这样,如果你的工作失败了,你就会有一个checkpoint来恢复。
#ExternalizedCheckpointCleanup模式配置当你取消作业时外部checkpoint会产生什么行为:
#RETAIN_ON_CANCELLATION: 当作业被取消时,保留外部的checkpoint。注意,在此情况下,您必须手动清理checkpoint状态。
#DELETE_ON_CANCELLATION: 当作业被取消时,删除外部化的checkpoint。只有当作业失败时,检查点状态才可用。
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION#------------------------------------# 设置固定延迟策略
restart-strategy: fixed-delay
# 尝试重启次数
restart-strategy.fixed-delay.attempts: 3
# 两次连续重启的间隔时间
restart-strategy.fixed-delay.delay: 3 s
fixed-delay重启策略

提交命令:

#1.启动HDFS
#2.把jar包上传到Linux
#3.配置Flink的Checkpoint和重启策略
#4.提交任务cd $FLINK_HOMEbin/flink run -c test.RestartStrategy /root/original-gz_flinkbase-1.0-SNAPSHOT.jar
#5.在socket中数据单词
nc -lk 9999
hadoop
hive
flink
evil

运行结果:
在这里插入图片描述

6.官方推荐的配置

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/644067.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL库表占用空间排序

在进行数据库备份恢复时&#xff0c;经常会碰到耗时很长的问题。大概率是因为某些库表的占用空间太大。 以下语句按照库表占用空间大小&#xff0c;进行降序排序&#xff1a; SELECT table_schema AS Database,table_name AS Table,ROUND((data_length index_length) / 1024…

腾讯文档中目录的生成及级别设置

腾讯文档是一款可多人同时编辑的在线文档。腾讯文档支持在线Word、Excel、PPT、PDF、收集表、思维导图、流程图多种类型。 今天介绍在线Word的目录生成及级别设置。 文章目录 一、腾讯文档生成目录二、目录的级别设置 一、腾讯文档生成目录 首先需要设置好文章的标题级别 在…

日志分析简单总结

1、分析日志的目的 误报&#xff1a;不是攻击而上报成攻击 漏报&#xff1a;是攻击而没有防御的情况 日志分析可以判断是否误判或者漏判&#xff0c;可以溯源攻击行为 在护网作为防守方必备的技能&#xff08;分析NGAF和态势感知&#xff0c;发现异常&#xff09; 2、攻击出现…

计算机网络物理层思维导图+大纲笔记

大纲笔记&#xff1a; 物理层的基本概念 解决如何在连接各种计算机的传输媒体上传输数据比特流&#xff0c;而不是具体的传输媒体 主要任务 确定与传输媒体接口有关的一些特性 机械特性 电气特性 功能特性 规程特性信道上传送的信号 基带信号 来自信源的信号&#xff0c;直接表…

C#基础|属性Property之读写特性和经典总结

哈喽&#xff0c;你好&#xff0c;我是雷工。 本节学习属性特性——控制读写操作&#xff0c;以下为学习笔记。 01 只读属性 写法1&#xff1a;直接去掉set方法&#xff0c;可以在定义的时候初始化。 示例&#xff1a; public string CourseName{get&#xff1b;}“雷工笔记…

新版ONENET(2024/4/24)通过view3.0可视化保姆级教程(一学就会)附效果图

⏩ 大家好哇&#xff01;我是小光&#xff0c;想要成为系统架构师的嵌入式爱好者。 ⏩上一篇是STM32通过ESP8266连接最新版的ONENET&#xff0c;成功将数据上传之后&#xff0c;本篇文章使用ONENET的view3.0可视化对数据进行可视化做一个详细教程。 ⏩感谢你的阅读&#xff0c;…

Unity 如何制作和发布你的 Package

一、制作你的第一个 Package Unity Package 不做过多赘述&#xff0c;像 URP 本质上也是一个 Package&#xff0c;在 Unity 中可以通过菜单栏 → Window → Package manager 来管理你当前的所有 Package 本篇文章主要介绍&#xff1a;如何制作并发布属于你的 Package 1.1 Pac…

【1429】招生管理管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java 招生管理系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为 TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql5.0&…

齐护K210系列教程(五)_与其它主控通信互动

与其它主控通信互动 文章目录 与其它主控通信互动1&#xff0c;硬件的准备2&#xff0c;UNO的接收发送程序3&#xff0c;AIstart的接收发送程序4&#xff0c;课程资源 联系我们 前面我们了解了AIstart如何通过串口与计算机的交互&#xff0c;那么它是否可以跟其它的主控通过这种…

【C++】手撕list(list的模拟实现)

目录 01.节点 02.迭代器 迭代器运算符重载 03.list类 &#xff08;1&#xff09;构造与析构 &#xff08;2&#xff09;迭代器相关 &#xff08;3&#xff09;容量相关 &#xff08;4&#xff09;访问操作 &#xff08;5&#xff09;插入删除 我们在学习数据结构的时候…

面包屑新玩法,ReactRouter+Ant Design实现动态渲染

在Ant Design中,可以通过Breadcrumb组件结合react-router库实现动态生成面包屑导航。具体步骤如下: 定义路由配置数据结构 我们需要在路由配置中添加额外的面包屑相关信息,例如面包屑标题、icon等。例如: const routes [{path: /,breadcrumbName: 首页},{path: /users,brea…

《S32G3系列芯片——Boot详解》持续更新中...

总目录&#xff1a;《S32G3系列芯片——Boot详解》持续更新中... ... 一、前言二、启动时序概述&#xff08;Boot Sequence&#xff09;三、启动特性&#xff08;Boot Features&#xff09;四、启动模式&#xff08;Boot Mode&#xff09;五、《S32G3系列芯片——Boot详解》系列…