06 flink 的各个角色的交互

前言

这里主要是 涉及到 flink 中各个角色的交互 

TaskManager 和 ResourceManager 的交互

JobMaster 和 ResourceManager 的交互

等等流程 

 

 

TaskManager 和 ResourceManager 的交互

主要是 包含了几个部分, 如下, 几个菜单 

TaskManager向 ResourceManager 注册 

ResourceManager 向 TaskManager 心跳的发送 

ResourceManager 这边收到 TaskManager 的 slotReport 之后的处理

 

TaskManager向 ResourceManager 注册 

TaskManager 中 TaskExecutor 启动之后, 会向 ResourceManager 注册 

6bf5331485b54b8785ac8dbd9757a5cf.png

 

注册如下, 向 ResourceManager 这边发送请求, 携带上基本信息, resourceId 是 TaskManagerRunner 中 ResourceID.generate() 随机生成的一个字符串 

ResourceManager 的地址是根据 JobManager 的信息拼接上固定的 “/user/resourcemanager” 得到的 

ebd8fd18d2934814acf831fca098e806.png

 

JobManager 这边的 ResourceManager 注册该 TaskManager 的相关信息, 并相应 ResourceManager 这边创建的 WorkerRegistration 信息返回 

然后这里注册了一个发送到 TaskManager 的定时心跳, 注册到了 ResourceManager.taskManagerHeartbeatManager 中 

9bfd33fe609e471fa33f37ba15ee367a.png

 

 

ResourceManager 向 TaskManager 心跳的发送 

这里 ResourceManager 向 TaskManager 这边心跳的发送是这里 HeartbeatManagerSenderImpl.run 中处理的, 定时的效果是 延时+递归 来实现的 

8da832892a1b49afbe605060dbb73fa5.png

 

然后接着 TaskManager 这边会响应 TaskManager 的各个 slot 的相关信息给 ResourceManager

4f9792bb93f74db897787cb80bbdd188.png 

 

ResourceManager 这边收到 TaskManager 的 slotReport 之后的处理

然后接着就是 ResourceManager 这边的处理, 更新目标 taskManager 的 slot 的相关信息 

261fe60ad42b4ec28d3d09c07fd7918a.png

 

然后 web 页面上, 这里 TaskManager 的相关信息 就是来自于 ResourceManager

ac7bcc81af7c45308ace5c7ea8227265.png 

 

JobMaster 和 ResourceManager 的交互

主要是 包含了几个部分, 如下, 几个菜单 

JobMaster 向 ResourceManager 注册 

 

JobMaster 向 ResourceManager 注册 

同样是 JobMaster 启动的饿时候, 会自动向 ResourceManager 注册 

b9238beb76d54329b98e918a32297392.png

 

注册的信息如下, jobId, jobResourceId, 以及 jobManager 的交互信息 

然后这里的 jobResourceId 同样是 JobMaster 初始化的时候 ResourceID.generate() 生成的一个随机字符串

62dad2f5b2cb4bfcb6d8b0d9b7e2c5e5.png

 

ResourceManager 向 JobMaster 这边心跳的发送

ResourceManager 收到 JobMaster 的注册请求之后, 会向 jobManagerHeartbeatManager 注册向 JobMaster 的心跳任务 

abbc109307d9453d9f4e3d57eae63d15.png 

然后就是 JobManager 这边收到心跳之后, 向 ResourceManager 发送了一个心跳信息, 未携带 任何数据

c637a55c68e344f1b447f1244af784eb.png

 

ResourceManager 这边收到 TaskManager 的 null 之后的处理

无任何处理, 也不用任何处理 

7eecef1860874dec9f77e3f71efeb320.png

 

 

JobMaster 这边资源请求的流程

JobMaster 启动之后, 自动连接 ResourceManager

连接上 ResourceManager 之后, 会向 ResourceManager 发送执行资源的请求 

60424c9481504bf8b0fbeba4a762a218.png

 

然后是 ResourceManager 这边找到合适的 TaskManagerSlot, 然后 allocateSlot, 向 TaskManager 指定具体的 job 

25373c9edfc14bc989433bdb5c3ef93c.png

 

ResourceManager 向 TaskManager 发送请求, 指派其需要执行 目标 job

cf07415c888e4376bbf1b82290782c2b.png

 

然后是 TaskExecutor 注册 job 信息, 以及对方 JobMaster 的交互信息 

2edb9e8e079343d5bc3e2a734f8dde05.png

 

然后是 TaskExecutor 这边主动和目标 JobMaster 获取联系, 表示为 JobMaster 提供一个 TaskManagerSlot 用于执行目标任务 

8c65c650f49f44858a5c77412bbce1fa.png

 

然后是 JobMaster 这边拿到了 TaskManagerSlot 之后执行任务 

585f2be7271d4a968880a1266d76669d.png 

接着是更新 Execution 的 slot 的信息, 然后这个是外层 CompletableFuture 是 Execution.scheduleForExecution 中的 allocationFuture

27280e039fb346b1ac7d58465b827f83.png

 

然后就是 JobMaster 这边的 deploy, 这里会向具体的 TaskExecutor 发送任务 

177c71f8c4604d3f87c83fd4c5af8881.png 

然后 deploy 里面就是根据 ExecutionVertex 封装 TaskDeploymentDescriptor, 然后将相关信息发送到 TaskExecutor 去执行 

6cc8427929a84df698956e3a5799ad73.png 

 

处理函数的传递流程 

这一系列流程如下

  1. driver 这边采集各个函数对象, 封装 UserCodeObjectWrapper, 然后序列化 封装到 TaskConfig, 以 udf 结尾的 配置信息作为 key, 这部分 TaskConfig 是包含在 JobGraph 中的每一个 JobVertex 中的, 然后伴随着 JobGraph 的序列化传递到 JobManager 这边进行处理
  2. JobManager 这边反序列化 JobGraph, 然后创建 JobMaster, 该 JobVertex 经过 ExecutionVertex, TaskDeploymentDescriptor 然后传递到 TaskExecutor
  3. TaskExecutor 这边反序列化 DataSourceTask, ChainedDriver, DataSinkTask 等等, 然后 执行任务

所以这个流程中 JobManager 这边是仅仅是获取, 持有, 传递 udf 部分, 不涉及 反序列化

 

 

driver 这边

从上下文获取 function 对象, 也就是我们驱动代码里面 “new Test01WordCount$MyFlatMapMapper()”, 然后封装了一个 UserCodeObjectWrapper 被 FlatMapOperatorBase 持有 

然后会经历 Plan, OptimizedPlan 然后到 JobVertex 阶段 

ecf48fad7bfd40ce9a620130da7423ca.png

 

然后是创建 JobGraph, 创建每一个 JobVertex 的时候, 序列化该 JobVertex 的 处理函数 

ed0277d759a1493798c9e2274c7b9cf9.png 

然后是将 chainedTask, 的相关配置信息放在 主JobVertex

然后隔离是通过 ”chaining.taskconfig.” + $idx 来进行隔离的, 相当于是增加了一系列的名称空间 

5e36bc327359446e9ff491c27d2f964d.png

 

然后就是 JobGraph 的序列化, 准备发送 http 请求 传输 Job 到 JobManager

e4afe97b2f0f4f579f88a3d772fd5235.png 

 

JobManager 这边 

JobManager 这边反序列化 JobGraph 如下, 这里面和客户端那边一样 

然后 这边的 JobGraph 和 客户端那边的一致, 包含了 JobVertex 中包含了 TaskConfiguration 相关信息 

eb24d5a273fd495fad368ed910e660e8.png

 

然后是到后面封装 TaskDeploymentDescriptor 这里可以看到, 也是间接的从 JobVertex 中获取的 TaskConfiguration

然后 最终的传递是通过 TaskInformation 从 JobMaster 这边传递到 TaskExecutor

eb1086aaef2e4a5aaa12886b02b7b9e6.png

 

TaskExecutor 这边 

反序列化各个 DataSourceTask, ChainedDriver, DataSinkTask 等等的时候

根据索引, 添加前缀, 来获取给定的 ChainedDriver, 然后添加到 chinedTaskTarget 中, 基于 previous 形成了一个单项任务执行的链表, 用于后面的执行 

这里各个任务的前缀为 “chaining.taskconfig” + $idx 和前面放入的时候, 是对称的 

4a7610710a3c44b6857dcce282315ea6.png

 

这里是具体的获取配置的地方, 前缀 + “udf”, 然后从 配置信息中获取配置

2a2af1b797c04762bb07974199c5b7f9.png 

然后是 反序列化 UserCodeObjectWrapper, 里面封装了目标函数, Test01WordCount$MyFlatMapMapper

b8eee5151bcb4b5d8a931ab44f9e7a45.png 

 

完 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/483408.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c#创建安装windows服务

背景:最近在做设备数据对接采集时,遇到一些设备不是标准的Service-Client接口,导致采集的数据不够准确;比如设备如果中途开关机后,加工的数量就会从0开始重新计数,因此需要实时监控设备的数据,进行叠加处理;考略到工厂设备比较多,实时监听接口的数据为每秒3次,因此将…

Spring Boot中实现列表数据导出为Excel文件

点击下载《Spring Boot中实现列表数据导出为Excel文件》 1. 前言 本文将详细介绍在Spring Boot框架中如何将列表数据导出为Excel文件。我们将通过Apache POI库来实现这一功能,并解释其背后的原理、提供完整的流程和步骤,以及带有详细注释的代码示例。最…

centos 9 编译安装 LAMP wordpress

[rootlocalhost ~]# ll 总用量 655760 -rw-------. 1 root root 1040 2月 17 16:57 anaconda-ks.cfg drwxr-xr-x. 29 501 games 4096 2月 21 11:00 apr-1.7.4 -rw-r--r--. 1 root root 1122147 2月 21 10:57 apr-1.7.4.tar.gz drwxr-xr-x. 21 501 games …

动态规划记忆化搜索之滑雪

给定一个 R 行 C 列的矩阵,表示一个矩形网格滑雪场。 矩阵中第 i 行第 j 列的点表示滑雪场的第 i 行第 j 列区域的高度。 一个人从滑雪场中的某个区域内出发,每次可以向上下左右任意一个方向滑动一个单位距离。 当然,一个人能够滑动到某相…

【day02】每天三道 java后端面试题:Java、C++和Go的区别 | Redis的特点和应用场景 | 计算机网络七层模型

文章目录 1. Java、C和 Go 语言的区别,各自的优缺点?2. 什么是Redis?Redis 有哪些特点? Redis有哪些常见的应用场景?3. 简述计算机网络七层模型和各自的作用? 1. Java、C和 Go 语言的区别,各自的…

【免费雾锁王国】2024年新手搭建雾锁王国服务器教程

免费自建雾锁王国Enshrouded服务器,先领取阿里云300元无门槛代金券,然后在雾锁王国Enshrouded专题页一键部署,不需要基础,鼠标点选即可10秒钟创建一台雾锁王国游戏服务器,超简单,阿里云服务器网aliyunfuwuq…

vulfocus靶场搭建

vulfocus靶场搭建 什么是vulfocus搭建教程靶场配置场景靶场编排靶场优化 什么是vulfocus Vulfocus 是一个漏洞集成平台,将漏洞环境 docker 镜像,放入即可使用,开箱即用,我们可以通过搭建该靶场,简单方便地复现一些框架…

简单理解VQGAN

简单理解VQGAN TL; DR:与 VQVAE 类似,隐层压缩表征自回归生成的两阶段图像生成方法。增加感知损失和对抗损失,提高压缩表征模型解码出图片的清晰度。还可以通过编码并预置条件表征,实现条件生成。 隐层压缩表征自回归生成&#…

将本地项目上传到svn服务端和git

一、SVN 1.创建svn库,下面生成了三个文件夹,branches指分支,trunk下可以放项目 2.在本地checkout,填入svn库的地址,因为是新建的,所以checkout的是空文件夹 把自己的项目复制到trunk下,在项目上 右键-TortoiseSVN-add add完之后 右键-svn commit 3.idea打开这个项目,将项目跟…

QPaint绘制自定义仪表盘组件01

网上抄别人的,只是放这里自己看一下,看完就删掉 ui Dashboard.pro QT core guigreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c11# You can make your code fail to compile if it uses deprecated APIs. # In order to do so, uncomm…

第3.4章:StarRocks数据导入——Routine Load

注:本篇文章阐述的是StarRocks-3.2版本的Routine Load导入机制 一、概述 Routine Load(例行导入)支持用户提交一个常驻的导入任务,可以将消息流存储在 Kafka 的Topic中,通过订阅Topic 中的全部或部分分区的消息&#…

使用JDBC操作数据库(IDEA编译器)

目录 JDBC的本质 ​ JDBC好处 JDBC操作MySQL数据库 1.创建工程导入驱动jar包 2.编写测试代码 ​相关问题 JDBC的本质 官方(sun公司) 定义的一套操作所有关系型数据库的规则,即接口各个数据库厂商去实现这套接口,提供数据库驱动jar包我们可以使用这…