diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 8201fe323b3e0..1a8911a0f9a59 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -18,11 +18,11 @@ package org.apache.spark.network.netty
 
 import java.nio.ByteBuffer
+import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.{Future, Promise}
 import scala.reflect.ClassTag
-import scala.tools.nsc.interpreter.JList
 
 import io.netty.buffer._
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index afd13bc09b939..829ae7be2b7e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -239,7 +239,8 @@ class DAGScheduler(
       // (taskId, stageId, stageAttemptId, accumUpdates)
       accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
       blockManagerId: BlockManagerId): Boolean = {
-    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, executorMetrics, accumUpdates))
+    listenerBus.post(
+      SparkListenerExecutorMetricsUpdate(execId, accumUpdates, Some(executorMetrics)))
     blockManagerMaster.driverEndpoint.askSync[Boolean](
       BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 7d03fe64925ac..c8e5ef1872456 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -89,8 +89,8 @@ private[spark] class EventLoggingListener(
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
 
   private val executorIdToLatestMetrics = new HashMap[String, SparkListenerExecutorMetricsUpdate]
-  private val executorIdToModifiedMaxMetrics = new
-    HashMap[String, SparkListenerExecutorMetricsUpdate]
+  private val executorIdToModifiedMaxMetrics =
+    new HashMap[String, SparkListenerExecutorMetricsUpdate]
 
   /**
    * Creates the log file in the configured log directory.
@@ -242,7 +242,7 @@ private[spark] class EventLoggingListener(
     // We only track the executor metrics in each stage, so we drop the task metrics as they are
     // quite verbose
     val eventWithoutTaskMetrics = SparkListenerExecutorMetricsUpdate(
-      event.execId, event.executorMetrics, Seq.empty)
+      event.execId, Seq.empty, event.executorMetrics)
     executorIdToLatestMetrics(eventWithoutTaskMetrics.execId) = eventWithoutTaskMetrics
     updateModifiedMetrics(eventWithoutTaskMetrics)
   }
@@ -295,6 +295,9 @@ private[spark] class EventLoggingListener(
    * Does this event match the ID of an executor we are already tracking?
    * If no, start tracking metrics for this executor, starting at this event.
    * If yes, compare time stamps, and perhaps update using this event.
+   * Only do this if executorMetrics is present in the toBeModifiedEvent.
+   * If it is not (meaning we are processing historical data created
+   * without executorMetrics), simply cache the latestEvent.
    * @param latestEvent the latest event received, used to update our map of stored metrics.
    */
   private def updateModifiedMetrics(latestEvent: SparkListenerExecutorMetricsUpdate): Unit = {
@@ -304,30 +307,36 @@ private[spark] class EventLoggingListener(
       case None =>
         executorIdToModifiedMaxMetrics(executorId) = latestEvent
       case Some(toBeModifiedEvent) =>
-        val toBeModifiedTransportMetrics = toBeModifiedEvent.executorMetrics.transportMetrics
-        val latestTransportMetrics = latestEvent.executorMetrics.transportMetrics
-        var timeStamp: Long = toBeModifiedTransportMetrics.timeStamp
-
-        val onHeapSize = if
-          (latestTransportMetrics.onHeapSize > toBeModifiedTransportMetrics.onHeapSize) {
-          timeStamp = latestTransportMetrics.timeStamp
-          latestTransportMetrics.onHeapSize
-        } else {
-          toBeModifiedTransportMetrics.onHeapSize
+        if (toBeModifiedEvent.executorMetrics.isEmpty ||
+            latestEvent.executorMetrics.isEmpty) {
+          executorIdToModifiedMaxMetrics(executorId) = latestEvent
         }
-        val offHeapSize =
-          if (latestTransportMetrics.offHeapSize > toBeModifiedTransportMetrics.offHeapSize) {
-          timeStamp = latestTransportMetrics.timeStamp
-          latestTransportMetrics.offHeapSize
-        } else {
-          toBeModifiedTransportMetrics.offHeapSize
+        else {
+          val prevTransportMetrics = toBeModifiedEvent.executorMetrics.get.transportMetrics
+          val latestTransportMetrics = latestEvent.executorMetrics.get.transportMetrics
+          var timeStamp: Long = prevTransportMetrics.timeStamp
+
+          val onHeapSize =
+            if (latestTransportMetrics.onHeapSize > prevTransportMetrics.onHeapSize) {
+              timeStamp = latestTransportMetrics.timeStamp
+              latestTransportMetrics.onHeapSize
+            } else {
+              prevTransportMetrics.onHeapSize
+            }
+          val offHeapSize =
+            if (latestTransportMetrics.offHeapSize > prevTransportMetrics.offHeapSize) {
+              timeStamp = latestTransportMetrics.timeStamp
+              latestTransportMetrics.offHeapSize
+            } else {
+              prevTransportMetrics.offHeapSize
+            }
+          val updatedExecMetrics = ExecutorMetrics(toBeModifiedEvent.executorMetrics.get.hostname,
+            toBeModifiedEvent.executorMetrics.get.port,
+            TransportMetrics(timeStamp, onHeapSize, offHeapSize))
+          val modifiedEvent = SparkListenerExecutorMetricsUpdate(
+            toBeModifiedEvent.execId, toBeModifiedEvent.accumUpdates, Some(updatedExecMetrics))
+          executorIdToModifiedMaxMetrics(executorId) = modifiedEvent
         }
-        val modifiedExecMetrics = ExecutorMetrics(toBeModifiedEvent.executorMetrics.hostname,
-          toBeModifiedEvent.executorMetrics.port,
-          TransportMetrics(timeStamp, onHeapSize, offHeapSize))
-        val modifiedEvent = SparkListenerExecutorMetricsUpdate(
-          toBeModifiedEvent.execId, modifiedExecMetrics, toBeModifiedEvent.accumUpdates)
-        executorIdToModifiedMaxMetrics(executorId) = modifiedEvent
     }
   }
 }
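
The EventLoggingListener change above maintains a per-executor running maximum of transport memory, and the `==` in the original patch (a no-op comparison where an assignment was intended) is corrected to `=` above. As a reading aid, here is the merge that updateModifiedMetrics performs, in isolation; TransportMetrics and ExecutorMetrics below are simplified stand-ins reduced to the fields used here, not the Spark classes themselves:

// Simplified stand-ins for the Spark classes, reduced to the fields used here.
case class TransportMetrics(timeStamp: Long, onHeapSize: Long, offHeapSize: Long)
case class ExecutorMetrics(hostname: String, port: Int, transportMetrics: TransportMetrics)

// Fold the latest sample into the stored peak: keep the maximum of each heap
// dimension, stamping the result with the time of the last dimension that grew.
def mergePeak(prev: ExecutorMetrics, latest: ExecutorMetrics): ExecutorMetrics = {
  val p = prev.transportMetrics
  val l = latest.transportMetrics
  var timeStamp = p.timeStamp
  val onHeap =
    if (l.onHeapSize > p.onHeapSize) { timeStamp = l.timeStamp; l.onHeapSize }
    else p.onHeapSize
  val offHeap =
    if (l.offHeapSize > p.offHeapSize) { timeStamp = l.timeStamp; l.offHeapSize }
    else p.offHeapSize
  prev.copy(transportMetrics = TransportMetrics(timeStamp, onHeap, offHeap))
}

// The on-heap peak (100) survives from the older sample; the off-heap peak (70)
// and the timestamp come from the newer one:
// mergePeak(ExecutorMetrics("h", 0, TransportMetrics(1L, 100L, 50L)),
//           ExecutorMetrics("h", 0, TransportMetrics(2L, 80L, 70L)))
// == ExecutorMetrics("h", 0, TransportMetrics(2L, 100L, 70L))
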
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 8f9bd40af6d47..e8859aaf3e64f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -138,14 +138,14 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
 /**
  * Periodic updates from executors.
  * @param execId executor id
- * @param executorMetrics metrics in executor level
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates)
+ * @param executorMetrics keeps track of TransportMetrics for an executor. Added in Spark 2.3.
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
     execId: String,
-    executorMetrics: ExecutorMetrics,
-    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+    executorMetrics: Option[ExecutorMetrics])
   extends SparkListenerEvent
 
 @DeveloperApi
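
Because executorMetrics now sits behind accumUpdates and is an Option, downstream listeners should treat None as "no transport metrics recorded" rather than assume the field is set. A minimal sketch of a defensive consumer, assuming only the API introduced in this patch (TransportMetricsListener is a hypothetical name):

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Defensive consumer of the new Option-valued field: None means the event was
// replayed from an event log written before executor metrics existed.
class TransportMetricsListener extends SparkListener {
  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    event.executorMetrics match {
      case Some(metrics) =>
        println(s"executor ${event.execId}: ${metrics.transportMetrics}")
      case None => // pre-executorMetrics log entry; nothing to record
    }
  }
}

Note that the MemoryListener change below calls executorMetrics.get unconditionally; that is safe only while every live event carries Some(...), and a match like the one above avoids a NoSuchElementException when replaying logs written before this field existed.
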
diff --git a/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
index 8bdd25c188afb..ce008d4c26cfe 100644
--- a/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
@@ -57,16 +57,16 @@ class MemoryListener extends SparkListener {
     val executorMetrics = event.executorMetrics
     activeExecutorIdToMem
       .getOrElseUpdate(executorId, new MemoryUIInfo)
-      .updateMemUiInfo(executorMetrics)
+      .updateMemUiInfo(executorMetrics.get)
     activeStagesToMem.foreach { case (_, stageMemMetrics) =>
       // If executor is added in the stage running time, we also update the metrics for the
       // executor in {{activeStagesToMem}}
       if (!stageMemMetrics.contains(executorId)) {
         stageMemMetrics(executorId) = new MemoryUIInfo
       }
-      stageMemMetrics(executorId).updateMemUiInfo(executorMetrics)
+      stageMemMetrics(executorId).updateMemUiInfo(executorMetrics.get)
     }
-    latestExecIdToExecMetrics(executorId) = executorMetrics
+    latestExecIdToExecMetrics(executorId) = executorMetrics.get
   }
 
   override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index c970a5eb86578..3073584f28b55 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -237,9 +237,13 @@ private[spark] object JsonProtocol {
     val execId = metricsUpdate.execId
     val executorMetrics = metricsUpdate.executorMetrics
     val accumUpdates = metricsUpdate.accumUpdates
+    val metricsJson: JValue = executorMetrics match {
+      case Some(metrics) => executorMetricsToJson(metrics)
+      case None => "none"
+    }
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
     ("Executor ID" -> execId) ~
-    ("Executor Metrics Updated" -> executorMetricsToJson(executorMetrics)) ~
+    ("Executor Metrics Updated" -> metricsJson) ~
     ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
       ("Task ID" -> taskId) ~
       ("Stage ID" -> stageId) ~
@@ -688,7 +692,7 @@ private[spark] object JsonProtocol {
         (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
       (taskId, stageId, stageAttemptId, updates)
     }
-    SparkListenerExecutorMetricsUpdate(execInfo, executorMetrics, accumUpdates)
+    SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates, Some(executorMetrics))
   }
 
 /** --------------------------------------------------------------------- *
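
Note the asymmetry the JsonProtocol hunks create: the writer maps None to the JSON string "none" instead of omitting the field, while the reader shown here wraps the parsed metrics in Some(...) unconditionally, so the "none" sentinel must be recognized before that point. A self-contained json4s sketch of the convention; the field names and the toJson body are illustrative assumptions, not executorMetricsToJson itself:

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

case class TransportMetrics(timeStamp: Long, onHeapSize: Long, offHeapSize: Long)

// Illustrative stand-in for executorMetricsToJson; field names are assumptions.
def toJson(m: TransportMetrics): JValue =
  ("Time Stamp" -> m.timeStamp) ~
  ("On Heap Size" -> m.onHeapSize) ~
  ("Off Heap Size" -> m.offHeapSize)

// Writer side, mirroring the patch: None becomes the sentinel string "none".
def metricsToJson(metrics: Option[TransportMetrics]): JValue = metrics match {
  case Some(m) => toJson(m)
  case None => JString("none")
}

// Reader side: undo the sentinel before wrapping in Some(...).
def metricsFromJson(json: JValue): Option[TransportMetrics] = json match {
  case JString("none") => None
  case obj: JObject =>
    implicit val formats: Formats = DefaultFormats
    Some(TransportMetrics(
      (obj \ "Time Stamp").extract[Long],
      (obj \ "On Heap Size").extract[Long],
      (obj \ "Off Heap Size").extract[Long]))
  case _ => None
}

// compact(render(metricsToJson(None)))  yields "\"none\"", while Some(...)
// renders the full object with all three fields.

Emitting JNothing instead of a sentinel would simply drop the field, which is what absent data already looks like in older logs; the sentinel keeps the key present at the cost of a special case in every reader.
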
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index feae76a087dec..ed6f89e93bc96 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -108,7 +108,14 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerBlockManagerAdded.apply"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.StorageStatus.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatus.this"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.RDDDataDistribution.this")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.RDDDataDistribution.this"),
+
+      // [SPARK-9103] Update SparkListenerExecutorMetricsUpdate with new executorMetrics
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.apply"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.this"),
+      ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate$"),
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy$default$2")
     )
 
     // Exclude rules for 2.1.x
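
These five exclusions all follow from reordering the case-class parameters and changing executorMetrics to an Option: every synthesized member of a case class bakes in the field order and types. The copy$default$2 entry is the least obvious one; a minimal illustration, with Before and After as hypothetical stand-ins for the old and new event class:

// Hypothetical stand-ins showing why Mima flags the synthesized members.
case class Before(execId: String, executorMetrics: Int, accumUpdates: Seq[Long])
case class After(execId: String, accumUpdates: Seq[Long], executorMetrics: Option[Int])

// Before: copy$default$2 returns Int        (the old second field, executorMetrics)
// After:  copy$default$2 returns Seq[Long]  (the new second field, accumUpdates)
// That is an incompatible result type; apply, copy, and the constructor all change
// their signatures too, and the companion object no longer extends the same
// Function3 parent type, hence the five ProblemFilters above.
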