ShuffleExchangeExec
private lazy val writeMetrics =SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)private[sql] lazy val readMetrics =SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
用在了两个地方,承接的是前后两个stage 的metrics
/*** A [[ShuffleDependency]] that will partition rows of its child based on* the partitioning scheme defined in `newPartitioning`. Those partitions of* the returned ShuffleDependency will be the input of shuffle.*/@transientlazy val shuffleDependency : ShuffleDependency[Int, InternalRow, InternalRow] = {val dep = ShuffleExchangeExec.prepareShuffleDependency(inputRDD,child.output,outputPartitioning,serializer,writeMetrics)metrics("numPartitions").set(dep.partitioner.numPartitions)val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics("numPartitions") :: Nil)dep}
protected override def doExecute(): RDD[InternalRow] = {// Returns the same ShuffleRowRDD if this plan is used by multiple plans.if (cachedShuffleRDD == null) {cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics)}cachedShuffleRDD}
一般情况是,两个metrics 相同。 write 在前,read 在后
如果下个shuffle read task 没有完成或者失败,就会出现read 比write 少的情况。
broadcast
/** Remove all blocks belonging to the given broadcast. */def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean): Unit = {val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveBroadcast(broadcastId, removeFromMaster))future.failed.foreach(e =>logWarning(s"Failed to remove broadcast $broadcastId" +s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e))(ThreadUtils.sameThread)if (blocking) {// the underlying Futures will timeout anyway, so it's safe to use infinite timeout hereRpcUtils.INFINITE_TIMEOUT.awaitResult(future)}}