Skip to content

Commit

Permalink
fix task metrics aggregation in local mode
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Sep 9, 2014
1 parent f0f1ba0 commit 5ca26dc
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.ui.jobs.UIData._
import org.apache.spark.util.Utils

/**
* :: DeveloperApi ::
Expand Down Expand Up @@ -241,8 +242,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
updateAggregateMetrics(stageData, executorMetricsUpdate.execId, taskMetrics,
t.taskMetrics)

// Overwrite task metrics
t.taskMetrics = Some(taskMetrics)
// Overwrite task metrics with a deep copy (via serialization round-trip)
// TODO: perform this serialization round-trip only in local mode, where the
// metrics object may be shared by reference with the executor
t.taskMetrics = Some(Utils.deserialize[TaskMetrics](Utils.serialize(taskMetrics)))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val taskType = Utils.getFormattedClassName(new ShuffleMapTask(0))
val execId = "exe-1"

def makeTaskMetrics(base: Int) = {
val taskMetrics = new TaskMetrics()
def updateTaskMetrics(taskMetrics: TaskMetrics, base: Int) = {
val shuffleReadMetrics = new ShuffleReadMetrics()
val shuffleWriteMetrics = new ShuffleWriteMetrics()
taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics))
Expand Down Expand Up @@ -174,10 +173,16 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L)))
listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))

val metrics4 = new TaskMetrics
val metrics5 = new TaskMetrics
val metrics6 = new TaskMetrics
val metrics7 = new TaskMetrics
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
(1234L, 0, 0, makeTaskMetrics(0)),
(1235L, 0, 0, makeTaskMetrics(100)),
(1236L, 1, 0, makeTaskMetrics(200)))))
(1234L, 0, 0, updateTaskMetrics(metrics4, 0)))))
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
(1235L, 0, 0, updateTaskMetrics(metrics5, 100)))))
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
(1236L, 1, 0, updateTaskMetrics(metrics6, 200)))))

var stage0Data = listener.stageIdToData.get((0, 0)).get
var stage1Data = listener.stageIdToData.get((1, 0)).get
Expand All @@ -202,10 +207,10 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc

// task that was included in a heartbeat
listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, makeTaskInfo(1234L, 1),
makeTaskMetrics(300)))
updateTaskMetrics(metrics4, 300)))
// task that wasn't included in a heartbeat
listener.onTaskEnd(SparkListenerTaskEnd(1, 0, taskType, Success, makeTaskInfo(1237L, 1),
makeTaskMetrics(400)))
updateTaskMetrics(metrics7, 400)))

stage0Data = listener.stageIdToData.get((0, 0)).get
stage1Data = listener.stageIdToData.get((1, 0)).get
Expand Down

0 comments on commit 5ca26dc

Please sign in to comment.