flink sql checkpoint 调优配置

- `execution.checkpointing.interval`: 检查点之间的时间间隔(以毫秒为单位)。在此间隔内,系统将生成新的检查点

SET execution.checkpointing.interval = 6000;

- `execution.checkpointing.tolerable-failed-checkpoints`: 允许的连续失败检查点的最大数量。如果连续失败的检查点数量超过此值,作业将失败。

SET execution.checkpointing.tolerable-failed-checkpoints = 10;

- `execution.checkpointing.timeout`: 检查点的超时时间(以毫秒为单位)。如果在此时间内未完成检查点操作,作业将失败。

SET execution.checkpointing.timeout =600000;

- `execution.checkpointing.externalized-checkpoint-retention`: 外部化检查点的保留策略。`RETAIN_ON_CANCELLATION`表示在作业取消时保留外部化检查点。

SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;

- `execution.checkpointing.mode`: 检查点模式。`EXACTLY_ONCE`表示每个检查点只会在作业处理完全一次时生成。

SET execution.checkpointing.mode = EXACTLY_ONCE;

- `execution.checkpointing.unaligned`: 检查点是否对齐。如果设置为`true`,则检查点将在作业的所有任务完成之前生成。

SET execution.checkpointing.unaligned = true;

- `execution.checkpointing.max-concurrent-checkpoints`: 并发生成检查点的最大数量。在此数量的检查点生成之前,不会生成新的检查点。

SET execution.checkpointing.max-concurrent-checkpoints = 1;

- `state.checkpoints.num-retained`: 保留的检查点数量。超过此数量的检查点将被删除

SET state.checkpoints.num-retained = 3;

暂未使用

SET execution.checkpointing.interval = 6000;

SET execution.checkpointing.tolerable-failed-checkpoints = 10;

SET execution.checkpointing.timeout =600000;

SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;

SET execution.checkpointing.mode = EXACTLY_ONCE;

SET execution.checkpointing.unaligned = true;

SET execution.checkpointing.max-concurrent-checkpoints = 1;

SET state.checkpoints.num-retained = 3;

yaml 文件中 起作用的配置信息如下: 

flinkConfiguration:
    taskmanager.numberOfTaskSlots: "36"
    state.backend: rocksdb
    state.checkpoint-storage: filesystem
    state.checkpoints.num-retained: "3"
    state.backend.incremental: "true"
    state.savepoints.dir: file:///flink-data/savepoints
    state.checkpoints.dir: file:///flink-data/checkpoints
    high-availability.type: kubernetes
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory # JobManager HA
    high-availability.storageDir: file:///opt/flink/flink_recovery # JobManager HA数据保存路径
  serviceAccount: flink

登录到

kubectl -n flink exec -it session-deployment-only-taskmanager-2-1 bash

查看 cat flink-conf.yaml

blob.server.port: 6124
kubernetes.jobmanager.annotations: flinkdeployment.flink.apache.org/generation:2
state.checkpoints.num-retained: 3
kubernetes.jobmanager.replicas: 2
high-availability.type: kubernetes
high-availability.cluster-id: session-deployment-only
state.savepoints.dir: file:///flink-data/savepoints
kubernetes.taskmanager.cpu: 4.0
kubernetes.service-account: flink
kubernetes.cluster-id: session-deployment-only
state.checkpoint-storage: filesystem
high-availability.storageDir: file:///opt/flink/flink_recovery
kubernetes.internal.taskmanager.replicas: 1
kubernetes.container.image: localhost:5000/flink-sql:1.14.21
parallelism.default: 1
kubernetes.namespace: flink
taskmanager.numberOfTaskSlots: 36
kubernetes.rest-service.exposed.type: ClusterIP
high-availability.jobmanager.port: 6123
kubernetes.jobmanager.owner.reference: blockOwnerDeletion:false,apiVersion:flink.apache.org/v1beta1,kind:FlinkDeployment,uid:fadef756-d327-4f19-b1b4-181d92c659eb,name:session-deployment-only,controller:false
taskmanager.memory.process.size: 6048m
kubernetes.internal.jobmanager.entrypoint.class: org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
kubernetes.pod-template-file: /tmp/flink_op_generated_podTemplate_4716243666145995447.yaml
state.backend.incremental: true
web.cancel.enable: false
execution.target: kubernetes-session
jobmanager.memory.process.size: 1024m
taskmanager.rpc.port: 6122
kubernetes.container.image.pull-policy: IfNotPresent
internal.cluster.execution-mode: NORMAL
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
kubernetes.jobmanager.cpu: 1.0
state.backend: rocksdb
$internal.flink.version: v1_14
state.checkpoints.dir: file:///flink-data/checkpoints

 只产生3个chk文件了

一个chk-X代表了一次Checkpoint信息,里面存储Checkpoint的元数据和数据。

   taskowned: TaskManagers拥有的状态

    shared: 共享的状态

默认情况下,如果设置了Checkpoint选项,Flink只保留最近成功生成的1个Checkpoint。当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活。Flink支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置指定最多需要保存Checkpoint的个数,例如指定保留最近的10个Checkpoint(就是保留上面的10个chk-X):

    state.checkpoints.num-retained: 10

    ps1:Checkpoint目录如果删除,任务就无法指定从Checkpoint恢复了

    ps2:如果job是失败了而不是手动cancel,那么无论选择上面哪种策略,state记录都会保留下来

    ps3:使用RocksDB来作为增量checkpoint的存储,可以进行定期Lazy合并清除历史状态。

最后的yaml配置如下:

#Flink Session集群 源码请到 
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: flink
  name: session-deployment-only
spec:
  image: 192.168.1.249:16443/bigdata/flink-sql:1.14.21
  #image: localhost:5000/flink-sql:1.14.21
  flinkVersion: v1_14
  #imagePullPolicy: Never   # 镜像拉取策略,本地没有则从仓库拉取
  imagePullPolicy: IfNotPresent
  ingress:   # ingress配置,用于访问flink web页面
    template: "flink.k8s.io"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "48"
    state.backend: rocksdb
    state.checkpoint-storage: filesystem
    state.checkpoints.num-retained: "20"
    state.backend.incremental: "true"
    state.savepoints.dir: file:///opt/flink/volume/flink-sp
    state.checkpoints.dir: file:///opt/flink/volume/flink-cp
    
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory # JobManager HA
    high-availability.storageDir: file:///opt/flink/volume/flink-ha # JobManager HA数据保存路径
  serviceAccount: flink
  jobManager:
    replicas: 2
    resource:
      memory: "1024m"
      cpu: 1
  taskManager:
    replicas: 1
    resource:
      memory: "6048m"
      cpu: 4
  podTemplate:
    spec:
      hostAliases:
        - ip: "192.168.1.236"
          hostnames:
            - "sql.server"
        - ip: "192.168.1.246"
          hostnames:
            - "doris.server"
      containers:
        - name: flink-main-container
          env:
            - name: TZ
              value: Asia/Shanghai
          volumeMounts:
            - name: flink-volume #挂载checkpoint pvc
              mountPath: /opt/flink/volume
      volumes:
        - name: flink-volume
          persistentVolumeClaim:
            claimName: flink-checkpoint-pvc

pvc:

#Flinnk checkpoint 持久化存储pvc
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-checkpoint-pvc  # checkpoint pvc名称
  namespace: flink   # 指定归属的名命空间
spec:
  storageClassName: nfs-client   #sc名称,更改为实际的sc名称
  accessModes:
    - ReadWriteMany   #采用ReadWriteMany的访问模式
  resources:
    requests:
      storage: 20Gi    #存储容量,根据实际需要更改

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/81881.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安全防护产品对接流程讲解

服务器被攻击了,怎么对接高防产品呢,需要提供什么? 1、配置转发规则:提供域名、IP、端口,由专业技术人员为您配置转发协议/转发端口/源站IP等转发规则,平台会分配该线路独享高防IP。 2、修改DNS解析&…

STM32F4X 定时器中断

STM32F4X 定时器中断 什么是定时器STM32F4X 定时器分类有关定时器的概念预分频(PSC)自动重装载值(ARR) STM32F4X定时器例程定时器相关函数定时器例程 什么是定时器 定时器(Timer)最基本的功能就是定时,比如定时翻转LED灯,定时向串口发送数据等。除此之外…

数据库mysql导入导出数据需要注意的问题,作者只有600~800个表的处理经验

文章目录 一、新建数据库的时候编码和排序规则尽量和原数据库完全一样二、表结构和表数据分开导入,不要同时导入Navicat导出表结构Navicat导入表结构Navicat导出表数据Navicat导入表数据 三、编码问题四、执行顺序的问题五、迁移数据大小和速度的问题六、数据库分区…

板卡设计+硬件每日学习十个知识点(44)23.8.24 (检测单元设计,接口部分设计,板卡电源输入设计,电源检测电路)

文章目录 1.检测单元介绍(使用GD32单片机)2.GD32的最小系统板3.GD32的温度监测4.GD32的电压监测和电流监测5.GD32的布线6.接口部分设计7.板卡电源输入设计8.电源检测电路 1.检测单元介绍(使用GD32单片机) 答: 首先要为…

【Unity3D赛车游戏】【四】在Unity中添加阿克曼转向,下压力,质心会让汽车更稳定

👨‍💻个人主页:元宇宙-秩沅 👨‍💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨‍💻 本文由 秩沅 原创 👨‍💻 收录于专栏:Uni…

java八股文面试[JVM]——类加载器

一、类加载器的概念 类加载器是Java虚拟机用于加载类文件的一种机制。在Java中,每个类都由类加载器加载,并在运行时被创建为一个Class对象。类加载器负责从文件系统、网络或其他来源中加载类的字节码,并将其转换为可执行的Java对象。类加载器…

vue 简单实验 v-bind 变量与html属性绑定

1.代码 <script src"https://unpkg.com/vuenext" rel"external nofollow" ></script> <div id"bind-attribute"><span v-bind:title"message">鼠标悬停几秒钟查看此处动态绑定的提示信息&#xff01;</sp…

《深度学习计算机视觉 》书籍分享(包邮送书三本)

深度学习计算机视觉介绍 随着计算机技术的发展和进步&#xff0c;计算机视觉领域得到了广泛的关注和研究。而深度学习作为一种强大的机器学习方法&#xff0c;已经成为计算机视觉领域的重要工具之一。本文将介绍深度学习在计算机视觉中的应用和取得的成果。 深度学习是一种模…

Log4Qt日志框架(1)- 引入到QT中

Log4Qt日志框架&#xff08;1&#xff09;- 引入到QT中 1 下载源码2 简介3 加入到自己的项目中3.1 使用库文件3.2 引入源文件 4 说明 1 下载源码 github&#xff1a;https://github.com/MEONMedical/Log4Qt 官方(版本较老)&#xff1a;https://sourceforge.net/projects/log4q…

【PHP】文件包含-includerequire

文章目录 文件包含意义&#xff1a;四种形式文件加载原理include和require的区别文件的加载路径文件嵌套包含 文件包含 文件包含&#xff1a;在一个PHP脚本中&#xff0c;去将另外一个文件&#xff08;PHP&#xff09;包含进来&#xff0c;去合作完成一件事情。 意义&#xf…

移动web开发rem适配布局

移动web开发rem适配布局 学习目标: 能够使用rem单位能够使用媒体查询的基本语法能够使用Less的基本语法能够使用Less中的嵌套能够使用Less中的运算能够使用2种rem适配方案 1.rem单位基础 2.媒体查询 2.1什么是媒体查询 媒体查询是css3的新语法 使用media查间&#xff0c…

动物体外受精手术VR模拟仿真培训系统保证学生及标本的安全

奶牛是养殖业主要的资源&#xff0c;因此保证奶牛的健康对养殖业的成功和可持续发展具有重要已用&#xff0c;奶牛有一些常见易发病&#xff0c;一旦处理不当&#xff0c;对奶牛业都会造成较大的经济损失&#xff0c;传统的奶牛手术培训实操难度大、风险高且花费大&#xff0c;…