diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 3ace2364ed8ab..36e66b821cc41 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -22,7 +22,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.scheduler.TaskScheduler
 
-case class Heartbeat(
+private[spark] case class Heartbeat(
     executorId: String,
     taskMetrics: Array[(Long, TaskMetrics)],
     blockManagerId: BlockManagerId)
@@ -31,7 +31,7 @@ case class Heartbeat(
 /**
  * Lives in the driver to receive heartbeats from executors.
  */
-class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor {
+private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor {
   override def receive = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
       sender ! scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7e7efbb4ec182..43f754e522587 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -160,7 +160,9 @@ class DAGScheduler(
    * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
    * indicating that the block manager should re-register.
    */
-  def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, Int, TaskMetrics)],
+  def executorHeartbeatReceived(
+      execId: String,
+      taskMetrics: Array[(Long, Int, TaskMetrics)],
       blockManagerId: BlockManagerId): Boolean = {
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
     implicit val timeout = Timeout(30 seconds)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 4974ef988022b..d01d318633877 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -75,8 +75,11 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
 @DeveloperApi
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
-case class SparkListenerExecutorMetricsUpdate(execId: String,
-    taskMetrics: Seq[(Long, Int, TaskMetrics)]) extends SparkListenerEvent
+@DeveloperApi
+case class SparkListenerExecutorMetricsUpdate(
+    execId: String,
+    taskMetrics: Seq[(Long, Int, TaskMetrics)])
+  extends SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
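With SparkListenerExecutorMetricsUpdate now marked @DeveloperApi, external code can subscribe to the event. Below is a minimal sketch of such a consumer, assuming the onExecutorMetricsUpdate callback that accompanies this event on SparkListener; the class name and the println are illustrative, not part of this patch:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

    // Hypothetical listener: logs per-task run time whenever a heartbeat
    // delivers fresh metrics for still-running tasks.
    class ExecutorMetricsLogger extends SparkListener {
      override def onExecutorMetricsUpdate(
          update: SparkListenerExecutorMetricsUpdate): Unit = {
        update.taskMetrics.foreach { case (taskId, stageId, metrics) =>
          println(s"executor=${update.execId} stage=$stageId task=$taskId " +
            s"runTime=${metrics.executorRunTime}ms")
        }
      }
    }

Such a listener would be registered with sc.addSparkListener(new ExecutorMetricsLogger).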
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index eb95ec0064a65..6a7cfadd8b885 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -328,9 +328,11 @@ private[spark] class TaskSchedulerImpl(
    * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
    * indicating that the block manager should re-register.
    */
-  override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
+  override def executorHeartbeatReceived(
+      execId: String,
+      taskMetrics: Array[(Long, TaskMetrics)],
       blockManagerId: BlockManagerId): Boolean = {
-    val metricsWithStageIds = taskMetrics.flatMap{
+    val metricsWithStageIds = taskMetrics.flatMap {
       case (id, metrics) => {
         taskIdToTaskSetId.get(id)
           .flatMap(activeTaskSets.get)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 8c5ca3ec6b131..6abf6d930c155 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -21,8 +21,6 @@ import java.nio.ByteBuffer
 
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.util.{SerializableBuffer, Utils}
-import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.storage.BlockManagerId
 
 private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 61add1d786051..0df6541f04369 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -113,7 +113,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         new StageUIData
       })
       stageData.numActiveTasks += 1
-      stageData.taskData.put(taskInfo.taskId, new UIData.TaskUIData(taskInfo))
+      stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo))
     }
   }
@@ -160,14 +160,17 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         updateAggregateMetrics(stageData, info.executorId, metrics.get, oldMetrics)
       }
 
-      val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new UIData.TaskUIData(info))
+      val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info))
       taskData.taskInfo = info
       taskData.taskMetrics = metrics
       taskData.errorMessage = errorMessage
     }
   }
 
-  def updateAggregateMetrics(stageData: StageUIData, execId: String, taskMetrics: TaskMetrics,
+  def updateAggregateMetrics(
+      stageData: StageUIData,
+      execId: String,
+      taskMetrics: TaskMetrics,
       oldMetrics: Option[TaskMetrics]) {
     val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary)
@@ -205,7 +208,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         new StageUIData
       })
       val taskData = stageData.taskData.get(taskId)
-      taskData.map{ t =>
+      taskData.map { t =>
         if (!t.taskInfo.finished) {
           updateAggregateMetrics(stageData, executorMetricsUpdate.execId, taskMetrics,
             t.taskMetrics)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 48b99cf851620..ecd3b918e0997 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -55,7 +55,7 @@ private[jobs] object UIData {
     var executorSummary = new HashMap[String, ExecutorSummary]
   }
 
-  case class TaskUIData(
+  class TaskUIData(
     var taskInfo: TaskInfo,
     var taskMetrics: Option[TaskMetrics] = None,
     var errorMessage: Option[String] = None)
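For context on the protocol these pieces implement: an executor sends Heartbeat to the HeartbeatReceiver actor and interprets the Boolean reply as "does the driver still know my block manager?". A hedged sketch of that round trip follows; the sendHeartbeat wrapper and the re-registration hook are illustrative (and, since Heartbeat is now private[spark], such code would have to live inside the org.apache.spark package), while the message shape, the Boolean reply, and the 30-second timeout come from the diffs above:

    import scala.concurrent.Await
    import scala.concurrent.duration._
    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout

    // Illustrative executor-side call: ask the driver-side receiver and block
    // on its Boolean verdict about this executor's BlockManager.
    def sendHeartbeat(receiver: ActorRef, heartbeat: Heartbeat): Unit = {
      implicit val timeout = Timeout(30.seconds)
      val driverKnowsBlockManager =
        Await.result(receiver ? heartbeat, timeout.duration).asInstanceOf[Boolean]
      if (!driverKnowsBlockManager) {
        // Hypothetical hook: trigger BlockManager re-registration here.
      }
    }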
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index ef25f82c5aeda..feafd654e9e71 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -142,8 +142,12 @@ private[spark] object AkkaUtils extends Logging {
    * Send a message to the given actor and get its result within a default timeout, or
    * throw a SparkException if this fails.
    */
-  def askWithReply[T](message: Any, actor: ActorRef, retryAttempts: Int,
-    retryInterval: Int, timeout: FiniteDuration): T = {
+  def askWithReply[T](
+      message: Any,
+      actor: ActorRef,
+      retryAttempts: Int,
+      retryInterval: Int,
+      timeout: FiniteDuration): T = {
     // TODO: Consider removing multiple attempts
     if (actor == null) {
       throw new SparkException("Error sending message as driverActor is null " +
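For readers puzzled by the TODO: askWithReply retries the ask up to retryAttempts times, waiting retryInterval between failures, before giving up. A simplified, self-contained sketch of that retry shape (a plain thunk stands in for the ActorRef and a RuntimeException for SparkException; this is not the actual implementation):

    import scala.concurrent.duration._
    import scala.util.{Failure, Success, Try}

    // Try up to `attempts` times, sleeping `interval` between failures,
    // and surface the last error if every attempt fails.
    def askWithRetry[T](attempts: Int, interval: FiniteDuration)(ask: () => T): T = {
      var lastError: Throwable = null
      for (_ <- 1 to attempts) {
        Try(ask()) match {
          case Success(result) => return result
          case Failure(e) =>
            lastError = e
            Thread.sleep(interval.toMillis)
        }
      }
      throw new RuntimeException(s"Ask failed after $attempts attempts", lastError)
    }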