【Spark精讲】RDD特性之数据本地化

首选运行位置

上图红框为RDD的特性五:每个RDD的每个分区都有一组首选运行位置,用于标识RDD的这个分区数据最好能够在哪台主机上运行。通过RDD的首选运行位置可以让RDD的某个分区的计算任务直接在指定的主机上运行,从而实现了移动计算而不是移动数据的目的减少了网络传输的开销,如Spark中HadoopRDD能够实现家装数据的任务在相应的数据节点上执行。

数据的本地化级别

package org.apache.spark.schedulerimport org.apache.spark.annotation.DeveloperApi@DeveloperApi
object TaskLocality extends Enumeration {// Process local is expected to be used ONLY within TaskSetManager for now.val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Valuetype TaskLocality = Valuedef isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {condition <= constraint}
}
  1. PROCESS_LOCAL:进程本地化,表示 task 要计算的数据在同一个 Executor 中。
  2. NODE_LOCAL:节点本地化,速度稍慢,因为数据需要在不同的进程之间传递或从文件中读取。分为两种情况,第一种:task 要计算的数据是在同一个 worker 的不同 Executor 进程中。第二种:task 要计算的数据是在同一个 worker 的磁盘上,或在 HDFS 上恰好有 block 在同一个节点上。如果 Spark 要计算的数据来源于 HDFS 上,那么最好的本地化级别就是 NODE_LOCAL。
  3. NO_PREF:没有最佳位置,数据从哪访问都一样快,不需要位置优先。比如 Spark SQL 从 Mysql 中读取数据。
  4. RACK_LOCAL:机架本地化,数据在同一机架的不同节点上。需要通过网络传输数据以及文件 IO,比 NODE_LOCAL 慢。情况一:task 计算的数据在 worker2 的 EXecutor 中。情况二:task 计算的数据在 work2 的磁盘上。
  5. ANY:跨机架,数据在非同一机架的网络上,速度最慢。

谁来负责数据本地化

val rdd1 = sc.textFile("hdfs://...") 
rdd1.cache()
rdd1.map.filter.count()

上面这段简单的代码,背后其实做什么很多事情。Driver 的 TaskScheduler 在发送 task 之前,首先应该拿到 rdd1 数据所在的位置,rdd1 封装了这个文件所对应的 block 的位置,DAGScheduler 通过调用 getPrerredLocations() 拿到 partition 所对应的数据的位置,TaskScheduler 根据这些位置来发送相应的 task。 

具体的解释:

DAGScheduler 切割Job,划分Stage, 通过调用 submitStage 来提交一个Stage 对应的 tasks,submitStage 会调用 submitMissingTasks, submitMissingTasks 确定每个需要计算的 task 的preferredLocations,通过调用 getPreferrdeLocations() 得到 partition 的优先位置,就是这个 partition 对应的 task 的优先位置,对于要提交到 TaskScheduler 的 TaskSet 中的每一个task,该 task 优先位置与其对应的 partition 对应的优先位置一致。

TaskScheduler 接收到了 TaskSet 后,TaskSchedulerImpl 会为每个 TaskSet 创建一个 TaskSetManager 对象,该对象包含taskSet 所有 tasks,并管理这些 tasks 的执行,其中就包括计算 TaskSetManager 中的 tasks 都有哪些 locality levels,以便在调度和延迟调度 tasks 时发挥作用。

总的来说,Spark 中的数据本地化是由 DAGScheduler 和 TaskScheduler 共同负责的。

数据本地化执行流程

第一步:PROCESS_LOCAL

TaskScheduler 根据数据的位置向数据节点发送 task 任务。如果这个任务在 worker1 的 Executor 中等待了 3 秒。(默认的,可以通过spark.locality.wait 来设置),可以通过 SparkConf() 来修改,重试了 5 次之后,还是无法执行,TaskScheduler 就会降低数据本地化的级别,从 PROCESS_LOCAL 降到 NODE_LOCAL。

第二步:NODE_LOCAL

TaskScheduler 重新发送 task 到 worker1 中的 Executor2 中执行,如果 task 在worker1 的 Executor2 中等待了 3 秒,重试了 5 次,还是无法执行,TaskScheduler 就会降低数据本地化的级别,从 NODE_LOCAL 降到 RACK_LOCAL。

第三步:RACK_LOCAL

TaskScheduler重新发送 task 到 worker2 中的 Executor1 中执行。

第四步:

当 task 分配完成之后,task 会通过所在的 worker 的 Executor 中的 BlockManager 来获取数据。如果 BlockManager 发现自己没有数据,那么它会调用 getRemote() 方法,通过 ConnectionManager 与原 task 所在节点的 BlockManager 中的 ConnectionManager先建立连接,然后通过TransferService(网络传输组件)获取数据,通过网络传输回task所在节点(这时候性能大幅下降,大量的网络IO占用资源),计算后的结果返回给Driver。这一步很像 shuffle 的文件寻址流程。

调优

TaskScheduler在发送task的时候,会根据数据所在的节点发送task,这时候的数据本地化的级别是最高的,如果这个task在这个Executor中等待了3秒,重试发射了5次还是依然无法执行,那么TaskScheduler就会认为这个Executor的计算资源满了,TaskScheduler会降低一级数据本地化的级别,重新发送task到其他的Executor中执行,如果还是依然无法执行,那么继续降低数据本地化的级别...

如果想让每一个 task 都能拿到最好的数据本地化级别,那么调优点就是等待时间加长。注意!如果过度调大等待时间,虽然为每一个 task 都拿到了最好的数据本地化级别,但是我们 job 执行的时间也会随之延长。

官方参数:Configuration - Spark 3.5.0 Documentation

spark.locality.wait3sHow long to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and see poor locality, but the default usually works well.0.5.0
spark.locality.wait.nodespark.locality.waitCustomize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information).0.8.0
spark.locality.wait.processspark.locality.waitCustomize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process.0.8.0
spark.locality.wait.rackspark.locality.waitCustomize the locality wait for rack locality.0.8.0

代码中的设置方法 

new SparkConf.set("spark.locality.wait","100") //默认3秒

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/269475.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

搜索引擎蜘蛛池的原理是什么,蜘蛛池如何搭建?

搜索引擎蜘蛛池是一种利用蜘蛛程序抓取互联网上的网页并返回相关数据的技术。它的原理主要是通过模拟搜索引擎蜘蛛的行为&#xff0c;将大量的网页链接提交给搜索引擎&#xff0c;从而增加网站在搜索引擎中的曝光率和排名。 如何联系蚂蚁seo&#xff1f; baidu搜索&#xff1…

Java - Spring中Bean的循环依赖问题

什么是Bean的循环依赖 A对象中有B属性。B对象中有A属性。这就是循环依赖。我依赖你&#xff0c;你也依赖我。 比如&#xff1a;丈夫类Husband&#xff0c;妻子类Wife。Husband中有Wife的引用。Wife中有Husband的引用。 Spring解决循环依赖的机理 Spring为什么可以解决set s…

Centos7 首次 安装Mysql8.0

随笔记录 背景介绍&#xff1a;重装Centos7 系统&#xff0c;没有安装mysql 目录 1. 查看否有MariaDB与MySQL 2. MySQL官网下载适用于centos7的mysql安装包 2.1 查询服务器是x86_64架构还是arm架构 2.2 查系统版本 2.3 下载适用于系统版本安装包 2.3.1 国内镜像源下载…

JVM面试

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 1.JVM 的整体结构2.类加载做了哪些事情?类加载器有哪些&#xff1f;双亲委派和沙箱安全 3.Java虚拟机栈是什么4.方法区的理解HotSpot 中方法区的演进方法区的内部结…

12月12日作业

设计一个闹钟 头文件 #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QTimerEvent> #include <QTime> #include <QTime> #include <QTextToSpeech>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass …

【Unity学习笔记】光照简介

本节主要是简单介绍一些常见的光照组件和渲染设置。 文章目录 灯光类型平行光Directional Light点光源Point Light聚光灯Spot Light面积光 Area Light 阴影设置全局光照明光照模式直接光照与间接光照Mixed Lighting 光照探针Light Probe Group光照探针组 反射探针 灯光类型 在…

GeMap:Online Vectorized HD Map Construction using Geometry

参考代码&#xff1a;GeMap 动机与出发点 出了原本针对单点的L1损失&#xff0c;车道线具备的几何结构信息作为监督信息也可以再被挖掘挖掘&#xff0c;像车道线实例中点和点之间的距离与夹角、线与线之间的夹角、不同线上点与点之间的关系都可用来作为监督约束&#xff0c;但…

手持式心电图机|12道便携式心电图机主板方案定制

心电图机被广泛应用于心脏状况的监测&#xff0c;可以从多个角度观察心脏情况&#xff0c;及时反映患者的病情&#xff0c;以便医生和患者了解。触摸屏使得控制和信息录入变得轻松。心电图报告提供多种语言选择&#xff0c;便于上传信息&#xff0c;实现无纸化报告。同时&#…

Jemeter,提取响应体中的数据:正则表达式、Json提取器

一、正则表达式 1、线程组--创建线程组&#xff1b; 2、线程组--添加--取样器--HTTP请求&#xff1b; 3、Http请求--添加--后置处理器--正则表达式提取器&#xff1b; 4、线程组--添加--监听器--查看结果树&#xff1b; 5、线程组--添加--取样器--调试取样器。 响应体数据…

各地加速“双碳”落地,数字能源供应商怎么选?

作者 | 曾响铃 文 | 响铃说 随着我国力争2030年前实现“碳达峰”、2060年前实现“碳中和”的“双碳”目标提出&#xff0c;为各地区、各行业的低碳转型和绿色可持续发展制定“倒计时”时间表&#xff0c;一场围绕“数字能源”、“智慧能源”、“新能源”等关键词的创新探索进…

matlab 最小二乘拟合空间直线(方法三)

目录 一、算法原理1、算法过程2、参考文献二、代码实现三、结果展示四、相关链接博客长期更新,GPT与爬虫自重,你也未必能爬到最新版本。 一、算法原理 1、算法过程 空间直线的点向式方程为:

数据采集网关:工业数据采集上云

数据采集网关&#xff0c;以其高效、便捷的特点&#xff0c;成为了现代工业物联网数据采集处理的重要工具。它是连接不同数据源和数据接收设备的桥梁&#xff0c;将各种形式和格式的数据快速、安全地汇聚到一起。通过数据采集网关&#xff0c;企业可以轻松实现数据的整合、转换…