【flink状态管理(2)各状态初始化入口】状态初始化流程详解与源码剖析

文章目录

    • 1. 状态初始化总流程梳理
    • 2.创建StreamOperatorStateContext
    • 3. StateInitializationContext的接口设计。
    • 4. 状态初始化举例:UDF状态初始化

在TaskManager中启动Task线程后,会调用StreamTask.invoke()方法触发当前Task中算子的执行,在invoke()方法中会调用restoreInternal()方法,这中间包括创建和初始化算子中的状态数据。
另外在invoke中,可以通过判断任务状态来判断是否需要初始化状态。

        // Allow invoking method 'invoke' without having to call 'restore' before it.if (!isRunning) {LOG.debug("Restoring during invoke will be called.");restoreInternal();}

StreamTask调用initializeStateAndOpenOperators()方法对当前Task中所有算子的状态数据进行初始化。

RegularOperatorChain.
public void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {  Iterator var2 = this.getAllOperators(true).iterator();  while(var2.hasNext()) {  StreamOperatorWrapper<?, ?> operatorWrapper = (StreamOperatorWrapper)var2.next();  StreamOperator<?> operator = operatorWrapper.getStreamOperator();  operator.initializeState(streamTaskStateInitializer);  operator.open();  }  }

 
找到了算子状态初始化的位置,我们继续了解状态是如何初始化的。

1. 状态初始化总流程梳理

AbstractStreamOperator.initializeState中描述了状态初始化的总体流程,如下代码以及注释:

# AbstractStreamOperator.initializeStatepublic final void initializeState(StreamTaskStateInitializer streamTaskStateManager)  throws Exception {  //1. 获取类型序列化器final TypeSerializer<?> keySerializer =  config.getStateKeySerializer(getUserCodeClassloader());  //2. get containingTaskfinal StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());  final CloseableRegistry streamTaskCloseableRegistry =  Preconditions.checkNotNull(containingTask.getCancelables());  //3. create StreamOperatorStateContextfinal StreamOperatorStateContext context =  streamTaskStateManager.streamOperatorStateContext(  getOperatorID(),  getClass().getSimpleName(),  getProcessingTimeService(),  this,  keySerializer,  streamTaskCloseableRegistry,  metrics,  config.getManagedMemoryFractionOperatorUseCaseOfSlot(  ManagedMemoryUseCase.STATE_BACKEND,  runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),  runtimeContext.getUserCodeClassLoader()),  isUsingCustomRawKeyedState());  //4. create stateHandlerstateHandler =  new StreamOperatorStateHandler(  context, getExecutionConfig(), streamTaskCloseableRegistry);  timeServiceManager = context.internalTimerServiceManager();  //5. initialize OperatorStatestateHandler.initializeOperatorState(this);  //6. set KeyedStateStore in runtimeContextruntimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));  
}

在StreamOperator初始化状态数据的过程中,首先从StreamTask中获取创建状态需要的组件,例如托管状态的管理后端KeyedStateBackend、OperatorStateBackend以及原生状态管理的KeyedStateInputs和OperatorStateInputs组件。

状态数据操作过程中使用的管理组件最终都会封装成StateInitializationContext并传递给子类使用,例如在AbstractUdfStreamOperator中,就会使用StateInitializationContext中的信息初始化用户定义的UDF中的状态数据。

2.创建StreamOperatorStateContext

接下来看如何在Task实例初始化时创建这些组件,并将其存储在StreamOperatorStateContext中供算子使用,如下代码:

StreamTaskStateInitializerImpl
@Override  
public StreamOperatorStateContext streamOperatorStateContext(  @Nonnull OperatorID operatorID,  @Nonnull String operatorClassName,  @Nonnull ProcessingTimeService processingTimeService,  @Nonnull KeyContext keyContext,  @Nullable TypeSerializer<?> keySerializer,  @Nonnull CloseableRegistry streamTaskCloseableRegistry,  @Nonnull MetricGroup metricGroup,  double managedMemoryFraction,  boolean isUsingCustomRawKeyedState)  throws Exception {  //1. 获取task实例信息TaskInfo taskInfo = environment.getTaskInfo();  OperatorSubtaskDescriptionText operatorSubtaskDescription =  new OperatorSubtaskDescriptionText(  operatorID,  operatorClassName,  taskInfo.getIndexOfThisSubtask(),  taskInfo.getNumberOfParallelSubtasks());  final String operatorIdentifierText = operatorSubtaskDescription.toString();  final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =  taskStateManager.prioritizedOperatorState(operatorID);  CheckpointableKeyedStateBackend<?> keyedStatedBackend = null;  OperatorStateBackend operatorStateBackend = null;  CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;  CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;  InternalTimeServiceManager<?> timeServiceManager;  try {  // 创建keyed类型的状态后端// -------------- Keyed State Backend --------------  keyedStatedBackend =  keyedStatedBackend(  keySerializer,  operatorIdentifierText,  prioritizedOperatorSubtaskStates,  streamTaskCloseableRegistry,  metricGroup,  managedMemoryFraction);  //创建operator类型的状态后端// -------------- Operator State Backend --------------  operatorStateBackend =  operatorStateBackend(  operatorIdentifierText,  prioritizedOperatorSubtaskStates,  streamTaskCloseableRegistry);  //创建原生类型状态后端// -------------- Raw State Streams --------------  rawKeyedStateInputs =  rawKeyedStateInputs(  prioritizedOperatorSubtaskStates  .getPrioritizedRawKeyedState()  .iterator());  streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);  rawOperatorStateInputs =  rawOperatorStateInputs(  prioritizedOperatorSubtaskStates  .getPrioritizedRawOperatorState()  .iterator());  streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);  //创建Internal Timer Service Manager// -------------- Internal Timer Service Manager --------------  if (keyedStatedBackend != null) {  // if the operator indicates that it is using custom raw keyed state,  // then whatever was written in the raw keyed state snapshot was NOT written            // by the internal timer services (because there is only ever one user of raw keyed            // state);            // in this case, timers should not attempt to restore timers from the raw keyed            // state.            final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =  (prioritizedOperatorSubtaskStates.isRestored()  && !isUsingCustomRawKeyedState)  ? rawKeyedStateInputs  : Collections.emptyList();  timeServiceManager =  timeServiceManagerProvider.create(  keyedStatedBackend,  environment.getUserCodeClassLoader().asClassLoader(),  keyContext,  processingTimeService,  restoredRawKeyedStateTimers);  } else {  timeServiceManager = null;  }  // -------------- Preparing return value --------------  return new StreamOperatorStateContextImpl(  prioritizedOperatorSubtaskStates.getRestoredCheckpointId(),  operatorStateBackend,  keyedStatedBackend,  timeServiceManager,  rawOperatorStateInputs,  rawKeyedStateInputs);  } catch (Exception ex) {  。。。。
}

流程梳理:

  1. 从environment中获取TaskInfo,并基于Task实例创建OperatorSubtaskDescriptionText。Operator中Task实例的描述信息包含OperatorID、OperatorClassName等,最终用于创建OperatorStateBackend的状态存储后端。
  2. 创建KeyedStateBackend,KeyedStateBackend是KeyedState的状态管理后端,提供创建和管理KeyedState的方法。
  3. 创建OperatorStateBackend,OperatorStateBackend是OperatorState的状态管理后端,提供获取和管理OperatorState的接口。
  4. 创建KeyGroupStatePartitionStreamProvider实例,提供创建和获取原生KeyedState的方法。
  5. 创建StatePartitionStreamProvider实例,提供创建和获取原生OperatorState的方法。
  6. 将所有创建出来的托管状态管理后端keyedStatedBackend和operatorStateBackend、原生状态存储后端rawKeyedStateInputs和rawOperatorStateInputs及timeServiceManager实例,全部封装在StreamOperatorStateContextImpl上下文对象中,并返回给AbstractStreamOperator使用。

 
小结
StreamTaskStateInitializer.streamOperatorStateContext()方法包含创建托管状态和原生状态管理后端的全过程。StreamOperator的实现类能够从StreamOperatorStateContext中获取这些状态管理组件并使用它们创建指定类型的状态,最终状态数据会存储在状态管理后端指定的物理介质上,例如堆内存或RocksDB。

StateInitializationContext会被用于算子和UserDefinedFunction中,实现算子或函数中的状态数据操作。

 

3. StateInitializationContext的接口设计。

StateInitializationContext接口同时继承了ManagedInitializationContext接口和FunctionInitializationContext接口。StateInitializationContext接口的默认实现类为StateInitializationContextImpl。
在这里插入图片描述

  1. ManagedInitializationContext接口提供了托管状态使用的KeyedStateStore和OperatorStateStore获取方法,即KeyedStateBackend和OperatorStateBackend的封装类。算子进行初始化时,会通过KeyedStateStore和OperatorStateStore提供的方法创建和管理指定类型的托管状态。

  2. FunctionInitializationContext提供了用户自定义函数状态数据初始化需要的方法。它和ManagedInitializationContext保持一致,这主要是为了和算子使用的上下文进行区分,但两者的操作基本一致。

  3. StateInitializationContext提供了对托管状态数据的管理,并在内部继承和拓展了获取及管理原生状态数据的方法,如getRawOperatorStateInputs()、getRawKeyedStateInputs()等

  4. StateInitializationContextImpl具备操作管理状态和原生状态的能力。基于它可以获取不同类型的状态管理后端,并基于状态管理操作状态数据。

在这里插入图片描述

 

4. 状态初始化举例:UDF状态初始化

在AbstractStreamOperator中调用initializeState(StateInitializationContext context)抽象方法初始化Operator中的状态。这里以AbstractUdfStreamOperator为例说明具体算子、UDF是如何进行状态初始化的。

AbstractUdfStreamOperator.initializeState()方法实际上调用了StreamingFunctionUtils.restoreFunctionState()方法对User-Defined Function中的状态数据进行初始化和恢复,实际上就是将上文创建的StateInitializationContext上下文信息提供给Function接口使用。

public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

恢复函数内部的状态数据涉及Checkpoint的实现,我们会在之后介绍如何在StreamingFunctionUtils.restoreFunctionState()方法中恢复函数中的状态数据。

 
《Flink设计与实现:核心原理与源码解析》张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/458541.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS的动画

CSS的动画 在本节&#xff0c;我们将学习keyframes动画。 1. 动画的基本使用 1. 定义动画 定义动画有两种写法&#xff1a; 简单定义方式 keyframes 动画名 {/* from代表初始状态 */from {/*property1:value1*/transform: translate(0%);}/* to代表结束状态 */to {transfor…

c#cad 创建-正方形(四)

运行环境 vs2022 c# cad2016 调试成功 一、程序说明 创建一个正方形&#xff0c;并将其添加到当前活动文档的模型空间中。 程序首先获取当前活动文档和数据库&#xff0c;并创建一个编辑器对象。 然后&#xff0c;使用事务开始创建正方形的操作。获取模型空间的块表记录&a…

【机器学习】单变量线性回归

文章目录 线性回归模型&#xff08;linear regression model&#xff09;损失/代价函数&#xff08;cost function&#xff09;——均方误差&#xff08;mean squared error&#xff09;梯度下降算法&#xff08;gradient descent algorithm&#xff09;参数&#xff08;parame…

服装设计公司,如何用钉钉实现企业数字化成功转型?

钉钉作为数字化工作平台&#xff0c;为某服装设计公司实现了组织管理的数字化转型&#xff0c;构建了一站式的工作平台。通过钉钉赋能&#xff0c;有利于企业推进组织架构、员工沟通、产品运营和客户服务等方面的数字化、智能化转型。 借助钉钉平台&#xff0c;该服设公司轻松实…

常见的 MIME(媒体)类型速查

一、简介 MIME(Multipurpose Internet Mail Extensions)多用途互联网邮件扩展类型&#xff0c;是设定某种扩展名的文件用一种应用程序来打开的方式类型&#xff0c;当该扩展名文件被访问的时候&#xff0c;浏览器会自动使用指定应用程序来打开。多用于指定一些客户端自定义的文…

使用Qt创建项目 Qt中输出内容到控制台 设置窗口大小和窗口标题 Qt查看说明文档

按windows键&#xff0c;找到Qt Creator &#xff0c;打开 一.创建带模板的项目 新建项目 设置项目路径QMainWindow是带工具栏的窗口。 QWidget是无工具栏的窗口。 QDuakig是对话框窗口。创建好的项目如下&#xff1a; #include "widget.h"// 构造函数&#xff…

VSCode开发常用扩展记录

1、Chinese 2、document this 可以自动为ts和js文件生成jsDoc注释 3、ESLint 能够查找并修复js代码中的问题 4、koroFileHeader 5、Prettier 代码格式化

numa网卡绑定

#概念 参考&#xff1a;https://www.jianshu.com/p/0f3b39a125eb(opens new window) chip&#xff1a;芯片&#xff0c;一个cpu芯片上可以包含多个cpu core&#xff0c;比如四核&#xff0c;表示一个chip里4个core。 socket&#xff1a;芯片插槽&#xff0c;颗&#xff0c;跟…

高速接口PCB布局指南(五)高速差分信号布线(三)

高速接口PCB布局指南&#xff08;五&#xff09;高速差分信号布线&#xff08;三&#xff09; 1.表面贴装器件焊盘不连续性缓解2.信号线弯曲3.高速信号建议的 PCB 叠层设计4.ESD/EMI 注意事项5.ESD/EMI 布局规则 tips&#xff1a;资料主要来自网络&#xff0c;仅供学习使用。 …

《Git 简易速速上手小册》第8章:保护你的代码(2024 最新版)

文章目录 8.1 使用 .gitignore 优化你的仓库8.1.1 基础知识讲解8.1.2 重点案例&#xff1a;为 Python 项目配置 .gitignore8.1.3 拓展案例 1&#xff1a;使用全局 .gitignore8.1.4 拓展案例 2&#xff1a;忽略已经被跟踪的文件 8.2 管理敏感数据8.2.1 基础知识讲解8.2.2 重点案…

顺序图(Sequence Diagram)

也叫时序图、序列图 一、定义 顺序图是用来描述对象自身及对象间信息传递顺序的视图。 二、要素 活动者,对象,生命线,控制焦点,消息(同步消息,异步消息,返回消息,自关联消息) 1、 活动者 活动者发出情况或者接收系统的服务。 2、 对象 对象是特定行为与属性的集合。 表…

win10系统连接WiFi,输入正确密码,但还是提示错误

情况 电信宽带 mac和小米手机都可以连上wifi dell上的windows输入正确的密码还是提示错误 解决办法 根据路由器上的终端配置进入网页进行配置&#xff0c;我的是192.168.1.1&#xff0c;账户:useradmin 修改无线网络设置中的加密方式&#xff0c;由Mixed WPA2/WPA-PSK改为W…