[SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds
When calculating the input metrics there was an assumption that one task only reads from one block; this is not true for some operations, including coalesce. This patch simply increments the task's input metrics if previous ones with the same read method exist.

A limitation of this patch is that if a task reads from two different blocks with different read methods, one will override the other.
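
For context, a minimal sketch of the kind of job this affects, with a made-up input path and partition counts: after a coalesce, one task reads several underlying blocks, so its input metrics have to be accumulated rather than overwritten.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: any input with more blocks than the coalesce target
// will make a single task read multiple blocks.
object CoalescedInputMetricsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("coalesce-metrics").setMaster("local[2]"))
    // Hypothetical path; assume it maps to many HDFS blocks.
    val lines = sc.textFile("hdfs:///data/logs")
    // Two tasks, each reading several blocks. Previously each block read
    // replaced the task's InputMetrics, so the reported input size missed
    // earlier blocks; with this patch, bytes for the same read method are
    // accumulated.
    lines.coalesce(2).count()
    sc.stop()
  }
}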

Author: Kostas Sakellis <kostas@cloudera.com>

Closes apache#3120 from ksakellis/kostas-spark-4092 and squashes the following commits:

54e6658 [Kostas Sakellis] Drops metrics if conflicting read methods exist
f0e0cc5 [Kostas Sakellis] Add bytesReadCallback to InputMetrics
a2a36d4 [Kostas Sakellis] CR feedback
5a0c770 [Kostas Sakellis] [SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds
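
As a quick illustration of the InputMetrics API these commits introduce (addBytesRead, setBytesReadCallback, updateBytesRead), here is a small self-contained sketch; the callback is a stand-in for the per-thread FileSystem-statistics callback that HadoopRDD and NewHadoopRDD register.

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.executor.{DataReadMethod, InputMetrics}

object InputMetricsSketch {
  def main(args: Array[String]): Unit = {
    val metrics = new InputMetrics(DataReadMethod.Hadoop)

    // Stand-in for the per-thread FileSystem bytes-read statistic.
    val threadBytesRead = new AtomicLong(0L)
    metrics.setBytesReadCallback(Some(() => threadBytesRead.get()))

    threadBytesRead.addAndGet(4096L) // pretend the record reader consumed 4 KB
    metrics.updateBytesRead()        // pull the latest value from the callback
    println(metrics.bytesRead)       // 4096

    // When no callback is available, bytes are added explicitly per block.
    metrics.addBytesRead(1024L)
    println(metrics.bytesRead)       // 5120
  }
}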
Kostas Sakellis authored and pwendell committed Jan 16, 2015
1 parent 96c2c71 commit a79a9f9
Showing 10 changed files with 270 additions and 102 deletions.
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,7 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
val inputMetrics = blockResult.inputMetrics
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(inputMetrics.readMethod)
existingMetrics.addBytesRead(inputMetrics.bytesRead)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

case None =>
core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -379,6 +379,7 @@ private[spark] class Executor(
if (!taskRunner.attemptedTask.isEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
metrics.updateInputMetrics()
metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
if (isLocal) {
// JobProgressListener will hold an reference of it during
75 changes: 73 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,11 @@

package org.apache.spark.executor

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.executor.DataReadMethod
import org.apache.spark.executor.DataReadMethod.DataReadMethod

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
@@ -80,7 +85,17 @@ class TaskMetrics extends Serializable {
* If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
* are stored here.
*/
var inputMetrics: Option[InputMetrics] = None
private var _inputMetrics: Option[InputMetrics] = None

def inputMetrics = _inputMetrics

/**
* This should only be used when recreating TaskMetrics, not when updating input metrics in
* executors
*/
private[spark] def setInputMetrics(inputMetrics: Option[InputMetrics]) {
_inputMetrics = inputMetrics
}

/**
* If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
Expand Down Expand Up @@ -133,6 +148,30 @@ class TaskMetrics extends Serializable {
readMetrics
}

/**
* Returns the input metrics object that the task should use. Currently, if
* there exists an input metric with the same readMethod, we return that one
* so the caller can accumulate bytes read. If the readMethod is different
* than previously seen by this task, we return a new InputMetric but don't
* record it.
*
* Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
* we can store all the different inputMetrics (one per readMethod).
*/
private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod): InputMetrics =
  synchronized {
_inputMetrics match {
case None =>
val metrics = new InputMetrics(readMethod)
_inputMetrics = Some(metrics)
metrics
case Some(metrics @ InputMetrics(method)) if method == readMethod =>
metrics
case Some(InputMetrics(method)) =>
new InputMetrics(readMethod)
}
}

/**
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
*/
@@ -146,6 +185,10 @@ class TaskMetrics extends Serializable {
}
_shuffleReadMetrics = Some(merged)
}

private[spark] def updateInputMetrics() = synchronized {
inputMetrics.foreach(_.updateBytesRead())
}
}

private[spark] object TaskMetrics {
@@ -179,10 +222,38 @@ object DataWriteMethod extends Enumeration with Serializable {
*/
@DeveloperApi
case class InputMetrics(readMethod: DataReadMethod.Value) {

private val _bytesRead: AtomicLong = new AtomicLong()

/**
* Total bytes read.
*/
var bytesRead: Long = 0L
def bytesRead: Long = _bytesRead.get()
@volatile @transient var bytesReadCallback: Option[() => Long] = None

/**
* Adds additional bytes read for this read method.
*/
def addBytesRead(bytes: Long) = {
_bytesRead.addAndGet(bytes)
}

/**
* Invoke the bytesReadCallback and mutate bytesRead.
*/
def updateBytesRead() {
bytesReadCallback.foreach { c =>
_bytesRead.set(c())
}
}

/**
* Register a function that can be called to get up-to-date information on how many bytes the task
* has read from an input source.
*/
def setBytesReadCallback(f: Option[() => Long]) {
bytesReadCallback = f
}
}

/**
39 changes: 13 additions & 26 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,18 +213,19 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()

val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
val inputMetrics = context.taskMetrics
.getInputMetricsForReadMethod(DataReadMethod.Hadoop)

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
} else {
None
}
if (bytesReadCallback.isDefined) {
context.taskMetrics.inputMetrics = Some(inputMetrics)
}
val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
split.inputSplit.value match {
case split: FileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
case _ => None
}
)
inputMetrics.setBytesReadCallback(bytesReadCallback)

var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
@@ -237,40 +238,26 @@ class HadoopRDD[K, V](
val key: K = reader.createKey()
val value: V = reader.createValue()

var recordsSinceMetricsUpdate = 0

override def getNext() = {
try {
finished = !reader.next(key, value)
} catch {
case eof: EOFException =>
finished = true
}

// Update bytes read metric every few records
if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
&& bytesReadCallback.isDefined) {
recordsSinceMetricsUpdate = 0
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
} else {
recordsSinceMetricsUpdate += 1
}
(key, value)
}

override def close() {
try {
reader.close()
if (bytesReadCallback.isDefined) {
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
inputMetrics.updateBytesRead()
} else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.bytesRead = split.inputSplit.value.getLength
context.taskMetrics.inputMetrics = Some(inputMetrics)
inputMetrics.addBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
40 changes: 13 additions & 27 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -109,18 +109,19 @@ class NewHadoopRDD[K, V](
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value

val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
val inputMetrics = context.taskMetrics
.getInputMetricsForReadMethod(DataReadMethod.Hadoop)

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
} else {
None
}
if (bytesReadCallback.isDefined) {
context.taskMetrics.inputMetrics = Some(inputMetrics)
}
val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
split.serializableHadoopSplit.value match {
case split: FileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, conf)
case _ => None
}
)
inputMetrics.setBytesReadCallback(bytesReadCallback)

val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
@@ -153,34 +154,19 @@ class NewHadoopRDD[K, V](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false

// Update bytes read metric every few records
if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
&& bytesReadCallback.isDefined) {
recordsSinceMetricsUpdate = 0
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
} else {
recordsSinceMetricsUpdate += 1
}

(reader.getCurrentKey, reader.getCurrentValue)
}

private def close() {
try {
reader.close()

// Update metrics with final amount
if (bytesReadCallback.isDefined) {
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
inputMetrics.updateBytesRead()
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength
context.taskMetrics.inputMetrics = Some(inputMetrics)
inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -53,7 +53,7 @@ private[spark] class BlockResult(
readMethod: DataReadMethod.Value,
bytes: Long) {
val inputMetrics = new InputMetrics(readMethod)
inputMetrics.bytesRead = bytes
inputMetrics.addBytesRead(bytes)
}

/**
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -637,8 +637,8 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson))
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
metrics.inputMetrics =
Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
metrics.setInputMetrics(
Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson))
metrics.outputMetrics =
Utils.jsonOption(json \ "Output Metrics").map(outputMetricsFromJson)
metrics.updatedBlocks =
@@ -671,7 +671,7 @@ def inputMetricsFromJson(json: JValue): InputMetrics = {
def inputMetricsFromJson(json: JValue): InputMetrics = {
val metrics = new InputMetrics(
DataReadMethod.withName((json \ "Data Read Method").extract[String]))
metrics.bytesRead = (json \ "Bytes Read").extract[Long]
metrics.addBytesRead((json \ "Bytes Read").extract[Long])
metrics
}
