Flink源码之JobManager启动流程

从启动命令flink-daemon.sh中可以看出StandaloneSession入口类为org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint, 从该类的main方法会进入ClusterEntrypoint::runCluster中, 该方法中会创建出主要服务和组件。

StandaloneSessionClusterEntrypoint::main
ClusterEntrypoint::runClusterEntrypoint
ClusterEntrypoint::startCluster
ClusterEntrypoint::runClusterprivate void runCluster(Configuration configuration, PluginManager pluginManager)throws Exception {synchronized (lock) {initializeServices(configuration, pluginManager); //初始化服务// write host information into configurationconfiguration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());final DispatcherResourceManagerComponentFactorydispatcherResourceManagerComponentFactory =createDispatcherResourceManagerComponentFactory(configuration);//创建核心组件clusterComponent =dispatcherResourceManagerComponentFactory.create(configuration,ioExecutor,commonRpcService,haServices,blobServer,heartbeatServices,metricRegistry,executionGraphInfoStore,new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),this);...ignore code}
}

可以看出关键代码是调用initializeServices以及创建Cluster Component。

protected void initializeServices(Configuration configuration, PluginManager pluginManager)throws Exception {LOG.info("Initializing cluster services.");synchronized (lock) {rpcSystem = RpcSystem.load(configuration);commonRpcService =RpcUtils.createRemoteRpcService(rpcSystem,configuration,configuration.getString(JobManagerOptions.ADDRESS),getRPCPortRange(configuration),configuration.getString(JobManagerOptions.BIND_HOST),configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));// update the configuration used to create the high availability servicesconfiguration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());ioExecutor =Executors.newFixedThreadPool(ClusterEntrypointUtils.getPoolSize(configuration),new ExecutorThreadFactory("cluster-io"));haServices = createHaServices(configuration, ioExecutor, rpcSystem);blobServer = new BlobServer(configuration, haServices.createBlobStore());blobServer.start();heartbeatServices = createHeartbeatServices(configuration);metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);final RpcService metricQueryServiceRpcService =MetricUtils.startRemoteMetricsRpcService(configuration, commonRpcService.getAddress(), rpcSystem);metricRegistry.startQueryService(metricQueryServiceRpcService, null);final String hostname = RpcUtils.getHostname(commonRpcService);processMetricGroup =MetricUtils.instantiateProcessMetricGroup(metricRegistry,hostname,ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));executionGraphInfoStore =createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());}
}

在initializeServices中首先创建commonRpcService,这个RPCService实例是JobManager提供RPC服务的核心,可以看出它会有个地址和监听端口号,commonRpcService可将继承自Gateway的服务实例包装成AkkaActor对外提供RPC服务,比如ResourceManager、Dispatcher。此外还创建了其他服务:

haService: 可通过HAService获取ResourceManager/Dispatcher/RestEndpoint的地址,同时也提供选主服务,组件启动时需向HAService注册,如果被选主成功,则会调用监听器的grandLeadership回调函数
BlobServer: 可用来提供存储大对象存储服务
heartbeatServices:为组件间传递心跳信息
metricRegistry:提供metric上报和查询服务,监听端口不同,新建了一个RpcService专为Metric服务
processMetricGroup:注册系统运行状态信息的Metric,比如GC/Memory/Network运行时状况,添加Metric都是通过一个MetricGroup添加
executionGraphInfoStore:缓存Job执行时信息,比如ExecutionGrap

初始化服务创建完成后,通过DefaultDispatcherResourceManagerComponentFactory:create创建JobManager的三大核心组件:Dispacher/ResourceManager/RestEndpointServer, 都是通过工厂方法创建:

DefaultDispatcherRunnerFactory
StandaloneResourceManagerFactory
SessionRestEndpointFactory

这些组件是JobManager向HAService注册获取leadership后,被ElectionService回调grantLeadership函数中创建出具体组件实例。

RestServer

RestServer并不是一个RPCServer,没有继承RpcGateway,只提供HTTP接口服务,然后将请求转交给Dispatcher处理,它的生成启动流程如下:

SessionRestEndpointFactory::createRestEndpoint
DispatcherRestEndpoint::new
RestServerEndpoint::start //通过Netty启动Rest服务
DispatcherRestEndpoint::initializeHandlers //JobSubmitHeaders、JobSubmitHandler处理客户端提交Job
WebMonitorEndpoint::initializeHandlers //关联Rest请求的Header和Handler
WebMonitorEndpoint::startInternal //竞选leader

ResourceManager

RM生成启动过程是ResourceManagerServiceImpl先竞选leader成功后再创建出具体的ResourceManager

ResourceManagerServiceImpl::start
ResourceManagerServiceImpl::grantLeadership
ResourceManagerServiceImpl::startNewLeaderResourceManager
ResourceManagerServiceImpl::startResourceManagerIfIsLeader//调用start方法
StandaloneResourceManagerFactory::createResourceManager
StandaloneResourceManager::new
StandaloneResourceManager::start

Dispatcher

Dispacher生成启动过程是DefaultDispatcherRunner选主后再创建出具体实例

DefaultDispatcherRunnerFactory::createDispatcherRunner
DefaultDispatcherRunner::create
DispatcherRunnerLeaderElectionLifecycleManager.createFor
DefaultDispatcherRunner::grantLeadership //
DefaultDispatcherRunner::startNewDispatcherLeaderProcess//创建SessionDispatcherLeaderProcess并调用其start方法
DefaultDispatcherRunner::createNewDispatcherLeaderProcess
SessionDispatcherLeaderProcessFactoryFactory::createFactory
SessionDispatcherLeaderProcessFactory::create
SessionDispatcherLeaderProcess::create
SessionDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::startInternal
SessionDispatcherLeaderProcess:onstart
SessionDispatcherLeaderProcess::createDispatcherIfRunning
SessionDispatcherLeaderProcess::createDispatcher
DefaultDispatcherGatewayServiceFactory::create//创建Dispatcher并调用其start方法
SessionDispatcherFactory::createDispatcher
StandaloneDispatcher::new
StandaloneDispatcher::start
Dispatcher::onstart

总结

在这里插入图片描述
JobManager的启动过程就是创建三大组件RestServer/RM/Dispacher实例初始化的过程,RestSever通过Netty启动HTTP服务,RM/Dispacher被AkkaRpcService包装成AkkaActor提供本地或远程RPC服务,RestServer仅仅是接受请求解析消息后由具体Handler处理,JobGrap提交执行会转发给Dispatcher处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/60250.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【cs61b】学习笔记day2

历史文章目录 【cs61b】学习笔记day1 文章目录 历史文章目录List两个小问题bits声明一个变量引用类型方框和指针表示法数组的实例化链表 SLList List 两个小问题 思考下面两个代码分别输出什么 Walrus a new Walrus(1000, 8.3); Walrus b; b a; b.weight 5; System.out.…

博客项目(Spring Boot)

1.需求分析 注册功能(添加用户操纵)登录功能(查询操作)我的文章列表页(查询我的文章|文章修改|文章详情|文章删除)博客编辑页(添加文章操作)所有人博客列表(带分页功能)…

[webpack] 基本配置 (一)

文章目录 1.基本介绍2.功能介绍3.简单使用3.1 文件目录和内容3.2 下载依赖3.3 启动webpack 4.基本配置4.1 五大核心概念4.2 基本使用 1.基本介绍 Webpack 是一个静态资源打包工具。它会以一个或多个文件作为打包的入口, 将我们整个项目所有文件编译组合成一个或多个文件输出出去…

Ubuntu 23.04 作为系统盘的体验和使用感受

1.为啥主系统装了Ubuntu 由于公司发电脑了,我自己也有一台台式电脑,然后也想去折腾一下Ubuntu,就把自己的笔记本装成Ubuntu系统了, 我使用的是23.04的桌面版,带图形化界面的。我准备换回Windows 11了(因为…

K8s中的Controller

Controller的作用 (1)确保预期的pod副本数量 (2)无状态应用部署 (3)有状态应用部署 (4)确保所有的node运行同一个pod,一次性任务和定时任务 1.无状态和有状态 无状态&…

java日期常用操作

Testpublic void validateDateUtils(){// 1 字符串转换日期Date result DateUtil.parse("2023-08-01", com.alibaba.excel.util.DateUtils.DATE_FORMAT_10);log.info("result : [{}]" , result);// 2 日期转换字符串final Date date new Date();String f…

文件数字水印,附一种纯文本隐写术数字水印方法

数字水印(Digital Watermark)是一种在数字媒体文件中嵌入隐藏信息的技术。这些数字媒体可以是图片、音频、视频或文本等。数字水印不会对原始文件造成明显的视觉或听觉变化,但可以在一定程度上保护知识产权,追踪数据来源&#xff…

CH-87——矿井水除氟的技术汇总

矿井水除氟的要求一般是处理后水中的含氟量≤1.0mg/L。氟化物含量高的原水往往呈偏碱性,pH值常大于7.5。利用阴离子交换树脂上的可交换阴离子,去交换水中的氟离子,达到除氟目的。氟离子的选择交换性较大,树脂上的SO42-、Cl-等阴离…

AttentionFreeTransformer 源码解析(一):AFTFull、AFTSimple、AFTLocal

我觉得源码写的很好懂,我就不加注释了,直接上计算流程图。 AFTFull class AFTFull(nn.Module):def __init__(self, max_seqlen, dim, hidden_dim64):super().__init__()max_seqlen: the maximum number of timesteps (sequence length) to be fed indim…

谷歌发布RT-2大模型,让机器人像人类那样思考

原创 | 文 BFT机器人 大语言模型是指基于深度学习技术的大规模预训练模型,它能够通过学习大量的文本数据来生成人类类似的语言表达,机器人可以通过对大量的语言数据进行学习,从中掌握人类的语言表达方式,进而能够更好地与人进行交…

java 企业工程管理系统软件源码 自主研发 工程行业适用 em

​ 工程项目管理软件(工程项目管理系统)对建设工程项目管理组织建设、项目策划决策、规划设计、施工建设到竣工交付、总结评估、运维运营,全过程、全方位的对项目进行综合管理 工程项目各模块及其功能点清单 一、系统管理 1、数据字典&#…

【UE4 RTS】06-Camera Edge Scroll

前言 本篇实现的效果是当玩家将鼠标移至屏幕边缘时,视野会相应的上下左右移动 效果 步骤 1. 打开玩家控制器“RTS_PlayerController_BP”,在类默认值中设置如下选项 新建一个宏,命名为“EdgeSroll”, 添加两个输入和三个输出&a…