深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解

Flink Program 编程套路回顾

1、获取执行环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2、通过执行环境对象,注册数据源 Source,得到数据抽象
DataStream ds = env.socketTextStream(...)
3、调用数据抽象的各种Transformation执行逻辑计算
DataStream resultDS = ds.flatMap(...).keyBy(...).sum(...);
4、将各种Transformation执行完毕之后得到的计算结果数据抽象注册 Sink
resultDS.addSink(...)
5、提交Job执行
env.execute(...)

Flink Job 提交脚本解析

# Submission to an already running Flink on YARN cluster
./bin/flink run --target yarn-session
# Submission spinning up a Flink on YARN cluster in Per-Job Mode
./bin/flink run --target yarn-per-job
# Submission spinning up Flink on YARN cluster in Application Mode
./bin/flink run-application --target yarn-application

具体可以参考官网:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#advanced-cli

CliFrontend 提交分析

当用户把 Flink 应用程序打成 jar 使用 flink run … 的 shell 命令提交的时候,底层是通过 CliFrontend 来处理。底层的逻辑,就是通过反射来调用用户程序的 main() 方法执行。
需要注意的是,Application 模式下,会通过 YarnClusterDescriptor.deployInternal 方法在 yarn 中部署一个 application 集群,返回 YarnRestClusterClient 对象。yarn 中会启动一个 EmbeddedJobClient,执行 submitJob 方法提交 jobGraph。

ExecutionEnvironment 源码解析

StreamExecutionEnvironment 是 Flink 应用程序的执行入口,提供了一些重要的操作机制:

1、提供了 readTextFile(), socketTextStream(), createInput(), addSource() 等方法去对接数据源。
2、提供了 setParallelism() 设置应用程序的并行度。
3、StreamExecutionEnvironment 管理了 ExecutionConfig 对象,该对象负责 Job 执行的一些行为配置管理。还管理了 Configuration 管理一些其他的配置。这个所谓的其他配置,还包含了 Checkpoint 的配置,这个 chekcpoint 的配置参数,会单独解析出来,存储在 CheckpontConfig 中
4、StreamExecutionEnvironment 管理了一个 List<Transformation<?>> transformations 成员变量,该成员变量,主要用于保存 Job 的各种算子转化得到的 Transformation,把这些 Transformation 按照逻辑拼接起来,就能得到 StreamGragh, 注意转换顺序:
UserFunction ==> StreamOperator ==> Transformation ==> StreamNode
5、StreamExecutionEnvironment 提供了 execute() 方法主要用于提交 Job 执行。该方法接收的参数就是:StreamGraph

Flink on YARN Per-job 模式提交流程分析

入口类:ApplicatoinMaster: YarnJobClusterEntryPoint
在这里插入图片描述
在这里插入图片描述

Job提交流程源码分析

getStreamGraph(jobName) 生成 StreamGraph 解析

// 入口
StreamGraph streamGraph = getStreamGraph(jobName, true){// 通过 StreamGraphGenerator 来生成 StreamGraphStreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate(){streamGraph = new StreamGraph(....)for(Transformation<?> transformation : transformations) {transform(transformation);}}
}transform(transformation){// 先递归处理该 Transformation 的输入Collection<Integer> inputIds = transform(transform.getInput());// 将 Transformation 变成 Operator 设置到 StreamGraph 中,其实就是添加 StreamNodestreamGraph.addOperator(....);// 设置该 StreamNode 的并行度streamGraph.setParallelism(transform.getId(), parallelism);// 设置该 StreamNode 的入边 SreamEdgefor(Integer inputId : inputIds) {streamGraph.addEdge(inputId, transform.getId(), 0);// 内部实现// 构建 StreamNode 之间的 边(StreamEdge) 对象StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, ...){// TODO_MA 注释: 给 上游 StreamNode 设置 出边getStreamNode(edge.getSourceId()).addOutEdge(edge);}// TODO_MA 注释: 给 下游 StreamNode 设置 入边getStreamNode(edge.getTargetId()).addInEdge(edge);}
}

execute(StreamGraph) 解析

// 入口
JobClient jobClient = executeAsync(streamGraph){// 执行一个 SreamGraphexecutorFactory.getExecutor(configuration).execute(streamGraph, configuration){// 第一件事:由 StreamGraph 生成 JobGraghJobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);// 第二件事:通过 RestClusterClient 提交 JobGraph 到Flink集群clusterClient.submitJob(jobGraph)}
}// 通过 RestClusterClient 来提交 JobGraph
RestClusterClient.submitJob(JobGraph jobGraph){// 继续提交RestClusterClient.sendRetriableRequest(){// 通过 RestClient 提交RestClient.sendRequest(webMonitorHost, webMonitorPort, ...){// 继续提交RestClient.submitRequest(targetAddress,targetPort,httpRequest,responseType)}}
}

最终通过 channel 把请求数据,发给 WebMonitorEndpoint 中的 JobSubmitHandler 来执行处理。

小结

01、用户根据 Flink 应用程序的编写套路,写好应用程序,打成 jar 包,通过 flink run 的命令来执行提交
02、这个命令的底层,其实是执行: CliFrontend 组件来执行提交
03、这个 CliFrontend 的内部,会通过反射的技术,来转交执行到用户自定义应用程序的 main()
04、先获取 StreamExecutionEnvironment 执行环境对象实例
05、执行算子:其实就是从 算子 ---> function ---> StreamOperator ---> Transformation
06、执行 StreamExecutionEnvironment 的 executor 方法来执行提交
07、首先遍历 StreamExecutionEnvironment 的 transformations 这个 list 来生成 StreamGraph,之后会继续被构建成 JobGraph
08、具体的内部的提交是通过 RestClusterClient 来执行提交
09、在通过 RestClusterClient 提交之前,其实还会做一件事:把 SreamGraph 变成 JobGraph,也还会先把 JobGraph 持久化成为一个磁盘文件
10、在这个 RestClusterClient 的内部,其实是通过 RestClient 来提交
11、RestClient 其实在初始化的时候,就初始化了一个 Netty 客户端
12、通过封装一个 HttpRequest 对象,包含了需要提交的 JobGraph 文件和 Jar 包等,通过 Netty 客户端链接服务端,发送请求对象到服务端
13、Flink 主节点 JobManager 负责处理这个请求的是 WebMonitorEndpoint 中的 Netty 服务端,接收到 rest 请求会调用 Router 执行 route 处理,找到对应的 Handler 执行处理。提交 Job 对应的 Handler 是 JobSubmitHandler

在这里插入图片描述

WebMonitorEndpoint 处理 RestClient 的 JobSubmit 请求

最终处理这个请求: Flink 主节点 JobManager 负责处理这个请求的是 WebMonitorEndpoint 中的 Netty 服务端,接收到 rest 请求会调用 Router 执行 route 处理,找到对应的 Handler 执行处理。提交 Job 对应的 Handler 是 JobSubmitHandler。

// JobManager 服务端处理入口
JobSubmitHandler.handleRequest(){// 恢复得到 JobGraphJobGraph jobGraph = loadJobGraph(requestBody, nameToFile);// 通过 Dispatcher 提交 JobGraphDispatcher.submitJob(jobGraph, timeout);
}

JobMaster 启动源码剖析

关键方法: jobMasterServiceFactory.createJobMasterService
核心的工作是:

  • 创建 JobMaster 这个 RpcEndpoint 组件,负责通信。内部会创建一个 DefaultScheduler 调度组件,在初始化该调度组件的时候,会调用 ExecutionGraphFactory 的相关方法,来把 JobGraph 转换成 ExectionGraph
  • JobMaster 启动,跳转到 onStart() 方法。内部的主要工作,就是以下这三:
    • 启动心跳机制,维持和 ResourceManager,和 TaskExecutor 之间的心跳
    • 启动 SlotPoolImpl 这个 slot 管理组件。
    • 从 ZK 获取 ResourceManager 的地址,从而进行 JobMaster 向 ResourceManager 的注册
  • 启动的这个 JobMaster 负责这个 Job 中的所有的 Task 的 slot 的申请和 任务的派发,状态的跟踪,容错,还有 checkpoint等各种操作

JobMaster 和 ResourceManager/TaskExecutor 的心跳

在这里插入图片描述

JobMaster 向 ResourceManager 注册

// 启动 JobMaster
jobMaster.start(){JobMaster.onStart(){startJobExecution(){// 第一件大事:启动 JobMaster 必要的一些工作startJobMasterServices(){// 第一件事: 启动心跳机制this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices);// 第二件事: 启动 SlotPoolImplslotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());// 第三件事: 从 ZK 获取 ResourceManager 的地址// 这儿就是 JobMaster 向 ResourceManager 执行注册的入口resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());}// 第二件大事:开始调度执行startScheduling();}}
}
ResourceManager.registerJobManager(){// ResourceManager 关于 JobMaster 的注册内部实现,重要的事情做了四件registerJobMasterInternal(jobMasterGateway, jobId, ....){// TODO_MA 马中华 注释: 生成 JobMaster 注册对象JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(jobId, jobManagerResourceId, ....);// TODO_MA 马中华 注释: 完成注册jobManagerRegistrations.put(jobId, jobManagerRegistration);jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);// TODO_MA 马中华 注释: 加入心跳管理jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new HeartbeatTarget<Void>() {});// TODO_MA 马中华 注释: 返回 JobMaster 注册成功return new JobMasterRegistrationSuccess(getFencingToken(), resourceId);}
}

Flink Graph 演变

在这里插入图片描述

StreamGraph 构建和提交源码解析

在这里插入图片描述
关于 StreamNode 的定义:

public class StreamNode {private final int id;private int parallelism;private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();private final Class<? extends AbstractInvokable> jobVertexClass;
}

关于 StreamEdge 的定义:

public class StreamEdge implements Serializable {private final String edgeId;private final int sourceId;private final int targetId;
}

JobGraph 构建和提交源码解析

JobGraph: StreamGraph 经过优化后生成了 JobGraph,提交给 Flink 集群的数据结构。它包含的主要抽象概念有:

1、JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个 JobVertex,即一个JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是 IntermediateDataSet。
2、IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是 JobVertex,consumer 是 JobEdge。
3、JobEdge:代表了 job graph 中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex。

在 StreamGraph 构建 JobGragh 的过程中,最重要的事情就是 operator 的 chain 优化,那么到底什么样的情况的下 Operator 能chain 在一起呢 ?答案是要满足以下 9 个条件:

// 1、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
downStreamVertex.getInEdges().size() == 1;
// 2、上下游节点都在同一个 slot group 中
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);
// 3、前后算子不为空
!(downStreamOperator == null || upStreamOperator == null);
// 4、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认是 HEAD)
!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;
// 5、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter 等默认是 ALWAYS)
!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;
// 6、两个节点间物理分区逻辑是 ForwardPartitioner
(edge.getPartitioner() instanceof ForwardPartitioner);
// 7、两个算子间的 shuffle 方式不等于批处理模式
edge.getShuffleMode() != ShuffleMode.BATCH;
// 8、上下游的并行度一致
upStreamVertex.getParallelism() == downStreamVertex.getParallelism();
// 9、用户没有禁用 chain
streamGraph.isChainingEnabled();

在这里插入图片描述
构建逻辑的重点代码:

1、在 connect 之间,调用的 createChain() 就是先执行优化,然后再生成 JobVertex
2、然后 调用 connect 之后,是为了组织关系1、先生成 IntermediateDataSet 和 JobEdge2、把 IntermediateDataSet 和 当前 JobVertex 设置为 JobEdge 的 source 和 target3、把 JobEdge 设置为这个 IntermediateDataSet 的消费者

关于 JobVertex 的定义:

public class JobVertex implements java.io.Serializable {private final JobVertexID id;private final ArrayList<IntermediateDataSet> results = new ArrayList<>();private final ArrayList<JobEdge> inputs = new ArrayList<>();private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;private String invokableClassName;
}

关于 IntermediateDataSet 的定义:

public class IntermediateDataSet implements java.io.Serializable {private final IntermediateDataSetID id;private final JobVertex producer;private final List<JobEdge> consumers = new ArrayList<JobEdge>();
}

关于 JobEdge 的定义:

public class JobEdge implements java.io.Serializable {private final JobVertex target;private IntermediateDataSet source;private IntermediateDataSetID sourceId;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/341864.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PostgreSQL内存浅析

体系结构 &#xff08;https://www.postgresql.fastware.com/blog/lets-get-back-to-basics-postgresql-memory-components&#xff09; &#xff08;http://geekdaxue.co/read/fcantsql/qts5is) 共享内存 linux的共享内存实现 (https://momjian.us/main/writings/pgsql/insi…

Blazor快速开发框架Known-V2.0.0

Known2.0 Known是基于Blazor的企业级快速开发框架&#xff0c;低代码&#xff0c;跨平台&#xff0c;开箱即用&#xff0c;一处代码&#xff0c;多处运行。 官网&#xff1a;http://known.pumantech.comGitee&#xff1a; https://gitee.com/known/KnownGithub&#xff1a;ht…

【Linux】Linux 系统编程——tree 命令

文章目录 1. 命令概述2. 命令格式3. 常用选项4. 相关描述4.1 tree 命令安装 5. 参考示例5.1 创建树形目录5.2 使用 tree 命令查看树形目录 1. 命令概述 tree 命令用于在命令行界面以树状图形式显示目录及其子目录的内容。这个命令递归地列出所有子目录&#xff0c;并可选择显示…

品牌渠道治理思路浅谈

渠道是否管控好&#xff0c;体现在渠道中有无低价链接&#xff0c;或者是低价数据的占比是否较低&#xff0c;如果打开电商平台&#xff0c;搜索一款产品的价格&#xff0c;有很多链接的价格低于旗舰店价格&#xff0c;这显然不是一个健康的渠道表现&#xff0c;所以治理渠道就…

odoo linux环境打印乱码或无内容

在odoo打印中会遇到乱码或者无内容显示&#xff0c;需要安装一些包 sudo apt-get install ttf-wqy-zenhei sudo apt-get install ttf-wqy-microhei安装前 安装后

mysql清空并重置自动递增初始值

需求&#xff1a;当上新项目时&#xff0c;测试环境数据库导出来的表id字段一般都有很大的初始递增值了&#xff0c;需要重置一下 先上代码&#xff1a; -- 查看当前自动递增值 SHOW CREATE TABLE table_name; -- 重建自动递增索引&#xff08;可选&#xff09; ALTER TABLE t…

UNIX网络编程-纪要

网络编程 网络协议模型网络中的一条连接 套接字编程字节序套接字地址结构socket创建套接字UNIX域套接字tcp套接字函数注意点TCP绑定端口问题 TIME_WAIT状态使用TCP编程注意点使用UDP编程注意点网络中数据大小的限制客服端-服务器交互问题网络数据读写问题常见套接字选项设置套接…

GitHub访问慢:分享两个镜像加速网站

GitHub网站&#xff0c;不知道是不是因为我的网络问题&#xff0c;最近一直断断续续&#xff0c;不稳定。非常难受。找到两个镜像网站&#xff0c;使用体验很不错&#xff0c;作以分享&#xff01; GitHub官方网站 官网&#xff1a;https://github.com/ 两个镜像网站分享 1.…

MySQL——SQL语句进阶

select * from 表 where 条件 group by 条件 order by 排序 limit 分组 Group by select * from 表 group by 条件 结果为每个分组的第一条记录&#xff0c;该条记录作为该组的标志 select * from subject GROUP BY gradeidselect count(1),gradeid from subject GROUP B…

图解JVM (及一些垃圾回收\GC相关面试题 持续更新)

垃圾回收&#xff0c;顾名思义就是释放垃圾占用的空间&#xff0c;从而提升程序性能&#xff0c;防止内存泄露。当一个对象不再被需要时&#xff0c;该对象就需要被回收并释放空间。 Java 内存运行时数据区域包括程序计数器、虚拟机栈、本地方法栈、堆等区域。其中&#xff0c;…

JVM:从零到入门

JVM&#xff0c;就是Java虚拟机。 JVM是一个巨大的话题&#xff0c;我们本文主要简单介绍一些围绕JVM相关的基础知识。 目录 JVM内存区域划分 本地方法栈 虚拟机栈 堆 程序计数器 方法区/ 元数据区 类加载 1.加载 2.验证 3.准备 4.解析 5.初始化 双亲委派模型 …

YOLOv8独家原创改进:多层次特征融合(SDI)结合PConv、DualConv、GSConv,实现二次创新 | UNet v2最新论文

💡💡💡本文独家改进:多层次特征融合(SDI)高效结合DualConv、PConv、GSConv等实现二次创新 1)替代原始的Concat; 收录 YOLOv8原创自研 https://blog.csdn.net/m0_63774211/category_12511737.html?spm=1001.2014.3001.5482 💡💡💡全网独家首发创新(原创)…