Flink checkpoint 源码分析- Checkpoint snapshot 处理流程

背景

在上一篇博客中我们分析了代码中barrier的是如何流动改的。Flink checkpoint 源码分析- Checkpoint barrier 传递源码分析-CSDN博客

最后跟踪到了代码org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 

现在我们接着跟踪相应代码,观察是如何算子接受到了barrier是如何进行下一步代码处理的。以及了解flink应对不同的消费语义(At least once, exactly once)对于checkpoint的影响是怎样的。

代码分析

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate#handleEvent 中我们主要关注对于checkpointBarrier的处理流程。

processBarrier方法实现上就可以看出,flink barrier的处理分成两种。

在这里我们需要跟踪一下barrierHandler 是如何生成的才能知道后面所要走的流程是哪一步。

通过往上追溯barrierHandler的生成,我们跟踪到方法:org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createCheckpointBarrierHandler 从代码中我们可以看到 如果是 EXACTLY_ONCE 那么生成的就SingleCheckpointBarrierHandler, 如果checkpoint 模式是AT_LEAST_ONCE, 生成对应的handler就是CheckpointBarrierTracker。 但是从代码中,EXACTLY_ONCE似乎不是简单的new 一个SingleCheckpointBarrierHandler, 而是通过一个方法来生成。因此需要进一步的观察这个方法是如何实现的。

org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil#createBarrierHandler

这里针对checkpoint类型做了区分,主要是分为aligned checkpoint 和 unaliged checkpoint的差异。这里可以进一步观察一下这两类checkpoint之前的差异。

对比这两个方法参数的差异,发现主要就是两处处参数有差异。subTaskCheckpointCoordinator、barrierHandlerState。这两个的差异主要体现在flink 在aligned checkpoint超时时,会切换为unaligned checkpoint。这里可以先按下不表,回到最开始的处理历程。

总结一下就是如果是flink 设置了at least once是使用的是CheckpointBarrierTracker, 当flink模式为exactly once时是SingleCheckpointBarrierHandler。 当为exactly once时checkpoint 类型又可以分为是aligned checkpoint还是unaligned checkpoint。

At least once 下 barrier是如何处理的

at least once 下对于barrier的处理是在以下的方法中实现的。

org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker#processBarrier

public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws IOException {final long barrierId = receivedBarrier.getId();// fast path for single channel trackersif (totalNumberOfInputChannels == 1) {markAlignmentStartAndEnd(receivedBarrier.getTimestamp());notifyCheckpoint(receivedBarrier);return;}// general path for multiple input channelsif (LOG.isDebugEnabled()) {LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelInfo);}// find the checkpoint barrier in the queue of pending barriersCheckpointBarrierCount barrierCount = null;int pos = 0;for (CheckpointBarrierCount next : pendingCheckpoints) {if (next.checkpointId == barrierId) {barrierCount = next;break;}pos++;}if (barrierCount != null) {// add one to the count to that barrier and check for completionint numBarriersNew = barrierCount.incrementBarrierCount();if (numBarriersNew == totalNumberOfInputChannels) {// checkpoint can be triggered (or is aborted and all barriers have been seen)// first, remove this checkpoint and all all prior pending// checkpoints (which are now subsumed)for (int i = 0; i <= pos; i++) {pendingCheckpoints.pollFirst();}// notify the listenerif (!barrierCount.isAborted()) {if (LOG.isDebugEnabled()) {LOG.debug("Received all barriers for checkpoint {}", barrierId);}markAlignmentEnd();notifyCheckpoint(receivedBarrier);}}}else {// first barrier for that checkpoint ID// add it only if it is newer than the latest checkpoint.// if it is not newer than the latest checkpoint ID, then there cannot be a// successful checkpoint for that ID anywaysif (barrierId > latestPendingCheckpointID) {markAlignmentStart(receivedBarrier.getTimestamp());latestPendingCheckpointID = barrierId;pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));// make sure we do not track too many checkpointsif (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {pendingCheckpoints.pollFirst();}}}}

如果只有一个inputchannel的情况下,在收到这一个barrier的时候,就可以做snapshot.

在这个中间会经过triggerCheckpointOnBarrier 等方法, 最后实际还是调到了org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl#checkpointState ,看到这里其实这很长的链路实际是一个循环,下一个算子会生成barrier,接着传递这个barrier。

实际情况是作业并行度不唯一,一个subtask往往是有多个inputchannel. 可以继续看看是如何处理的。

这里面当收取到第一个barrier,会将这个barrier信息存在个队列中。

当收取到时非第一个barrier的时候会进行计数,当收取到的是最后一个barrier的时候就会将barrier队列中在这个barrier之前的barrier全部清除,之后就可以通知做checkpoint snapshot, 这个流程就和之前的一个信道的checkpoint流程是一致的。

总结而言:at least 类型的checkpoint是在收到最后一个barrier的时候开始做snapshot的。

Exactly once checkpoint是如何处理的

首先看这一段的代码

@Overridepublic void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {long barrierId = barrier.getId();LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);if (currentCheckpointId > barrierId|| (currentCheckpointId == barrierId && !isCheckpointPending())) {if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()) {inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo);}return;}checkNewCheckpoint(barrier);checkState(currentCheckpointId == barrierId);if (numBarriersReceived++ == 0) {if (getNumOpenChannels() == 1) {markAlignmentStartAndEnd(barrier.getTimestamp());} else {markAlignmentStart(barrier.getTimestamp());}}// we must mark alignment end before calling currentState.barrierReceived which might// trigger a checkpoint with unfinished future for alignment durationif (numBarriersReceived == numOpenChannels) {if (getNumOpenChannels() > 1) {markAlignmentEnd();}}try {currentState = currentState.barrierReceived(context, channelInfo, barrier);} catch (CheckpointException e) {abortInternal(barrier.getId(), e);} catch (Exception e) {ExceptionUtils.rethrowIOException(e);}if (numBarriersReceived == numOpenChannels) {numBarriersReceived = 0;lastCancelledOrCompletedCheckpointId = currentCheckpointId;LOG.debug("{}: Received all barriers for checkpoint {}.", taskName, currentCheckpointId);resetAlignmentTimer();allBarriersReceivedFuture.complete(null);}}

这里需要关注一下currentState, 在最开始我们看了他的构造函数AlternatingWaitingForFirstBarrier, 因此可以可以看这个方法具体是现实。

这里可以看到这里会block 住收到barrier的信道,如果barrier 都收齐了,之后会检查是不是unaligned的checkpoint, 如果不是可以直接做一次checkpoint。这个checkpoint和之前的流程是一致的。

这里的下一个分支是超时转化,比如设置为30s,前30s是做aligned checkpoint, 如果30s还没有完成,就会转化为unaligned checkpoint。 当然,你如果不想有超时时间,可以直接设置为0.

如果是unaligned checkpoint, 会将channel 里面的数据也写会到远端。

这个中间会有一些状态转化,每次barrier的到达都会触发不同的状态变化。其中我们看到对于uc来说,uc的第一个barrier到达了,就会触发一次global checkpoint。org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned#barrierReceived

org.apache.flink.streaming.runtime.io.checkpointing.AlternatingCollectingBarriersUnaligned#barrierReceived

最后如果收到所有的barrier之后会finished checkpoint。状态恢复到原位。

总结一下:在exactly once的语义下,aligned checkpoint的做法是,收到一个barrier的时候会将对应的channel block住。当收到最后一个barrier的时候再做一次checkpoint。

unaligned的做法是,收到barrier的时候,第一步就会触发一次checkpoint, 之后会不断上传channel state, 当收到最后一个barrier则表示checkpoint结束。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/679784.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Babylon.js 7.0开发入门教程

Babylon.js 是一个功能强大的开源 3D 引擎&#xff0c;能够使用 JavaScript 渲染交互式 3D 和 2D 图形。它是为 Web 甚至 VR 创建游戏、演示、可视化和其他 3D 应用程序的绝佳选择。Babylon.js最新版本是7.0。 Babylon.js 是免费、开源和跨平台的&#xff0c;是 Unity 和 Unre…

BFS专题——FloodFill算法:200.岛屿数量

文章目录 题目描述算法原理代码实现CJava 题目描述 题目链接&#xff1a;200.岛屿数量 PS:注意题目中每座岛屿只能由水平方向和/或竖直方向上相邻的陆地连接形成。也就是说斜角是不算了&#xff0c; 例如示例二&#xff0c;是三个岛屿。 算法原理 这道题目是 DFS&#xff0…

后教培时代的新东方,正在找寻更大的教育驱动力?

近段时间&#xff0c;K12教育主要上市公司的阶段性业绩皆已出炉。从具体数据来看&#xff0c;随着时间推移&#xff0c;教培机构的转型之路已愈走愈顺。 财报显示&#xff0c;2023年12月1日-2024年2月29日&#xff0c;好未来实现营收4.3亿美元&#xff0c;同比增长59.7%&#…

网络技术-链路层可靠传输协议

可靠传输 在链路层传输中&#xff0c;可能出现的错误包括数据位出错、分组丢失、分组失序、分组重复等。可靠传输服务希望实现发送端发送什么&#xff0c;接收端就接收到什么。虽然下面将在链路层这一章节中介绍SW、GBN、SR三种协议&#xff0c;但要明确的是&#xff0c;可靠传…

PostgreSQL的学习心得和知识总结(一百四十一)|深入理解PostgreSQL数据库数据库角色的使用及预定义角色的原理

目录结构 注&#xff1a;提前言明 本文借鉴了以下博主、书籍或网站的内容&#xff0c;其列表如下&#xff1a; 1、参考书籍&#xff1a;《PostgreSQL数据库内核分析》 2、参考书籍&#xff1a;《数据库事务处理的艺术&#xff1a;事务管理与并发控制》 3、PostgreSQL数据库仓库…

游戏理解入门:Rust+Bracket开发一个小游戏

1. Game loop 使用game loop可以使得游戏运行更加流畅和顺滑&#xff0c;它可以&#xff1a; 初始化窗口、图形和其他资源&#xff1b;每当屏幕刷新他都会运行(通常是每秒30,60 )&#xff1b;每次通过循环&#xff0c;他都会调用游戏的tick()函数。 大致的原理流程如下&…

探索DeepSeek平台:新一代MoE模型的深度体验

简介 DeepSeek是一个创新的人工智能平台&#xff0c;它最近推出了其最新版本的模型——DeepSeek-V2 MoE&#xff08;Mixture of Experts&#xff09;。这个平台不仅提供了一个交互式的聊天界面&#xff0c;还提供了API接口&#xff0c;让用户可以更深入地体验和利用这一先进的…

《QT实用小工具·六十一》带动画的三角形指示箭头

1、概述 源码放在文章末尾 该项目实现了一个带动画效果的三角形指示箭头&#xff0c;项目demo演示如下所示&#xff1a; 用法 interestingindicate.h interestingindicate.cpp 放到工程中&#xff0c;直接使用即可。 注意&#xff1a;建议绝对布局&#xff0c;手动指定 wid…

网络层协议之 IP 协议

IP 协议格式 4 位版本&#xff1a;此处的取值只有两个&#xff0c;4&#xff08;IPv4&#xff09;和 6&#xff08;IPv6&#xff09;&#xff0c;即指定 IP 协议的版本。 4 位首部长度&#xff1a;描述了 IP 报头多长&#xff0c;IP 报头是变长的&#xff0c;因为报头中的选项部…

三层交换机与防火墙连通上网实验

防火墙是一种网络安全设备&#xff0c;用于监控和控制网络流量。它可以帮助防止未经授权的访问&#xff0c;保护网络免受攻击和恶意软件感染。防火墙可以根据预定义的规则过滤流量&#xff0c;例如允许或阻止特定IP地址或端口的流量。它也可以检测和阻止恶意软件、病毒和其他威…

支付时,中国网联结算与中国银联结算的区别与联系

随着电子商务和互联网支付的快速发展&#xff0c;中国的支付清算市场也呈现出前所未有的繁荣景象。在这个大背景下&#xff0c;中国网联与中国银联作为两大支付清算机构&#xff0c;各自扮演着重要的角色。本文将对两者的区别和联系进行深入探讨&#xff0c;以期对读者有更全面…

渐进淡出背景个人导航页源码(火影版)

渐进淡出背景个人导航页源码&#xff08;火影版&#xff09; 效果图部分源码领取源码下期更新预报 效果图 部分源码 <!DOCTYPE html> <html> <head> <!--小K网 www.xkwo.com --><meta charset"UTF-8"><title>火影版个人主页<…