diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 80da62c44edc5..f53711fc7cd57 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,7 +44,14 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     blockManager.get(key) match {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
+        val existingMetrics = context.taskMetrics.inputMetrics
+        val prevBytesRead = existingMetrics
+          .filter(_.readMethod == blockResult.inputMetrics.readMethod)
+          .map(_.bytesRead)
+          .getOrElse(0L)
+
         context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+        context.taskMetrics.inputMetrics.get.bytesRead += prevBytesRead
         new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
 
       case None =>
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 0001c2329c83a..dbbaa1112f422 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,7 +213,11 @@ class HadoopRDD[K, V](
       logInfo("Input split: " + split.inputSplit)
 
       val jobConf = getJobConf()
-      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      val readMethod = DataReadMethod.Hadoop
+      val inputMetrics = context.taskMetrics.inputMetrics
+        .filter(_.readMethod == readMethod)
+        .getOrElse(new InputMetrics(readMethod))
+
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
       val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
@@ -239,6 +243,8 @@ class HadoopRDD[K, V](
 
       var recordsSinceMetricsUpdate = 0
 
+      val bytesReadAtStart = inputMetrics.bytesRead
+
       override def getNext() = {
         try {
           finished = !reader.next(key, value)
@@ -252,7 +258,7 @@ class HadoopRDD[K, V](
             && bytesReadCallback.isDefined) {
           recordsSinceMetricsUpdate = 0
           val bytesReadFn = bytesReadCallback.get
-          inputMetrics.bytesRead = bytesReadFn()
+          inputMetrics.bytesRead = bytesReadFn() + bytesReadAtStart
         } else {
           recordsSinceMetricsUpdate += 1
         }
@@ -264,12 +270,12 @@ class HadoopRDD[K, V](
           reader.close()
           if (bytesReadCallback.isDefined) {
             val bytesReadFn = bytesReadCallback.get
-            inputMetrics.bytesRead = bytesReadFn()
+            inputMetrics.bytesRead = bytesReadFn() + bytesReadAtStart
           } else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
             // If we can't get the bytes read from the FS stats, fall back to the split size,
            // which may be inaccurate.
             try {
-              inputMetrics.bytesRead = split.inputSplit.value.getLength
+              inputMetrics.bytesRead = split.inputSplit.value.getLength + bytesReadAtStart
               context.taskMetrics.inputMetrics = Some(inputMetrics)
             } catch {
               case e: java.io.IOException =>
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index e55d03d391e03..18d185d742e97 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -109,7 +109,11 @@ class NewHadoopRDD[K, V](
       logInfo("Input split: " + split.serializableHadoopSplit)
 
       val conf = confBroadcast.value.value
-      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      val readMethod = DataReadMethod.Hadoop
+      val inputMetrics = context.taskMetrics.inputMetrics
+        .filter(_.readMethod == readMethod)
+        .getOrElse(new InputMetrics(readMethod))
+
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
       val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
@@ -140,6 +144,8 @@ class NewHadoopRDD[K, V](
       var finished = false
       var recordsSinceMetricsUpdate = 0
 
+      val bytesReadAtStart = inputMetrics.bytesRead
+
      override def hasNext: Boolean = {
         if (!finished && !havePair) {
           finished = !reader.nextKeyValue
@@ -159,7 +165,7 @@ class NewHadoopRDD[K, V](
               && bytesReadCallback.isDefined) {
             recordsSinceMetricsUpdate = 0
             val bytesReadFn = bytesReadCallback.get
-            inputMetrics.bytesRead = bytesReadFn()
+            inputMetrics.bytesRead = bytesReadFn() + bytesReadAtStart
           } else {
             recordsSinceMetricsUpdate += 1
           }
@@ -174,12 +180,12 @@ class NewHadoopRDD[K, V](
           // Update metrics with final amount
           if (bytesReadCallback.isDefined) {
             val bytesReadFn = bytesReadCallback.get
-            inputMetrics.bytesRead = bytesReadFn()
+            inputMetrics.bytesRead = bytesReadFn() + bytesReadAtStart
           } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
             // If we can't get the bytes read from the FS stats, fall back to the split size,
             // which may be inaccurate.
             try {
-              inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength
+              inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength + bytesReadAtStart
               context.taskMetrics.inputMetrics = Some(inputMetrics)
             } catch {
               case e: java.io.IOException =>
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index f8bcde12a371a..fa4943273c403 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -18,65 +18,109 @@ package org.apache.spark.metrics
 
 import java.io.{FileWriter, PrintWriter, File}
 
+import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
+
+import org.apache.spark.util.Utils
 import org.apache.spark.SharedSparkContext
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}
 
 import org.scalatest.FunSuite
-import org.scalatest.Matchers
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
 
 import scala.collection.mutable.ArrayBuffer
 
-class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Matchers {
-  test("input metrics when reading text file with single split") {
-    val file = new File(getClass.getSimpleName + ".txt")
-    val pw = new PrintWriter(new FileWriter(file))
-    pw.println("some stuff")
-    pw.println("some other stuff")
-    pw.println("yet more stuff")
-    pw.println("too much stuff")
+class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
+
+  @transient var tmpDir: File = _
+  @transient var tmpFile: File = _
+  @transient var tmpFilePath: String = _
+
+  override def beforeAll() {
+    super.beforeAll()
+
+    tmpDir = Utils.createTempDir()
+    val testTempDir = new File(tmpDir, "test")
+    testTempDir.mkdir()
+
+    tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
+    val pw = new PrintWriter(new FileWriter(tmpFile))
+    for (x <- 1 to 1000000) {
+      pw.println("s")
+    }
     pw.close()
-    file.deleteOnExit()
 
-    val taskBytesRead = new ArrayBuffer[Long]()
-    sc.addSparkListener(new SparkListener() {
-      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-        taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
-      }
-    })
-    sc.textFile("file://" + file.getAbsolutePath, 2).count()
+    // Path to tmpFile
+    tmpFilePath = "file://" + tmpFile.getAbsolutePath
+  }
 
-    // Wait for task end events to come in
-    sc.listenerBus.waitUntilEmpty(500)
-    assert(taskBytesRead.length == 2)
-    assert(taskBytesRead.sum >= file.length())
+  override def afterAll() {
+    super.afterAll()
+    Utils.deleteRecursively(tmpDir)
   }
 
-  test("input metrics when reading text file with multiple splits") {
-    val file = new File(getClass.getSimpleName + ".txt")
-    val pw = new PrintWriter(new FileWriter(file))
-    for (i <- 0 until 10000) {
-      pw.println("some stuff")
+  test("input metrics for old hadoop with coalesce") {
+    val bytesRead = runAndReturnBytesRead {
+      sc.textFile(tmpFilePath, 4).count()
     }
-    pw.close()
-    file.deleteOnExit()
+    val bytesRead2 = runAndReturnBytesRead {
+      sc.textFile(tmpFilePath, 4).coalesce(2).count()
+    }
+    assert(bytesRead2 == bytesRead)
+    assert(bytesRead2 >= tmpFile.length())
+  }
+
+  test("input metrics with cache and coalesce") {
+    // prime the cache manager
+    val rdd = sc.textFile(tmpFilePath, 4).cache()
+    rdd.collect()
+
+    val bytesRead = runAndReturnBytesRead {
+      rdd.count()
+    }
+    val bytesRead2 = runAndReturnBytesRead {
+      rdd.coalesce(4).count()
+    }
+
+    // for count and coalesce, the same bytes should be read.
+    assert(bytesRead2 == bytesRead)
+  }
+
+  test("input metrics for new Hadoop API with coalesce") {
+    val bytesRead = runAndReturnBytesRead {
+      sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
+        classOf[Text]).count()
+    }
+    val bytesRead2 = runAndReturnBytesRead {
+      sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
+        classOf[Text]).coalesce(5).count()
+    }
+    assert(bytesRead2 == bytesRead)
+    assert(bytesRead >= tmpFile.length())
+  }
+
+  test("input metrics when reading text file") {
+    val bytesRead = runAndReturnBytesRead {
+      sc.textFile(tmpFilePath, 2).count()
+    }
+    assert(bytesRead >= tmpFile.length())
+  }
+
+  private def runAndReturnBytesRead(job: => Unit): Long = {
     val taskBytesRead = new ArrayBuffer[Long]()
     sc.addSparkListener(new SparkListener() {
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
         taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
       }
     })
-    sc.textFile("file://" + file.getAbsolutePath, 2).count()
 
-    // Wait for task end events to come in
+    job
+
     sc.listenerBus.waitUntilEmpty(500)
-    assert(taskBytesRead.length == 2)
-    assert(taskBytesRead.sum >= file.length())
+    taskBytesRead.sum
   }
 
   test("output metrics when writing text file") {
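The pattern applied in all three source files above is the same: instead of creating and overwriting a fresh `InputMetrics` per partition, the reader reuses any metrics object the task already registered for the same read method and treats the bytes read so far as an offset. The following standalone Scala sketch illustrates that accumulation logic in isolation; the simplified `InputMetrics`/`TaskMetrics` classes and the `readPartition` helper are illustrative stand-ins, not Spark's actual API.

```scala
// Standalone sketch of the "carry forward previously read bytes" pattern from the diff.
// The types below are simplified stand-ins for Spark's metrics classes.
object InputMetricsAccumulationSketch {

  object DataReadMethod extends Enumeration {
    val Hadoop, Memory = Value
  }

  // bytesRead is mutable, mirroring how the real InputMetrics is updated in place.
  class InputMetrics(val readMethod: DataReadMethod.Value) {
    var bytesRead: Long = 0L
  }

  class TaskMetrics {
    var inputMetrics: Option[InputMetrics] = None
  }

  // Hypothetical partition reader: reuse the task's existing InputMetrics for the same
  // read method (so bytes accumulate across partitions handled by one task, e.g. after
  // coalesce) and remember the bytes already read before this partition as an offset.
  def readPartition(taskMetrics: TaskMetrics, partitionBytes: Long): Unit = {
    val readMethod = DataReadMethod.Hadoop
    val inputMetrics = taskMetrics.inputMetrics
      .filter(_.readMethod == readMethod)
      .getOrElse(new InputMetrics(readMethod))

    val bytesReadAtStart = inputMetrics.bytesRead
    // A real reader updates this incrementally via a FileSystem statistics callback;
    // here we simply add the partition size on top of the starting offset.
    inputMetrics.bytesRead = bytesReadAtStart + partitionBytes
    taskMetrics.inputMetrics = Some(inputMetrics)
  }

  def main(args: Array[String]): Unit = {
    val tm = new TaskMetrics
    readPartition(tm, 100L) // first partition handled by the task
    readPartition(tm, 250L) // second partition, e.g. after coalesce
    // Without the offset the second call would overwrite the first; with it the total is 350.
    assert(tm.inputMetrics.get.bytesRead == 350L)
    println(s"bytesRead = ${tm.inputMetrics.get.bytesRead}")
  }
}
```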