Drops metrics if conflicting read methods exist
Tasks now only store/accumulate input metrics from
the same read method. If a task has interleaved reads
from blocks with different read methods, we choose to
store only the first read method's metrics.

https://issues.apache.org/jira/browse/SPARK-5225
addresses keeping track of all input metrics.

This change also centralizes this logic in TaskMetrics
and gates how inputMetrics can be added to TaskMetrics.
Kostas Sakellis committed Jan 14, 2015
1 parent f0e0cc5 commit 54e6658
Showing 9 changed files with 116 additions and 39 deletions.
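
To make the new semantics concrete, here is a minimal sketch of the behavior this commit introduces (illustrative only, not part of the diff; getInputMetricsForReadMethod is private[spark], so code like this would have to live under an org.apache.spark package, e.g. in a test):

import org.apache.spark.executor.{DataReadMethod, TaskMetrics}

val taskMetrics = new TaskMetrics

// The first read registers the task's InputMetrics under its read method.
val memoryMetrics = taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Memory)
memoryMetrics.addBytesRead(100L)

// A later read with a different read method conflicts with the metrics already
// recorded, so a fresh, unregistered InputMetrics is returned and its bytes are dropped.
val hadoopMetrics = taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
hadoopMetrics.addBytesRead(50L)

assert(taskMetrics.inputMetrics.get.readMethod == DataReadMethod.Memory)
assert(taskMetrics.inputMetrics.get.bytesRead == 100L)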
13 changes: 5 additions & 8 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,14 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
val existingMetrics = context.taskMetrics.inputMetrics
val prevBytesRead = existingMetrics
.filter(_.readMethod == blockResult.inputMetrics.readMethod)
.map(_.bytesRead)
.getOrElse(0L)

context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
context.taskMetrics.inputMetrics.get.bytesRead += prevBytesRead
val inputMetrics = blockResult.inputMetrics
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(inputMetrics.readMethod)
existingMetrics.addBytesRead(inputMetrics.bytesRead)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

case None =>
56 changes: 52 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,11 @@

package org.apache.spark.executor

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.executor.DataReadMethod
import org.apache.spark.executor.DataReadMethod.DataReadMethod

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
@@ -80,7 +85,17 @@ class TaskMetrics extends Serializable {
* If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
* are stored here.
*/
var inputMetrics: Option[InputMetrics] = None
private var _inputMetrics: Option[InputMetrics] = None

def inputMetrics = _inputMetrics

/**
* This should only be used when recreating TaskMetrics, not when updating input metrics in
* executors
*/
private[spark] def setInputMetrics(inputMetrics: Option[InputMetrics]) {
_inputMetrics = inputMetrics
}

/**
* If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
@@ -133,6 +148,30 @@ class TaskMetrics extends Serializable {
readMetrics
}

/**
* Returns the input metrics object that the task should use. Currently, if
* there exists an input metric with the same readMethod, we return that one
* so the caller can accumulate bytes read. If the readMethod is different
* than previously seen by this task, we return a new InputMetrics but don't
* record it.
*
* Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
* we can store all the different inputMetrics (one per readMethod).
*/
private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod):
InputMetrics = synchronized {
_inputMetrics match {
case None =>
val metrics = new InputMetrics(readMethod)
_inputMetrics = Some(metrics)
metrics
case Some(metrics @ InputMetrics(method)) if method == readMethod =>
metrics
case Some(InputMetrics(method)) =>
new InputMetrics(readMethod)
}
}

/**
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
*/
@@ -183,19 +222,28 @@ object DataWriteMethod extends Enumeration with Serializable {
*/
@DeveloperApi
case class InputMetrics(readMethod: DataReadMethod.Value) {

private val _bytesRead: AtomicLong = new AtomicLong()

/**
* Total bytes read.
*/
var bytesRead: Long = 0L

def bytesRead: Long = _bytesRead.get()
@volatile @transient var bytesReadCallback: Option[() => Long] = None

/**
* Adds additional bytes read for this read method.
*/
def addBytesRead(bytes: Long) = {
_bytesRead.addAndGet(bytes)
}

/**
* Invoke the bytesReadCallback and mutate bytesRead.
*/
def updateBytesRead() {
bytesReadCallback.foreach { c =>
bytesRead = c()
_bytesRead.set(c())
}
}

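Because bytesRead is now backed by an AtomicLong, callers can accumulate bytes concurrently without losing updates. A minimal sketch of the new InputMetrics API (illustrative only, not part of the diff):

import org.apache.spark.executor.{DataReadMethod, InputMetrics}

val metrics = InputMetrics(DataReadMethod.Hadoop)

// Two threads adding bytes concurrently; AtomicLong.addAndGet keeps the total consistent.
val threads = (1 to 2).map { _ =>
  new Thread(new Runnable {
    override def run(): Unit = (1 to 1000).foreach(_ => metrics.addBytesRead(1L))
  })
}
threads.foreach(_.start())
threads.foreach(_.join())

assert(metrics.bytesRead == 2000L)
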
9 changes: 3 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,11 +213,8 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()

val readMethod = DataReadMethod.Hadoop
val inputMetrics = context.taskMetrics.inputMetrics
.filter(_.readMethod == readMethod)
.getOrElse(new InputMetrics(readMethod))
context.taskMetrics.inputMetrics = Some(inputMetrics)
val inputMetrics = context.taskMetrics
.getInputMetricsForReadMethod(DataReadMethod.Hadoop)

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
@@ -260,7 +257,7 @@ class HadoopRDD[K, V](
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.bytesRead += split.inputSplit.value.getLength
inputMetrics.addBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
9 changes: 3 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -109,11 +109,8 @@ class NewHadoopRDD[K, V](
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value

val readMethod = DataReadMethod.Hadoop
val inputMetrics = context.taskMetrics.inputMetrics
.filter(_.readMethod == readMethod)
.getOrElse(new InputMetrics(readMethod))
context.taskMetrics.inputMetrics = Some(inputMetrics)
val inputMetrics = context.taskMetrics
.getInputMetricsForReadMethod(DataReadMethod.Hadoop)

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
@@ -169,7 +166,7 @@ class NewHadoopRDD[K, V](
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.bytesRead += split.serializableHadoopSplit.value.getLength
inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
@@ -54,7 +54,7 @@ private[spark] class BlockResult(
readMethod: DataReadMethod.Value,
bytes: Long) {
val inputMetrics = new InputMetrics(readMethod)
inputMetrics.bytesRead = bytes
inputMetrics.addBytesRead(bytes)
}

/**
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -604,8 +604,8 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson))
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
metrics.inputMetrics =
Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
metrics.setInputMetrics(
Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson))
metrics.outputMetrics =
Utils.jsonOption(json \ "Output Metrics").map(outputMetricsFromJson)
metrics.updatedBlocks =
@@ -638,7 +638,7 @@ private[spark] object JsonProtocol {
def inputMetricsFromJson(json: JValue): InputMetrics = {
val metrics = new InputMetrics(
DataReadMethod.withName((json \ "Data Read Method").extract[String]))
metrics.bytesRead = (json \ "Bytes Read").extract[Long]
metrics.addBytesRead((json \ "Bytes Read").extract[Long])
metrics
}

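A rough usage sketch of the deserialization path above, showing the JSON shape it expects (illustrative only, not part of the diff; JsonProtocol is private[spark], so this would need to sit under an org.apache.spark package, and the json4s import is an assumption about the classpath):

import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.executor.DataReadMethod
import org.apache.spark.util.JsonProtocol

val json = parse("""{"Data Read Method": "Hadoop", "Bytes Read": 1234}""")
val inputMetrics = JsonProtocol.inputMetricsFromJson(json)

assert(inputMetrics.readMethod == DataReadMethod.Hadoop)
assert(inputMetrics.bytesRead == 1234L)
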
@@ -19,17 +19,19 @@ package org.apache.spark.metrics

import java.io.{File, FileWriter, PrintWriter}

import scala.collection.mutable.ArrayBuffer

import org.scalatest.FunSuite

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}

import org.apache.spark.SharedSparkContext
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.util.Utils
import org.scalatest.FunSuite

import scala.collection.mutable.ArrayBuffer

class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {

@@ -85,7 +87,42 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
}

// for count and coalesce, the same bytes should be read.
assert(bytesRead2 >= bytesRead)
assert(bytesRead != 0)
assert(bytesRead2 == bytesRead)
}

/**
* This checks the situation where we have interleaved reads from
* different sources. Currently, we only accumulate from the first
* read method we find in the task. This test uses cartesian to create
* the interleaved reads.
*
* Once https://issues.apache.org/jira/browse/SPARK-5225 is fixed
* this test should break.
*/
test("input metrics with mixed read method") {
// prime the cache manager
val numPartitions = 2
val rdd = sc.parallelize(1 to 100, numPartitions).cache()
rdd.collect()

val rdd2 = sc.textFile(tmpFilePath, numPartitions)

val bytesRead = runAndReturnBytesRead {
rdd.count()
}
val bytesRead2 = runAndReturnBytesRead {
rdd2.count()
}

val cartRead = runAndReturnBytesRead {
rdd.cartesian(rdd2).count()
}

assert(cartRead != 0)
assert(bytesRead != 0)
// We read from the first rdd of the cartesian once per partition.
assert(cartRead == bytesRead * numPartitions)
}

test("input metrics for new Hadoop API with coalesce") {
@@ -110,15 +147,16 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
}

test("input metrics with interleaved reads") {
val numPartitions = 2
val cartVector = 0 to 9
val cartFile = new File(tmpDir, getClass.getSimpleName + "_cart.txt")
val cartFilePath = "file://" + cartFile.getAbsolutePath

// write files to disk so we can read them later.
sc.parallelize(cartVector).saveAsTextFile(cartFilePath)
val aRdd = sc.textFile(cartFilePath, 1)
val aRdd = sc.textFile(cartFilePath, numPartitions)

val tmpRdd = sc.textFile(tmpFilePath)
val tmpRdd = sc.textFile(tmpFilePath, numPartitions)

val firstSize = runAndReturnBytesRead {
aRdd.count()
@@ -143,7 +181,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
// As a result we read from the second partition n times where n is the number of keys in
// p1. Thus the math below for the test.
assert(cartesianBytes != 0)
assert(cartesianBytes == firstSize + (cartVector.length * secondSize))
assert(cartesianBytes == firstSize * numPartitions + (cartVector.length * secondSize))
}

private def runAndReturnBytesRead(job : => Unit): Long = {
@@ -231,8 +231,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskMetrics.diskBytesSpilled = base + 5
taskMetrics.memoryBytesSpilled = base + 6
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
taskMetrics.inputMetrics = Some(inputMetrics)
inputMetrics.bytesRead = base + 7
taskMetrics.setInputMetrics(Some(inputMetrics))
inputMetrics.addBytesRead(base + 7)
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
taskMetrics.outputMetrics = Some(outputMetrics)
outputMetrics.bytesWritten = base + 8
@@ -609,8 +609,8 @@ class JsonProtocolSuite extends FunSuite {

if (hasHadoopInput) {
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
inputMetrics.bytesRead = d + e + f
t.inputMetrics = Some(inputMetrics)
inputMetrics.addBytesRead(d + e + f)
t.setInputMetrics(Some(inputMetrics))
} else {
val sr = new ShuffleReadMetrics
sr.remoteBytesRead = b + d
