From 5ca26dcd7e007d0cfb11ab920e98fb46cc9343d7 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 9 Sep 2014 15:36:04 -0700
Subject: [PATCH] fix task metrics aggregation in local mode

---
 .../spark/ui/jobs/JobProgressListener.scala      |  6 ++++--
 .../ui/jobs/JobProgressListenerSuite.scala       | 19 ++++++++++++-------
 2 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index eaeb861f59e5a..62a14379b33fa 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -26,6 +26,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.ui.jobs.UIData._
+import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -241,8 +242,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         updateAggregateMetrics(stageData, executorMetricsUpdate.execId, taskMetrics,
           t.taskMetrics)
 
-        // Overwrite task metrics
-        t.taskMetrics = Some(taskMetrics)
+        // Overwrite task metrics with a deep copy
+        // TODO: only serialize it in local mode
+        t.taskMetrics = Some(Utils.deserialize[TaskMetrics](Utils.serialize(taskMetrics)))
       }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 3370dd4156c3f..74a98b26d1435 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -144,8 +144,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
     val taskType = Utils.getFormattedClassName(new ShuffleMapTask(0))
     val execId = "exe-1"
 
-    def makeTaskMetrics(base: Int) = {
-      val taskMetrics = new TaskMetrics()
+    def updateTaskMetrics(taskMetrics: TaskMetrics, base: Int) = {
       val shuffleReadMetrics = new ShuffleReadMetrics()
       val shuffleWriteMetrics = new ShuffleWriteMetrics()
       taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
@@ -174,10 +173,16 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
     listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L)))
     listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
 
+    val metrics4 = new TaskMetrics
+    val metrics5 = new TaskMetrics
+    val metrics6 = new TaskMetrics
+    val metrics7 = new TaskMetrics
     listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
-      (1234L, 0, 0, makeTaskMetrics(0)),
-      (1235L, 0, 0, makeTaskMetrics(100)),
-      (1236L, 1, 0, makeTaskMetrics(200)))))
+      (1234L, 0, 0, updateTaskMetrics(metrics4, 0)))))
+    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
+      (1235L, 0, 0, updateTaskMetrics(metrics5, 100)))))
+    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
+      (1236L, 1, 0, updateTaskMetrics(metrics6, 200)))))
 
     var stage0Data = listener.stageIdToData.get((0, 0)).get
     var stage1Data = listener.stageIdToData.get((1, 0)).get
@@ -202,10 +207,10 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers {
 
     // task that was included in a heartbeat
     listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, makeTaskInfo(1234L, 1),
-      makeTaskMetrics(300)))
+      updateTaskMetrics(metrics4, 300)))
     // task that wasn't included in a heartbeat
     listener.onTaskEnd(SparkListenerTaskEnd(1, 0, taskType, Success, makeTaskInfo(1237L, 1),
-      makeTaskMetrics(400)))
+      updateTaskMetrics(metrics7, 400)))
 
     stage0Data = listener.stageIdToData.get((0, 0)).get
     stage1Data = listener.stageIdToData.get((1, 0)).get
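
Why the deep copy matters: in local mode the executor and the driver run in the same JVM, so the TaskMetrics object delivered in a heartbeat is the very object the running task keeps mutating in place. If the listener stores that live reference, the next call to updateAggregateMetrics computes its delta against an already-updated "old" value, and the stage aggregates drift; this is also why the test now reuses the same TaskMetrics instances (metrics4..metrics7) across heartbeats instead of building fresh ones. Round-tripping through serialization yields an independent snapshot. Below is a minimal standalone sketch of that idiom; the DeepCopy object and its helpers are illustrative stand-ins for Spark's Utils.serialize/Utils.deserialize, not Spark's actual API.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    // Illustrative stand-in for Utils.serialize/Utils.deserialize: a deep
    // copy made by round-tripping through Java serialization. Requires the
    // object graph to be java.io.Serializable, which TaskMetrics is.
    object DeepCopy {
      def serialize[T](obj: T): Array[Byte] = {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(obj)
        oos.close()
        bos.toByteArray
      }

      def deserialize[T](bytes: Array[Byte]): T = {
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
        try ois.readObject().asInstanceOf[T] finally ois.close()
      }

      // The copy shares no mutable state with the original, so later
      // in-place updates by the task cannot leak into the stored snapshot.
      def deepCopy[T](obj: T): T = deserialize[T](serialize(obj))
    }

This also explains the TODO in the patch: the copy is only strictly needed when sender and receiver share a JVM (local mode); in cluster mode the metrics already cross the wire serialized, so the extra round trip is redundant work.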