【大数据面试知识点】Spark的DAGScheduler

Spark数据本地化是在哪个阶段计算首选位置的?

先看一下DAGScheduler的注释,可以看到DAGScheduler除了Stage和Task的划分外,还做了缓存的跟踪和首选运行位置的计算。

DAGScheduler注释: 

The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run the job. It then submits stages as TaskSets to an underlying TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent tasks that can run right away based on the data that's already on the cluster (e.g. map output files from previous stages), though it may fail if this data becomes unavailable.
Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks in each stage, but operations with shuffle dependencies require multiple stages (one to write a set of map output files, and another to read those files after a barrier). In the end, every stage will have only shuffle dependencies on other stages, and may compute multiple operations inside it. The actual pipelining of these operations happens in the RDD.compute() functions of various RDDs
In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes these to the low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task a small number of times before cancelling the whole stage.
When looking through this code, there are several key concepts:

  • Jobs (represented by ActiveJob) are the top-level work items submitted to the scheduler. For example, when the user calls an action, like count(), a job will be submitted through submitJob. Each Job may require the execution of multiple stages to build intermediate data.
  • Stages (Stage) are sets of tasks that compute intermediate results in jobs, where each task computes the same function on partitions of the same RDD. Stages are separated at shuffle boundaries, which introduce a barrier (where we must wait for the previous stage to finish to fetch outputs). There are two types of stages: ResultStage, for the final stage that executes an action, and ShuffleMapStage, which writes map output files for a shuffle. Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
  • Tasks are individual units of work, each sent to one machine.
  • Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them and likewise remembers which shuffle map stages have already produced output files to avoid redoing the map side of a shuffle.
  • Preferred locations: the DAGScheduler also computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
  • Cleanup: all data structures are cleared when the running jobs that depend on them finish, to prevent memory leaks in a long-running application.

To recover from failures, the same stage might need to run multiple times, which are called "attempts". If the TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost stage(s) that compute the missing tasks. As part of this process, we might also have to create Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since tasks from the old attempt of a stage could still be running, care must be taken to map any events received in the correct Stage object.
Here's a checklist to use when making or reviewing changes to this class:

  • All data structures should be cleared when the jobs involving them end to avoid indefinite accumulation of state in long-running programs.
  • When adding a new data structure, update DAGSchedulerSuite.assertDataStructuresEmpty to include the new structure. This will help to catch memory leaks.

DAGScheduler的运行时机

DAGScheduler运行时机:Driver端初始化SparkContext时。DAGScheduler是在整个Spark Application的入口即 SparkContext中声明并实例化的。在实例化DAGScheduler之前,巳经实例化了SchedulerBackend和底层调度器 TaskScheduler。

如果是SQL任务的话,SparkSQL通过Catalyst(Spark SQL的核心是Catalyst优化器)将SQL先翻译成逻辑计划再翻译成物理计划,再转换成RDD的操作。之后运行时再通过DAGScheduler做RDD任务的划分和调度。

DAGScheduler如何划分Stage的?

用户提交的计算任务是一个由RDD依赖构成的DAG,Spark会把RDD的依赖以shuffle依赖为边界划分成多个Stage,这些Stage之间也相互依赖,形成了Stage的DAG。然后,DAGScheduler会按依赖关系顺序执行这些Stage。

要是把RDD依赖构成的DAG看成是逻辑执行计划(logic plan),那么,可以把Stage看成物理执行计划,为了更好的理解这个概念,我们来看一个例子。

下面的代码用来对README.md文件中包含整数值的单词进行计数,并打印RDD之间的依赖关系(Lineage):

scala> val counts = sc.textFile("README.md").flatMap(x=>x.split("\\W+")).filter(_.matches(".*\\d.*")).map(x=>(x,1)).reduceByKey(_+_)// 调用一个action函数,用来触发任务的提交和执行scala> counts.collect()​// 打印RDD的依赖关系(Lineage)scala> counts.toDebugStringres7: String =(2) ShuffledRDD[17] at reduceByKey at <console>:24 []+-(2) MapPartitionsRDD[16] at map at <console>:24 []|  MapPartitionsRDD[15] at filter at <console>:24 []|  MapPartitionsRDD[14] at flatMap at <console>:24 []|  README.md MapPartitionsRDD[13] at textFile at <console>:24 []|  README.md HadoopRDD[12] at textFile at <console>:24 []

DAGScheduler会根据Shuffle划分前后两个Stage:即StageShuffleMapStage和ResultStage

ShuffleMapStage

先看下ShuffleMapStage的注释,核心就是再讲ShuffleMapStage是做ShuffleWrite的Stage,Stage中是算子的pipline。

ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
 They occur right before each shuffle operation, and might contain multiple pipelined operations before that (e.g. map and filter). When executed, they save map output files that can later be fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of, and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.
 
ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
 For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that there can be multiple ActiveJobs trying to compute the same shuffle map stage. 

ShuffleMapStages是在DAG执行过程中产生的Stage,用来为Shuffle产生数据。ShuffleMapStages发生在每个Shuffle操作之前,在Shuffle之前可能有多个窄转换操作,比如:map,filter,这些操作可以形成流水线(pipeline)。当执行ShuffleMapStages时,会产生Map的输出文件,这些文件会被随后的Reduce任务使用。

ShuffleMapStages也可以作为Jobs,通过DAGScheduler.submitMapStage函数单独进行提交。对于这样的Stages,会在变量mapStageJobs中跟踪提交它们的ActiveJobs。 要注意的是,可能有多个ActiveJob尝试计算相同的ShuffleMapStages。

它为一个shuffle过程产生map操作的输出文件。它也可能是自适应查询规划/自适应调度工作的最后阶段。

ResultStage

再看ResultStage的注释

ResultStages apply a function on some partitions of an RDD to compute the result of an action.
The ResultStage object captures the function to execute, `func`, which will be applied to each partition, and the set of partition IDs, `partitions`. Some stages may not run on all partitions of the RDD, for actions like first() and lookup().

ResultStage是Job的最后一个Stage,该Stage是基于执行action函数的rdd来创建的。该Stage用来计算一个action操作的结果。该类的声明如下:

 private[spark] class ResultStage(id: Int,rdd: RDD[_],val func: (TaskContext, Iterator[_]) => _,val partitions: Array[Int],parents: List[Stage],   //依赖的父StagefirstJobId: Int,callSite: CallSite)extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {

为了计算action操作的结果,ResultStage会在目标RDD的一个或多个分区上使用函数:func。需要计算的分区id集合保存在成员变量:partitions中。但对于有些action操作,比如:first(),take()等,函数:func可能不会在所有分区上使用。

另外,在提交Job时,会先创建ResultStage。但在提交Stage时,会先递归找到该Stage依赖的父级Stage,并先提交父级Stage。如下图所示:

举个例子:

思考题 

如下rdd运算,为什么最终只划分了3个Stage

scala> val rdd1 = sc.textFile("/root/tmp/a.txt",3).flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey((a,b)=>a+b)
val rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:1scala> val rdd2 = sc.textFile("/root/tmp/a.txt",3).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
val rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:1scala> val rdd3 = rdd1.join(rdd2)
val rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:1scala> val rdd4 = rdd3.groupByKey()
val rdd4: org.apache.spark.rdd.RDD[(String, Iterable[(Int, Int)])] = MapPartitionsRDD[13] at groupByKey at <console>:1scala> rdd4.collect().foreach(println)
(c,Seq((2,2)))                                                                  
(d,Seq((1,1)))
(a,Seq((2,2)))
(b,Seq((1,1)))scala> rdd4.toDebugString
val res8: String =
(3) MapPartitionsRDD[13] at groupByKey at <console>:1 []|  MapPartitionsRDD[12] at join at <console>:1 []|  MapPartitionsRDD[11] at join at <console>:1 []|  CoGroupedRDD[10] at join at <console>:1 []|  ShuffledRDD[4] at reduceByKey at <console>:1 []+-(3) MapPartitionsRDD[3] at map at <console>:1 []|  MapPartitionsRDD[2] at flatMap at <console>:1 []|  /root/tmp/a.txt MapPartitionsRDD[1] at textFile at <console>:1 []|  /root/tmp/a.txt HadoopRDD[0] at textFile at <console>:1 []|  ShuffledRDD[9] at reduceByKey at <console>:1 []+-(3) MapPartitionsRDD[8] at map at <console>:1 []|  MapPartitionsRDD[7] at flatMap at <console>:1 []|  /root/tmp/a.txt MapPartitionsRDD[6] at textFile at <console>:1 []|  /root/t...

参考:DAGScheduler-Stage的划分与提交 - 知乎Spark SQL 源码分析之Physical Plan 到 RDD的具体实现_physicalplan到rdd的具体实现-CSDN博客

一文搞定Spark的DAG调度器(DAGScheduler)_spark dagscheduler-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/314601.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

问界M9激光雷达解说

什么是激光雷达 激光雷达(英文:Lidar),是一种通过发射激光束来测量目标位置、速度等特征量的雷达系统。其工作原理是将激光光束照射到目标物体上,然后通过测量激光光束从发射到反射回来的时间,来计算目标物体的距离、位置、速度等参数。激光雷达通常用于测量地形、地貌、…

梳理Langchain-Chatchat-UI接口文档

在 Langchain-Chatchat v0.1.17 版本及以前是有前后端分离的 Vue 项目的&#xff0c;但是 v0.2.0 后就没有了。所以本文使用的是 Langchain-Chatchat v0.1.17 版本中的 Vue 项目。经过一番折腾终于将 Langchain-Chatchat v0.1.17 版本前端 Vue 接口和 Langchain-Chatchat v0.2.…

Github 2024-01-01 开源项目月报 Top20

根据Github Trendings的统计&#xff0c;本月(2024-01-01统计)共有20个项目上榜。根据开发语言中项目的数量&#xff0c;汇总情况如下&#xff1a; 开发语言项目数量Python项目5TypeScript项目3JavaScript项目3非开发语言项目2Java项目2HTML项目2Jupyter Notebook项目2PHP项目…

华为云默认安全组配置规则说明

华为云服务器默认安全组可选Sys-default、Sys-WebServer或Sys-FullAccess。default是默认安全组规则&#xff0c;只开放了22和3389端口&#xff1b;Sys-WebServer适用于Web网站开发场景&#xff0c;开放了80和443端口&#xff1b;Sys-FullAccess开放了全部端口。阿腾云atengyun…

Python算法例33 删除数字

1. 问题描述 给出一个字符串A&#xff0c;表示一个n位的正整数&#xff0c;删除其中k位数字&#xff0c;使得剩余的数字仍然按照原来的顺序排列产生一个新的正整数&#xff0c;本例将找到删除k个数字之后的最小正整数&#xff0c;其中n≤240&#xff0c;k≤n。 2. 问题示例 …

大数据机器学习GAN:生成对抗网络GAN全维度介绍与实战

文章目录 大数据机器学习GAN&#xff1a;生成对抗网络GAN全维度介绍与实战一、引言1.1 生成对抗网络简介1.2 应用领域概览1.3 GAN的重要性 二、理论基础2.1 生成对抗网络的工作原理2.1.1 生成器生成过程 2.1.2 判别器判别过程 2.1.3 训练过程训练代码示例 2.1.4 平衡与收敛 2.2…

TDD-LTE 寻呼流程

目录 1. 寻呼成功流程 1.1 空闲态寻呼 1.2 连接态寻呼 2. 寻呼失败流程 2.1 Paging消息不可达 2.2 RRC建立失败 2.3 eNodeB未上发Initial UE message或达到超时 1. 寻呼成功流程 1.1 空闲态寻呼 寻呼成功&#xff1a;MME发起寻呼&#xff08;S1 接口发送Paing 消息&…

国科大图像处理2023速通期末——汇总2017-2019

国科大2023.12.28图像处理0854期末重点 图像处理 王伟强 作业 课件 资料 一、填空 一个阴极射线管它的输入与输出满足 s r 2 sr^{2} sr2&#xff0c;这将使得显示系统产生比希望的效果更暗的图像&#xff0c;此时伽马校正通常在信号进入显示器前被进行预处理&#xff0c;令p…

HarmonyOS 组件通用属性之通用事件 文档参数讲解(触摸事件)

好 本文 我们来说说触摸事件 字面意思也非常好理解 就是我们手机手指触摸物体触发 我们先在编辑器组件介绍中 找到这个东西的基本用法 Button("跳转").onTouch((event: TouchEvent) > {})最明显的就是 event 的类型变了 点击事件的是 ClickEvent 而这里是 Touc…

ROS TF坐标变换 - 静态坐标变换

目录 一、静态坐标变换&#xff08;C实现&#xff09;二、静态坐标变换&#xff08;Python实现&#xff09; 如前文所属&#xff0c;ROS通过广播的形式告知各模块的位姿关系&#xff0c;接下来详述这一机制的代码实现。 模块间的位置关系有两种类型&#xff0c;一种是相对固定…

Prototype原型模式(创建对象)

原型模式&#xff1a;Prototype 链接&#xff1a;原型模式实例代码 注解 模式定义 使用原型实例指定创建对象的种类&#xff0c;然后通过拷贝这些原型来创建新的对象。 ——《设计模式》GoF 目的 在软件系统中&#xff0c;经常面临这“某些结构复杂的对象”的创建工作&am…

nginx源码分析-4

这一章内容讲述nginx的模块化。 ngx_module_t&#xff1a;一个结构体&#xff0c;用于描述nginx中的各个模块&#xff0c;其中包括核心模块、HTTP模块、事件模块等。这个结构体包含了一些模块的关键信息和回调函数&#xff0c;以便nginx在运行时能够正确地加载和管理这些模块。…