Flink Job 执行流程

Flink On Yarn 模式

在这里插入图片描述

基于Yarn层面的架构类似 Spark on Yarn模式,都是由Client提交AppRM上面去运行,然后 RM分配第一个container去运行AM,然后由AM去负责资源的监督和管理。需要说明的是,FlinkYarn模式更加类似Spark on Yarncluster模式,在cluster模式中,dirver将作为AM中的一个线程去运行。Flink on Yarn模式也是会将JobManager启动在container里面,去做个driver类似的任务调度和分配,Yarn AMFlink JobManager在同一个Container,这样AM可以知道Flink JobManager的地址,从而AM可以申请Container去启动Flink TaskManager。待Flink成功运行在Yarn集群上,Flink Yarn Client就可以提交Flink JobFlink JobManager,并进行后续的映射、调度和计算处理。

Fink on Yarn 的缺陷

【1】资源分配是静态的,一个作业需要在启动时获取所需的资源并且在它的生命周期里一直持有这些资源。这导致了作业不能随负载变化而动态调整,在负载下降时无法归还空闲的资源,在负载上升时也无法动态扩展。
【2】On-Yarn模式下,所有的container都是固定大小的,导致无法根据作业需求来调整container的结构。譬如CPU密集的作业或需要更多的核,但不需要太多内存,固定结构的container会导致内存被浪费。
【3】与容器管理基础设施的交互比较笨拙,需要两个步骤来启动Flink作业:1.启动Flink守护进程;2.提交作业。如果作业被容器化并且将作业部署作为容器部署的一部分,那么将不再需要步骤2。
【4】On-Yarn模式下,作业管理页面会在作业完成后消失不可访问。
【5】Flink推荐 per job clusters 的部署方式,但是又支持可以在一个集群上运行多个作业的session模式,令人疑惑。

Flink版本1.5中引入了DispatcherDispatcher是在新设计里引入的一个新概念。Dispatcher会从Client端接受作业提交请求并代表它在集群管理器上启动作业。引入Dispatcher的原因主要有两点:
【1】一些集群管理器需要一个中心化的作业生成和监控实例;
【2】能够实现Standalone模式下JobManager的角色,且等待作业提交。在一些案例中,Dispatcher是可选的Yarn或者不兼容的kubernetes

资源调度模型重构下的 Flink On Yarn 模式

[点击并拖拽以移动] ​

客户端提交JobGraph以及依赖jar包到YarnResourceManager,接着Yarn ResourceManager分配第一个container以此来启动AppMasterApplication Master中会启动一个FlinkResourceManager以及JobManagerJobManager会根据JobGraph生成的ExecutionGraph以及物理执行计划向FlinkResourceManager申请slotFlinkResoourceManager会管理这些slot以及请求,如果没有可用slot就向YarnResourceManager申请containercontainer启动以后会注册到FlinkResourceManager,最后JobManager会将subTask deploy到对应containerslot中去。
[点击并拖拽以移动] ​

在有Dispatcher的模式下:会增加一个过程,就是Client会直接通过HTTP Server的方式,然后用Dispatcher将这个任务提交到Yarn ResourceManager中。

新框架具有四大优势,详情如下:
【1】client直接在Yarn上启动作业,而不需要先启动一个集群然后再提交作业到集群。因此client再提交作业后可以马上返回。
【2】所有的用户依赖库和配置文件都被直接放在应用的classpath,而不是用动态的用户代码classloader去加载。
【3】container在需要时才请求,不再使用时会被释放。
【4】“需要时申请”的container分配方式允许不同算子使用不同profile (CPU和内存结构)的container

新的资源调度框架下 single cluster job on Yarn 流程介绍

[点击并拖拽以移动] ​

single cluster job on Yarn模式涉及三个实例对象:
【1】clifrontend Invoke App code;生成StreamGraph,然后转化为JobGraph
【2】YarnJobClusterEntrypoint(Master) 依次启动YarnResourceManagerMinDispatcherJobManagerRunner三者都服从分布式协同一致的策略;JobManagerRunnerJobGraph转化为ExecutionGraph,然后转化为物理执行任务Execution,然后进行deploydeploy过程会向 YarnResourceManager请求slot,如果有直接deploy到对应的YarnTaskExecutiontorslot里面,没有则向YarnResourceManager申请,带container启动以后deploy
【3】YarnTaskExecutorRunner (slave) 负责接收subTask,并运行。

整个任务运行代码调用流程如下图

[点击并拖拽以移动] ​

subTask在执行时是怎么运行的?

调用StreamTaskinvoke方法,执行步骤如下:
【1】initializeState()operatorinitializeState()
【2】openAllOperators()operatoropen()方法;
【3】最后调用run方法来进行真正的任务处理;

我们来看下flatMap对应的OneInputStreamTaskrun方法具体是怎么处理的。

@Override
protected void run() throws Exception {// 在堆栈上缓存处理器引用,使代码更易于JITfinal StreamInputProcessor<IN> inputProcessor = this.inputProcessor;while (running && inputProcessor.processInput()) {// 所有的工作都发生在“processInput”方法中}
}

最终是调用StreamInputProcessorprocessInput()做数据的处理,这里面包含用户的处理逻辑。

public boolean processInput() throws Exception {if (isFinished) {return false;}if (numRecordsIn == null) {try {numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();} catch (Exception e) {LOG.warn("An exception occurred during the metrics setup.", e);numRecordsIn = new SimpleCounter();}}while (true) {if (currentRecordDeserializer != null) {DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);if (result.isBufferConsumed()) {currentRecordDeserializer.getCurrentBuffer().recycleBuffer();currentRecordDeserializer = null;}if (result.isFullRecord()) {StreamElement recordOrMark = deserializationDelegate.getInstance();//处理watermarkif (recordOrMark.isWatermark()) {// handle watermark//watermark处理逻辑,这里可能引起timer的triggerstatusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);continue;} else if (recordOrMark.isStreamStatus()) {// handle stream statusstatusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);continue;//处理latency watermark} else if (recordOrMark.isLatencyMarker()) {// handle latency markersynchronized (lock) {streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());}continue;} else {//用户的真正的代码逻辑// now we can do the actual processingStreamRecord<IN> record = recordOrMark.asRecord();synchronized (lock) {numRecordsIn.inc();streamOperator.setKeyContextElement1(record);//处理数据streamOperator.processElement(record);}return true;}}}//这里会进行checkpoint barrier的判断和对齐,以及不同partition 里面checkpoint barrier不一致时候的,数据buffer,final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();if (bufferOrEvent != null) {if (bufferOrEvent.isBuffer()) {currentChannel = bufferOrEvent.getChannelIndex();currentRecordDeserializer = recordDeserializers[currentChannel];currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());}else {// Event receivedfinal AbstractEvent event = bufferOrEvent.getEvent();if (event.getClass() != EndOfPartitionEvent.class) {throw new IOException("Unexpected event: " + event);}}}else {isFinished = true;if (!barrierHandler.isEmpty()) {throw new IllegalStateException("Trailing data in checkpoint barrier handler.");}return false;}}
}

streamOperator.processElement(record)最终会调用用户的代码处理逻辑,假如operatorStreamFlatMap的话。

@Override
public void processElement(StreamRecord<IN> element) throws Exception {collector.setTimestamp(element);userFunction.flatMap(element.getValue(), collector);//用户代码
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/312815.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据资产入表之——数据确权

关注WX公众号&#xff1a; commindtech77&#xff0c; 获得数据资产相关白皮书下载地址 1. 回复关键字&#xff1a;数据资源入表白皮书 下载 《2023数据资源入表白皮书》 2. 回复关键字&#xff1a;光大银行 下载 光大银行-《商业银行数据资产会计核算研究报告》 3. 回复关键字…

一文掌握Java注解之@SpringBootApplication知识文集(1)

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

【sql】MyBatis Plus中,sql报错LIKE “%?%“:

文章目录 一、报错详情&#xff1a;二、解决&#xff1a;三、扩展&#xff1a; 一、报错详情&#xff1a; 二、解决&#xff1a; 将LIKE “%”#{xxx}"%"改为LIKE CONCAT(‘%’, #{xxx}, ‘%’) 三、扩展&#xff1a; MyBatis Plus之like模糊查询中包含有特殊字符…

CycleGAN 是如何工作的?

一、说明 CycleGAN即循环对抗网络&#xff0c;是图像翻译成图像的模型&#xff1b;是Pix2Pix模型的扩展&#xff0c;区别在于&#xff0c;Pix2Pix模型需要输入图像和目标图像成对给出训练&#xff0c;CycleGAN则不需要&#xff0c;例如&#xff1a;从 SAR 生成 RGB 图像、从 RG…

在STM32上使用DMA进行UART通信

本文将介绍如何在STM32上使用DMA&#xff08;Direct Memory Access&#xff09;进行UART通信&#xff0c;以提高数据传输效率。我们将介绍STM32的DMA和UART模块的基本概念和使用方法&#xff0c;并给出相关的示例代码和注意事项。DMA&#xff08;Direct Memory Access&#xff…

Spark中的数据加载与保存

Apache Spark是一个强大的分布式计算框架&#xff0c;用于处理大规模数据。在Spark中&#xff0c;数据加载与保存是数据处理流程的关键步骤之一。本文将深入探讨Spark中数据加载与保存的基本概念和常见操作&#xff0c;包括加载不同数据源、保存数据到不同格式以及性能优化等方…

Python pycharm编辑器修改代码字体

在pycharm编辑器下修改代码字体&#xff0c;可以按照以下步骤&#xff1a; 点开上图所示的菜单&#xff0c; 再点击File->Settings&#xff0c;进入设置页面。 我们找到Editor下的Font并点选&#xff0c;然后我们就可以在右侧修改字体相关配置了。 这里建议使用等宽字体&…

C++面试宝典第13题:计算餐厅账单

题目 假如你是一家餐厅的收银员,需要编写一个程序来计算顾客的账单。程序应该能够接受顾客点的菜品和数量,并根据菜品的单价计算出总价。另外,程序还应该能够处理折扣和优惠券,并输出最终的账单金额。 解析 这道题主要考察应聘者使用面向对象的设计方法来解决实际问题的能力…

2023年“中银杯”四川省职业院校技能大赛“云计算应用”赛项样题卷②

2023年“中银杯”四川省职业院校技能大赛“云计算应用”赛项&#xff08;高职组&#xff09; 样题&#xff08;第2套&#xff09; 目录 2023年“中银杯”四川省职业院校技能大赛“云计算应用”赛项&#xff08;高职组&#xff09; 样题&#xff08;第2套&#xff09; 模块…

【数据结构】栈和队列(队列的基本操作和基础知识)

&#x1f308;个人主页&#xff1a;秦jh__https://blog.csdn.net/qinjh_?spm1010.2135.3001.5343&#x1f525; 系列专栏&#xff1a;《数据结构》https://blog.csdn.net/qinjh_/category_12536791.html?spm1001.2014.3001.5482 ​ 目录 前言 队列 队列的概念和结构 队列的…

分布式IO在工业自动化中的应用

传统的自动化产线及物流系统主要是利用PLC来处理数据&#xff0c;并将这些数据保存在PC当中。但是随着互联网技术的迅速发展&#xff0c;越来越多的系统集成商利用分布式IO模块&#xff0c;实现从控制器到自动化最底层之间的IO通信。 分布式IO在工业自动化中的应用 分布式IO是用…

详解Vue3中的鼠标事件mousemove、mouseover和mouseout

本文主要介绍Vue3中的常见鼠标事件mousemove、mouseover和mouseout。 目录 一、mousemove——鼠标移动事件二、mouseover——鼠标移入事件三、mouseout——鼠标移出事件 下面是Vue 3中常用的鼠标事件mousemove、mouseover和mouseout的详解。 一、mousemove——鼠标移动事件 鼠…