Commit 577d442
Respond to some code review feedback. Make executorMetrics an Option.
jsoltren committed Apr 17, 2017
1 parent f3e5704 commit 577d442
Showing 7 changed files with 57 additions and 36 deletions.
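In short, the commit moves executorMetrics to the end of SparkListenerExecutorMetricsUpdate's parameter list and wraps it in an Option, so an event can carry no executor-level metrics at all (for example, when replaying logs written before the field existed). A sketch of the two shapes, reconstructed from the diffs below:

// Before (required metrics):
//   case class SparkListenerExecutorMetricsUpdate(
//       execId: String,
//       executorMetrics: ExecutorMetrics,
//       accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
//     extends SparkListenerEvent

// After (optional metrics, moved after accumUpdates):
case class SparkListenerExecutorMetricsUpdate(
    execId: String,
    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
    executorMetrics: Option[ExecutorMetrics])
  extends SparkListenerEvent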
@@ -18,11 +18,11 @@
 package org.apache.spark.network.netty
 
 import java.nio.ByteBuffer
+import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.{Future, Promise}
 import scala.reflect.ClassTag
-import scala.tools.nsc.interpreter.JList
 
 import io.netty.buffer._
@@ -239,7 +239,8 @@ class DAGScheduler(
       // (taskId, stageId, stageAttemptId, accumUpdates)
       accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
       blockManagerId: BlockManagerId): Boolean = {
-    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, executorMetrics, accumUpdates))
+    listenerBus.post(
+      SparkListenerExecutorMetricsUpdate(execId, accumUpdates, Some(executorMetrics)))
     blockManagerMaster.driverEndpoint.askSync[Boolean](
       BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
   }
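Since the heartbeat path now posts the metrics wrapped in Some, downstream listeners receive an Option and must unwrap it. A hypothetical consumer (the listener class is illustrative and not part of this commit; onExecutorMetricsUpdate is the standard SparkListener callback):

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Hypothetical listener: report executor-level metrics when present,
// skip events (e.g. replayed historical ones) that carry None.
class ExecutorMetricsLogger extends SparkListener {
  override def onExecutorMetricsUpdate(
      event: SparkListenerExecutorMetricsUpdate): Unit = {
    event.executorMetrics match {
      case Some(metrics) => println(s"executor ${event.execId}: $metrics")
      case None => // no executor-level metrics on this event
    }
  }
}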
@@ -89,8 +89,8 @@ private[spark] class EventLoggingListener(
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
 
   private val executorIdToLatestMetrics = new HashMap[String, SparkListenerExecutorMetricsUpdate]
-  private val executorIdToModifiedMaxMetrics = new
-    HashMap[String, SparkListenerExecutorMetricsUpdate]
+  private val executorIdToModifiedMaxMetrics =
+    new HashMap[String, SparkListenerExecutorMetricsUpdate]
 
   /**
    * Creates the log file in the configured log directory.
@@ -242,7 +242,7 @@ private[spark] class EventLoggingListener(
     // We only track the executor metrics in each stage, so we drop the task metrics as they are
     // quite verbose
     val eventWithoutTaskMetrics = SparkListenerExecutorMetricsUpdate(
-      event.execId, event.executorMetrics, Seq.empty)
+      event.execId, Seq.empty, event.executorMetrics)
     executorIdToLatestMetrics(eventWithoutTaskMetrics.execId) = eventWithoutTaskMetrics
     updateModifiedMetrics(eventWithoutTaskMetrics)
   }
@@ -295,6 +295,9 @@ private[spark] class EventLoggingListener(
    * Does this event match the ID of an executor we are already tracking?
    * If no, start tracking metrics for this executor, starting at this event.
    * If yes, compare time stamps, and perhaps update using this event.
+   * Only do this if executorMetrics is present in the toBeModifiedEvent;
+   * if it is not (meaning we are processing historical data created
+   * without executorMetrics), simply cache the latestEvent.
    * @param latestEvent the latest event received, used to update our map of stored metrics.
    */
   private def updateModifiedMetrics(latestEvent: SparkListenerExecutorMetricsUpdate): Unit = {
@@ -304,30 +304,36 @@
       case None =>
         executorIdToModifiedMaxMetrics(executorId) = latestEvent
       case Some(toBeModifiedEvent) =>
-        val toBeModifiedTransportMetrics = toBeModifiedEvent.executorMetrics.transportMetrics
-        val latestTransportMetrics = latestEvent.executorMetrics.transportMetrics
-        var timeStamp: Long = toBeModifiedTransportMetrics.timeStamp
-
-        val onHeapSize = if
-          (latestTransportMetrics.onHeapSize > toBeModifiedTransportMetrics.onHeapSize) {
-          timeStamp = latestTransportMetrics.timeStamp
-          latestTransportMetrics.onHeapSize
-        } else {
-          toBeModifiedTransportMetrics.onHeapSize
-        }
-        val offHeapSize =
-          if (latestTransportMetrics.offHeapSize > toBeModifiedTransportMetrics.offHeapSize) {
-            timeStamp = latestTransportMetrics.timeStamp
-            latestTransportMetrics.offHeapSize
-          } else {
-            toBeModifiedTransportMetrics.offHeapSize
-          }
-        val modifiedExecMetrics = ExecutorMetrics(toBeModifiedEvent.executorMetrics.hostname,
-          toBeModifiedEvent.executorMetrics.port,
-          TransportMetrics(timeStamp, onHeapSize, offHeapSize))
-        val modifiedEvent = SparkListenerExecutorMetricsUpdate(
-          toBeModifiedEvent.execId, modifiedExecMetrics, toBeModifiedEvent.accumUpdates)
-        executorIdToModifiedMaxMetrics(executorId) = modifiedEvent
+        if (toBeModifiedEvent.executorMetrics.isEmpty ||
+          latestEvent.executorMetrics.isEmpty) {
+          executorIdToModifiedMaxMetrics(executorId) = latestEvent
+        }
+        else {
+          val prevTransportMetrics = toBeModifiedEvent.executorMetrics.get.transportMetrics
+          val latestTransportMetrics = latestEvent.executorMetrics.get.transportMetrics
+          var timeStamp: Long = prevTransportMetrics.timeStamp
+
+          val onHeapSize = if
+            (latestTransportMetrics.onHeapSize > prevTransportMetrics.onHeapSize) {
+            timeStamp = latestTransportMetrics.timeStamp
+            latestTransportMetrics.onHeapSize
+          } else {
+            prevTransportMetrics.onHeapSize
+          }
+          val offHeapSize =
+            if (latestTransportMetrics.offHeapSize > prevTransportMetrics.offHeapSize) {
+              timeStamp = latestTransportMetrics.timeStamp
+              latestTransportMetrics.offHeapSize
+            } else {
+              prevTransportMetrics.offHeapSize
+            }
+          val updatedExecMetrics = ExecutorMetrics(toBeModifiedEvent.executorMetrics.get.hostname,
+            toBeModifiedEvent.executorMetrics.get.port,
+            TransportMetrics(timeStamp, onHeapSize, offHeapSize))
+          val modifiedEvent = SparkListenerExecutorMetricsUpdate(
+            toBeModifiedEvent.execId, toBeModifiedEvent.accumUpdates, Some(updatedExecMetrics))
+          executorIdToModifiedMaxMetrics(executorId) = modifiedEvent
+        }
     }
   }
 }
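A small sketch of what the new guard buys, assuming the ExecutorMetrics and TransportMetrics constructors used above (identifiers and values here are made up for illustration):

// Two updates for the same executor; the second carries no metrics,
// as happens when replaying logs written before this field existed.
val first = SparkListenerExecutorMetricsUpdate(
  "1", Seq.empty, Some(ExecutorMetrics("host-1", 7077, TransportMetrics(10L, 100L, 50L))))
val second = SparkListenerExecutorMetricsUpdate("1", Seq.empty, None)

// updateModifiedMetrics(first)  -> no entry cached yet, so `first` is stored (case None).
// updateModifiedMetrics(second) -> executorMetrics.isEmpty is true, so the guard
// caches the latest event instead of dereferencing a missing ExecutorMetrics via .get.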
@@ -138,14 +138,14 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
 /**
  * Periodic updates from executors.
  * @param execId executor id
- * @param executorMetrics metrics in executor level
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates)
+ * @param executorMetrics keeps track of TransportMetrics for an executor. Added in Spark 2.3.
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
     execId: String,
-    executorMetrics: ExecutorMetrics,
-    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+    executorMetrics: Option[ExecutorMetrics])
   extends SparkListenerEvent
 
 @DeveloperApi
@@ -57,16 +57,16 @@ class MemoryListener extends SparkListener {
     val executorMetrics = event.executorMetrics
     activeExecutorIdToMem
       .getOrElseUpdate(executorId, new MemoryUIInfo)
-      .updateMemUiInfo(executorMetrics)
+      .updateMemUiInfo(executorMetrics.get)
     activeStagesToMem.foreach { case (_, stageMemMetrics) =>
       // If executor is added in the stage running time, we also update the metrics for the
       // executor in {{activeStagesToMem}}
       if (!stageMemMetrics.contains(executorId)) {
         stageMemMetrics(executorId) = new MemoryUIInfo
       }
-      stageMemMetrics(executorId).updateMemUiInfo(executorMetrics)
+      stageMemMetrics(executorId).updateMemUiInfo(executorMetrics.get)
     }
-    latestExecIdToExecMetrics(executorId) = executorMetrics
+    latestExecIdToExecMetrics(executorId) = executorMetrics.get
   }
 
   override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
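The .get calls above assume every live update carries metrics. A more defensive variant of the same bookkeeping (a sketch in the context of the handler above, not what this commit does) would iterate over the Option so that a None event becomes a no-op:

// Sketch: skip the whole update when metrics are absent instead of
// risking a NoSuchElementException from Option.get.
event.executorMetrics.foreach { metrics =>
  activeExecutorIdToMem
    .getOrElseUpdate(executorId, new MemoryUIInfo)
    .updateMemUiInfo(metrics)
  latestExecIdToExecMetrics(executorId) = metrics
}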
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -237,9 +237,13 @@ private[spark] object JsonProtocol {
     val execId = metricsUpdate.execId
     val executorMetrics = metricsUpdate.executorMetrics
     val accumUpdates = metricsUpdate.accumUpdates
+    val metricsJson: JValue = executorMetrics match {
+      case Some(metrics) => executorMetricsToJson(metrics)
+      case None => "none"
+    }
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
     ("Executor ID" -> execId) ~
-    ("Executor Metrics Updated" -> executorMetricsToJson(executorMetrics)) ~
+    ("Executor Metrics Updated" -> metricsJson) ~
     ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
       ("Task ID" -> taskId) ~
       ("Stage ID" -> stageId) ~
@@ -688,7 +692,7 @@
         (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
       (taskId, stageId, stageAttemptId, updates)
     }
-    SparkListenerExecutorMetricsUpdate(execInfo, executorMetrics, accumUpdates)
+    SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates, Some(executorMetrics))
   }
 
   /** --------------------------------------------------------------------- *
9 changes: 8 additions & 1 deletion project/MimaExcludes.scala
@@ -108,7 +108,14 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerBlockManagerAdded.apply"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.StorageStatus.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatus.this"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.RDDDataDistribution.this")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.RDDDataDistribution.this"),
+
+      // [SPARK-9103] Update SparkListenerExecutorMetricsUpdate with new executorMetrics
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.apply"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.this"),
+      ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate$"),
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy$default$2")
     )
 
     // Exclude rules for 2.1.x
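These exclusions exist because reordering and retyping a case class's fields regenerates its synthetic companion methods, which breaks binary compatibility for code built against the old shape. Roughly (signatures reconstructed from the diff, not from Mima output):

// Built against the old artifact, calls like these no longer link:
//   SparkListenerExecutorMetricsUpdate(execId, metrics, accumUpdates)  // old apply/this
//   someEvent.copy(executorMetrics = metrics)                          // old copy
// and copy$default$2 (the default for the second parameter) changed type
// from ExecutorMetrics to Seq[(Long, Int, Int, Seq[AccumulableInfo])],
// hence the IncompatibleResultTypeProblem exclusion.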
