【Flink状态管理(二)各状态初始化入口】状态初始化流程详解与源码剖析

文章目录

    • 1. 状态初始化总流程梳理
    • 2.创建StreamOperatorStateContext
    • 3. StateInitializationContext的接口设计。
    • 4. 状态初始化举例:UDF状态初始化

在TaskManager中启动Task线程后,会调用StreamTask.invoke()方法触发当前Task中算子的执行,在invoke()方法中会调用restoreInternal()方法,这中间包括创建和初始化算子中的状态数据。
另外在invoke中,可以通过判断任务状态来判断是否需要初始化状态。

        // Allow invoking method 'invoke' without having to call 'restore' before it.if (!isRunning) {LOG.debug("Restoring during invoke will be called.");restoreInternal();}

StreamTask调用initializeStateAndOpenOperators()方法对当前Task中所有算子的状态数据进行初始化。

RegularOperatorChain.
public void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {  Iterator var2 = this.getAllOperators(true).iterator();  while(var2.hasNext()) {  StreamOperatorWrapper<?, ?> operatorWrapper = (StreamOperatorWrapper)var2.next();  StreamOperator<?> operator = operatorWrapper.getStreamOperator();  operator.initializeState(streamTaskStateInitializer);  operator.open();  }  }

 
找到了算子状态初始化的位置,我们继续了解状态是如何初始化的。

1. 状态初始化总流程梳理

AbstractStreamOperator.initializeState中描述了状态初始化的总体流程,如下代码以及注释:

# AbstractStreamOperator.initializeStatepublic final void initializeState(StreamTaskStateInitializer streamTaskStateManager)  throws Exception {  //1. 获取类型序列化器final TypeSerializer<?> keySerializer =  config.getStateKeySerializer(getUserCodeClassloader());  //2. get containingTaskfinal StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());  final CloseableRegistry streamTaskCloseableRegistry =  Preconditions.checkNotNull(containingTask.getCancelables());  //3. create StreamOperatorStateContextfinal StreamOperatorStateContext context =  streamTaskStateManager.streamOperatorStateContext(  getOperatorID(),  getClass().getSimpleName(),  getProcessingTimeService(),  this,  keySerializer,  streamTaskCloseableRegistry,  metrics,  config.getManagedMemoryFractionOperatorUseCaseOfSlot(  ManagedMemoryUseCase.STATE_BACKEND,  runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),  runtimeContext.getUserCodeClassLoader()),  isUsingCustomRawKeyedState());  //4. create stateHandlerstateHandler =  new StreamOperatorStateHandler(  context, getExecutionConfig(), streamTaskCloseableRegistry);  timeServiceManager = context.internalTimerServiceManager();  //5. initialize OperatorStatestateHandler.initializeOperatorState(this);  //6. set KeyedStateStore in runtimeContextruntimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));  
}

在StreamOperator初始化状态数据的过程中,首先从StreamTask中获取创建状态需要的组件,例如托管状态的管理后端KeyedStateBackend、OperatorStateBackend以及原生状态管理的KeyedStateInputs和OperatorStateInputs组件。

状态数据操作过程中使用的管理组件最终都会封装成StateInitializationContext并传递给子类使用,例如在AbstractUdfStreamOperator中,就会使用StateInitializationContext中的信息初始化用户定义的UDF中的状态数据。

2.创建StreamOperatorStateContext

接下来看如何在Task实例初始化时创建这些组件,并将其存储在StreamOperatorStateContext中供算子使用,如下代码:

StreamTaskStateInitializerImpl
@Override  
public StreamOperatorStateContext streamOperatorStateContext(  @Nonnull OperatorID operatorID,  @Nonnull String operatorClassName,  @Nonnull ProcessingTimeService processingTimeService,  @Nonnull KeyContext keyContext,  @Nullable TypeSerializer<?> keySerializer,  @Nonnull CloseableRegistry streamTaskCloseableRegistry,  @Nonnull MetricGroup metricGroup,  double managedMemoryFraction,  boolean isUsingCustomRawKeyedState)  throws Exception {  //1. 获取task实例信息TaskInfo taskInfo = environment.getTaskInfo();  OperatorSubtaskDescriptionText operatorSubtaskDescription =  new OperatorSubtaskDescriptionText(  operatorID,  operatorClassName,  taskInfo.getIndexOfThisSubtask(),  taskInfo.getNumberOfParallelSubtasks());  final String operatorIdentifierText = operatorSubtaskDescription.toString();  final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =  taskStateManager.prioritizedOperatorState(operatorID);  CheckpointableKeyedStateBackend<?> keyedStatedBackend = null;  OperatorStateBackend operatorStateBackend = null;  CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;  CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;  InternalTimeServiceManager<?> timeServiceManager;  try {  // 创建keyed类型的状态后端// -------------- Keyed State Backend --------------  keyedStatedBackend =  keyedStatedBackend(  keySerializer,  operatorIdentifierText,  prioritizedOperatorSubtaskStates,  streamTaskCloseableRegistry,  metricGroup,  managedMemoryFraction);  //创建operator类型的状态后端// -------------- Operator State Backend --------------  operatorStateBackend =  operatorStateBackend(  operatorIdentifierText,  prioritizedOperatorSubtaskStates,  streamTaskCloseableRegistry);  //创建原生类型状态后端// -------------- Raw State Streams --------------  rawKeyedStateInputs =  rawKeyedStateInputs(  prioritizedOperatorSubtaskStates  .getPrioritizedRawKeyedState()  .iterator());  streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);  rawOperatorStateInputs =  rawOperatorStateInputs(  prioritizedOperatorSubtaskStates  .getPrioritizedRawOperatorState()  .iterator());  streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);  //创建Internal Timer Service Manager// -------------- Internal Timer Service Manager --------------  if (keyedStatedBackend != null) {  // if the operator indicates that it is using custom raw keyed state,  // then whatever was written in the raw keyed state snapshot was NOT written            // by the internal timer services (because there is only ever one user of raw keyed            // state);            // in this case, timers should not attempt to restore timers from the raw keyed            // state.            final Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =  (prioritizedOperatorSubtaskStates.isRestored()  && !isUsingCustomRawKeyedState)  ? rawKeyedStateInputs  : Collections.emptyList();  timeServiceManager =  timeServiceManagerProvider.create(  keyedStatedBackend,  environment.getUserCodeClassLoader().asClassLoader(),  keyContext,  processingTimeService,  restoredRawKeyedStateTimers);  } else {  timeServiceManager = null;  }  // -------------- Preparing return value --------------  return new StreamOperatorStateContextImpl(  prioritizedOperatorSubtaskStates.getRestoredCheckpointId(),  operatorStateBackend,  keyedStatedBackend,  timeServiceManager,  rawOperatorStateInputs,  rawKeyedStateInputs);  } catch (Exception ex) {  。。。。
}

流程梳理:

  1. 从environment中获取TaskInfo,并基于Task实例创建OperatorSubtaskDescriptionText。Operator中Task实例的描述信息包含OperatorID、OperatorClassName等,最终用于创建OperatorStateBackend的状态存储后端。
  2. 创建KeyedStateBackend,KeyedStateBackend是KeyedState的状态管理后端,提供创建和管理KeyedState的方法。
  3. 创建OperatorStateBackend,OperatorStateBackend是OperatorState的状态管理后端,提供获取和管理OperatorState的接口。
  4. 创建KeyGroupStatePartitionStreamProvider实例,提供创建和获取原生KeyedState的方法。
  5. 创建StatePartitionStreamProvider实例,提供创建和获取原生OperatorState的方法。
  6. 将所有创建出来的托管状态管理后端keyedStatedBackend和operatorStateBackend、原生状态存储后端rawKeyedStateInputs和rawOperatorStateInputs及timeServiceManager实例,全部封装在StreamOperatorStateContextImpl上下文对象中,并返回给AbstractStreamOperator使用。

 
小结
StreamTaskStateInitializer.streamOperatorStateContext()方法包含创建托管状态和原生状态管理后端的全过程。StreamOperator的实现类能够从StreamOperatorStateContext中获取这些状态管理组件并使用它们创建指定类型的状态,最终状态数据会存储在状态管理后端指定的物理介质上,例如堆内存或RocksDB。

StateInitializationContext会被用于算子和UserDefinedFunction中,实现算子或函数中的状态数据操作。

 

3. StateInitializationContext的接口设计。

StateInitializationContext接口同时继承了ManagedInitializationContext接口和FunctionInitializationContext接口。StateInitializationContext接口的默认实现类为StateInitializationContextImpl。
在这里插入图片描述

  1. ManagedInitializationContext接口提供了托管状态使用的KeyedStateStore和OperatorStateStore获取方法,即KeyedStateBackend和OperatorStateBackend的封装类。算子进行初始化时,会通过KeyedStateStore和OperatorStateStore提供的方法创建和管理指定类型的托管状态。

  2. FunctionInitializationContext提供了用户自定义函数状态数据初始化需要的方法。它和ManagedInitializationContext保持一致,这主要是为了和算子使用的上下文进行区分,但两者的操作基本一致。

  3. StateInitializationContext提供了对托管状态数据的管理,并在内部继承和拓展了获取及管理原生状态数据的方法,如getRawOperatorStateInputs()、getRawKeyedStateInputs()等

  4. StateInitializationContextImpl具备操作管理状态和原生状态的能力。基于它可以获取不同类型的状态管理后端,并基于状态管理操作状态数据。

在这里插入图片描述

 

4. 状态初始化举例:UDF状态初始化

在AbstractStreamOperator中调用initializeState(StateInitializationContext context)抽象方法初始化Operator中的状态。这里以AbstractUdfStreamOperator为例说明具体算子、UDF是如何进行状态初始化的。

AbstractUdfStreamOperator.initializeState()方法实际上调用了StreamingFunctionUtils.restoreFunctionState()方法对User-Defined Function中的状态数据进行初始化和恢复,实际上就是将上文创建的StateInitializationContext上下文信息提供给Function接口使用。

public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

恢复函数内部的状态数据涉及Checkpoint的实现,我们会在之后介绍如何在StreamingFunctionUtils.restoreFunctionState()方法中恢复函数中的状态数据。

 
《Flink设计与实现:核心原理与源码解析》张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/460609.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SSH口令问题

SSH&#xff08;Secure Shell&#xff09;是目前较可靠、专为远程登录会话和其他网络服务提供 安全性的协议&#xff0c;主要用于给远程登录会话数据进行加密&#xff0c;保证数据传输的安全。 SSH口令长度太短或者复杂度不够&#xff0c;如仅包含数字或仅包含字母等时&#xf…

【Python4Delphi】学习笔记(一):介绍篇

一、前言&#xff1a; 1. python语言简介&#xff1a; 众所周知&#xff0c;python是目前非常流行的编程语言之一&#xff0c;自20世纪90年代初Python语言诞生至今&#xff0c;它已被逐渐广泛应用于系统管理任务的处理和Web编程。 由于Python语言的简洁性、易读性以及可扩展性…

【深度学习:SegGPT】在上下文中分割所有内容 [解释]

【深度学习&#xff1a;SegGPT】在上下文中分割所有内容 [解释] SegGPT与以前的模型相比如何&#xff1f;SegGPT在实践中是如何工作的&#xff1f;SegGPT培训计划上下文着色上下文集成上下文调整SegGPT 训练参数 如何尝试 SegGPT&#xff1f;使用哪些数据集来训练 SegGPT&#…

mac终端怎么恢复初始设置?图文教程不想看看吗?

某网友说“不小心把终端弄成了这样&#xff1f;请问该怎么办呢&#xff1f;mac终端怎么恢复初始设置&#xff1f;” 其实&#xff0c;这个问题不难&#xff0c;在终端中选择【还原初始值】即可。 Mac终端初始化具体怎么操作&#xff1f;话不多说&#xff0c;图文教程分享给大…

【Java】MybatisPlus入门

学习目标 能够基于MyBatisPlus完成标准Dao开发 能够掌握MyBatisPlus的条件查询 能够掌握MyBatisPlus的字段映射与表名映射 能够掌握id生成策略控制 能够理解代码生成器的相关配置 一、MyBatisPlus简介 1. 入门案例 问题导入 MyBatisPlus环境搭建的步骤&#xff1f; 1.1 Sp…

Spring + Tomcat项目中nacos配置中文乱码问题解决

实际工作的时候碰到了nacos中文乱码的问题&#xff0c;一顿排查最终还是调源码解决了。下面为具体的源码流程&#xff0c;有碰到的可以参考下。 对于nacos配置来说&#xff0c;初始主要源码就在NacosConfigService类中。里面有初始化获取配置content以及设置对应监听器的操作。…

猫头虎分析:如何利用ChatGPT及生成式AIGC提高工作效率 ‍

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

npm 上传一个自己的应用(1) 搭建一个项目环境

上文 在npm官网中注册一个账号并登录 带着大家创建了一个npm账号 我们先登录官网 然后 我们在自己电脑中创建一个文件夹 这个文件夹叫什么没有太大所谓 我这里直接叫 grnpmtext 然后 我们在这个文件夹中初始化一个项目 终端输入 npm initpackage name 要我们输入项目的名称 …

计算机网络-无线通信技术与原理

一般我们网络工程师接触比较多的是交换机、路由器&#xff0c;很少涉及到WiFi和无线设置&#xff0c;但是呢在实际工作中一般企业也是有这些需求的&#xff0c;这就需要我们对于无线的一些基本配置也要有独立部署能力&#xff0c;今天来简单了解一下。 一、无线网络基础 1.1 无…

数智电网革新|数智化坚强电网的“四大特征”

现代电网已经成为现代产业体系中不可或缺的基础设施&#xff0c;是支撑经济活动的重要能源基础设施和产业基础设施之一。电力安全保供是国家发展的重中之重。当前&#xff0c;随着能源转型的加速推进&#xff0c;新能源不断高比例地接入电网&#xff0c;同时受到气候变化和极端…

Optimism Collective 为 Covalent Network(CQT)提供价值 20 万美元的生态系统资助

Covalent Network&#xff08;CQT&#xff09; 是 Web3 生态系统中关键的“数据可用性”层&#xff0c;在与 Optimism Collective 多年的合作中取得了骄人的成果。Covalent Network&#xff08;CQT&#xff09;对于 Optimism 跨链数据的增长产生了直接的影响&#xff0c;而这一…

【已解决】onnx转换为rknn置信度大于1,图像出现乱框问题解决

前言 环境介绍&#xff1a; 1.编译环境 Ubuntu 18.04.5 LTS 2.RKNN版本 py3.8-rknn2-1.4.0 3.单板 迅为itop-3568开发板 一、现象 采用yolov5训练并将pt转换为onnx&#xff0c;再将onnx采用py3.8-rknn2-1.4.0推理转换为rknn出现置信度大于1&#xff0c;并且图像乱框问题…