Spark Shuffle Tracking 原理分析

Shuffle Tracking

Shuffle Tracking 是 Spark 在没有 ESS(External Shuffle Service)情况,并且开启 Dynamic Allocation 的重要功能。如在 K8S 上运行 spark 没有 ESS。本文档所有的前提都是基于以上条件的。

如果开启了 ESS,那么 Executor 计算完后,把 shuffle 数据交给 ESS, Executor 没有任务时,可以安全退出,下游任务从 ESS 拉取 shuffle 数据。

1. 背景

如果 Executor 执行了上游的 Shuffle Map Task 并且把 shuffle 数据些到本地。并且现在 Executor 没有 Task 运行,那么此 Executor 是否能销毁?

现状是如果 Executor 没有 active 的 shuffle 数据,则可以被销毁。
active shuffle 的定义:如果 Shuffle Map Stage 的 task 把 shuffle 数据输出到本地。如果依赖此 shuffle 的Stage 没有计算完毕,则称此 shuffle 为 active shuffle。因为依赖此 shuffle 的 Task 可能从 Driver 端获取了 MapStatus,但是还没有拉取完 shuffle 数据。

为了达到此目的,需要跟踪每个 Stage 和每个 Task 的运行信息。并且启动定时任务,定时扫描每个 Executor,判断是否有任务运行,是否有 active 的 shuffle,如果没有则可以退出。

退出有两种,如果开启了 decommission,则到期的 executors 进入 decommission 模式,否则执行 killExecutors。

参数配置

spark.dynamicAllocation.shuffleTracking.enabled: 默认 true,是否开启 shuffle tracking。
spark.dynamicAllocation.shuffleTracking.timeout: 默认 Long.MaxValue,

2. 设计

ExecutorMonitor 为每个 Executor 创建一个 Tracker, 用于跟踪此 Executor 的状态。

private val executors = new ConcurrentHashMap[String, Tracker]()

定时任务间隔时间查找 timeout 的 executor,然后处理。

timedOutExecutors 方法的主要逻辑,就是遍历 executors。如果 executor 没有 active 的 shuffle 并且当前时间大于 executor 的超时时间 timeoutAt,则此 executor 可以被安全释放。

为什么 executor 有 active shuffle 数据就不能 kill?
在这里插入图片描述

  • Shuffle 的过程:
  1. MapTask 把 shuffle 写到本地,并且把状态汇报给 Driver.
  2. Reduce Task 从 Driver 获取 shuffle status,并从 shuffle status 获取每个 shuffle 数据的地址。
  3. 连接对应的 executor 获取 shuffle 数据。

如果在 reduce 获取完 shuffle status 后,MapTask 所在的 Executor 被 kill 掉,Reduce Task 就无法获取 shuffle 数据。

如果执行 decommission 逻辑,把 MapTask 的 shuffle 数据长传到 bos 等分布式存储是否可以?

也是不可以的,因为 reduce 可能已经把 shuffle status 拿走,获取的 shuffle status 没有记录 shuffle 数据在分布式存储上。

参考: ExecutorMonitor,ExecutorAllocationManager

Executor 状态的更新

ExecutorMonitor 实现了 SparkListner 接口,当 Job, Stage, Task 等 start 和 end 时,都会执行回调。

以 hasActiveShuffle 为例
每个 executor 用一个集合 shuffleIds 存储其上拥有的 shuffle 数据。 当其为空时,说明没有 shuffle 数据。

在 onTaskEnd 和 onBlockUpdated 时调用 addShuffle 向 shuffleIds 添加数据。

在以下时机删除 shuffleIds 里的数据。

  1. 依赖 driver 端的 ContextCleaner,当 ShuffleRDD 仅有 weakReference 时触发。
  2. rdd.cleanShuffleDependencies 方法,但是此方法仅在 org.apache.spark.ml.recommendation.ALS 使用。

timeoutAt 的计算逻辑

总结:timeoutAt 根据 idle 的时间,spark.dynamicAllocation.cachedExecutorIdleTimeout 和 spark.dynamicAllocation.shuffleTracking.timeout 这 3 个值中最大的值。

详细计算逻辑:
timeoutAt 在一些事件发生时触发计算,如 onBlockUpdated, onUnpersistRDD, updateRunningTasks, removeShuffle, updateActiveShuffles
timeoutAt 的计算逻辑:
当执行器有计算任务时 为 Long.MaxValue。
否则为 max(_cacheTimeout, _shuffleTimeout, idleTimeoutNs)
_cacheTimeout: 如果没有 cache 数据,为0,否则为参数 spark.dynamicAllocation.cachedExecutorIdleTimeout 的值(默认 Long.MaxValue)。

_shuffleTimeout: 如果没有 shuffle数据,为 0, 否则为参数 spark.dynamicAllocation.shuffleTracking.timeout 的值(默认 Long.MaxValue)。
idleTimeoutNs 为 spark.dynamicAllocation.executorIdleTimeout

3. 测试

测试命令

spark-shell  \--conf spark.dynamicAllocation.enabled=true \--conf spark.dynamicAllocation.initialExecutors=2 \--conf spark.dynamicAllocation.maxExecutor=400 \--conf spark.dynamicAllocation.minExecutors=1 \--conf spark.shuffle.service.enabled=false \--conf spark.dynamicAllocation.shuffleTracking.enabled=true

参考资料:

https://www.waitingforcode.com/apache-spark/what-new-apache-spark-3-shuffle-service-changes/read

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/503822.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

List<Object>集合对象属性拷贝工具类

目录 问题现象: 问题分析: 解决方法: 问题现象: 最近在项目中经常会使用到BeanUtils工具类来作对象的属性字段拷贝,但如果应用到List集合的话就需要遍历去操作了,如下: 打印结果: …

水经微图Web版1.6.0发布

让每一个人都有自己的地图! 水经微图(简称“微图”)新版已上线,在该版本中主要新增了点线面图层分组样式设置、图层排序并按序绘制、KML支持矢量符号的存储、KML支持态势标绘要素存储和新增历史地图文本样式等。 现在&#xff0…

2024年2月最新微信域名检测拦截接口源码

这段PHP代码用于检测指定域名列表中的域名是否被封。代码首先定义了一个包含待检测域名的数组 $domainList,然后遍历该数组,对每个域名发送HTTP请求并检查响应内容以判断域名是否被封。 具体步骤如下: 1. 定义待检测的域名列表。 2. 遍历域名…

Python3零基础教程之字符串专题初阶

大家好,我是千与编程,上一期我们讲解了Python3编程语言中的数组与列表专题。这一期我们讲解了字符串专题初阶。 在本初阶教程中,我们将探索 Python3 中字符串专题的基础,包括字符串的输入输出、定义、连接和重复。这些概念对于理解…

langchain学习笔记(七)

RunnablePassthrough: Passing data through | 🦜️🔗 Langchain 1、RunnablePassthrough可以在不改变或添加额外键的情况下传递输入。通常和RunnableParallel结合使用去分配数值给到字典的新键 两种方式调用RunnablePassthrough (1&#…

思维题(蓝桥杯 填空题 C++)

目录 题目一: ​编辑 代码: 题目二: 代码: 题目三: 代码: 题目四: 代码: 题目五: 代码: 题目六: 代码七: 题目八&#x…

MySQL数据库下载及安装教程

MySQL数据库下载及安装教程 一、MySQL数据库下载及安装教程1.MySQL数据库下载1.1 MySQL官网1.2 MySQL官网下载页(表面上的)1.3 MySQL官网下载页(真正的下载地址)1.4 下载教程 2.MySQL数据库安装教程2.1 MySQL数据库安装版配置安装…

通过SMI(MDC/MDIO)读取外部PHY寄存器

一、基础介绍: SMI:串行管理接口(Serial Management Interface),也被称作MII管理接口(MII Management Interface),包括MDC和MDIO两条信号线。 MDIO是一个PHY的管理接口&#xff0c…

微信小程序云开发教程——墨刀原型工具入门(Axure导入)

引言 作为一个小白,小北要怎么在短时间内快速学会微信小程序原型设计? “时间紧,任务重”,这意味着学习时必须把握微信小程序原型设计中的重点、难点,而非面面俱到。 要在短时间内理解、掌握一个工具的使用&#xf…

【计算机网络】TCP 如何实现可靠传输

TCP通过三次握手建立连接,四次挥手释放连接,确保连接建立和连接释放的可靠。 序列号、检验和、确认应答信号、重发机制、连接管理、窗口控制、流量控制、拥塞控制 标准回答 可靠传输就是通过TCP连接传送的数据是没有差错、不会丢失、不重复并且按序到达的…

微服务间通信重构与服务治理笔记

父工程 依赖版本管理,但实际不引入依赖 pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation&…

【Java程序员面试专栏 算法思维】六 高频面试算法题:动态规划

一轮的算法训练完成后,对相关的题目有了一个初步理解了,接下来进行专题训练,以下这些题目就是汇总的高频题目,本篇主要聊聊回溯算法,主要就是排列组合问题,所以放到一篇Blog中集中练习 题目关键字解题思路时间空间零钱兑换动态规划+双重循环dp[i]表示兑换金额为i元的最少…