From 754b5b8cefd953e10c78519ba4c6d925c505bc6c Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 9 Sep 2014 21:49:49 -0700
Subject: [PATCH] copy taskMetrics only when isLocal is true

---
 .../org/apache/spark/executor/Executor.scala  |  8 +++++++-
 .../spark/ui/jobs/JobProgressListener.scala   |  6 ++----
 .../ui/jobs/JobProgressListenerSuite.scala    | 19 +++++++------------
 3 files changed, 16 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index dd903dc65d204..afba6d8e5f49e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -360,7 +360,13 @@ private[spark] class Executor(
         if (!taskRunner.attemptedTask.isEmpty) {
           Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
             metrics.updateShuffleReadMetrics
-            tasksMetrics += ((taskRunner.taskId, metrics))
+            if (isLocal) {
+              // make a deep copy of it
+              val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
+              tasksMetrics += ((taskRunner.taskId, copiedMetrics))
+            } else {
+              tasksMetrics += ((taskRunner.taskId, metrics))
+            }
           }
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 62a14379b33fa..eaeb861f59e5a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -26,7 +26,6 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.ui.jobs.UIData._
-import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -242,9 +241,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
           updateAggregateMetrics(stageData, executorMetricsUpdate.execId, taskMetrics,
             t.taskMetrics)
 
-          // Overwrite task metrics with deepcopy
-          // TODO: only serialize it in local mode
-          t.taskMetrics = Some(Utils.deserialize[TaskMetrics](Utils.serialize(taskMetrics)))
+          // Overwrite task metrics
+          t.taskMetrics = Some(taskMetrics)
         }
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 74a98b26d1435..3370dd4156c3f 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -144,7 +144,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val taskType = Utils.getFormattedClassName(new ShuffleMapTask(0))
     val execId = "exe-1"
 
-    def updateTaskMetrics(taskMetrics: TaskMetrics, base: Int) = {
+    def makeTaskMetrics(base: Int) = {
+      val taskMetrics = new TaskMetrics()
       val shuffleReadMetrics = new ShuffleReadMetrics()
       val shuffleWriteMetrics = new ShuffleWriteMetrics()
       taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
@@ -173,16 +174,10 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L)))
     listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
 
-    val metrics4 = new TaskMetrics
-    val metrics5 = new TaskMetrics
-    val metrics6 = new TaskMetrics
-    val metrics7 = new TaskMetrics
     listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
-      (1234L, 0, 0, updateTaskMetrics(metrics4, 0)))))
-    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
-      (1235L, 0, 0, updateTaskMetrics(metrics5, 100)))))
-    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
-      (1236L, 1, 0, updateTaskMetrics(metrics6, 200)))))
+      (1234L, 0, 0, makeTaskMetrics(0)),
+      (1235L, 0, 0, makeTaskMetrics(100)),
+      (1236L, 1, 0, makeTaskMetrics(200)))))
 
     var stage0Data = listener.stageIdToData.get((0, 0)).get
     var stage1Data = listener.stageIdToData.get((1, 0)).get
@@ -207,10 +202,10 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
 
     // task that was included in a heartbeat
     listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, makeTaskInfo(1234L, 1),
-      updateTaskMetrics(metrics4, 300)))
+      makeTaskMetrics(300)))
     // task that wasn't included in a heartbeat
     listener.onTaskEnd(SparkListenerTaskEnd(1, 0, taskType, Success, makeTaskInfo(1237L, 1),
-      updateTaskMetrics(metrics7, 400)))
+      makeTaskMetrics(400)))
 
     stage0Data = listener.stageIdToData.get((0, 0)).get
     stage1Data = listener.stageIdToData.get((1, 0)).get