[SPARK-4092] [CORE] Fix InputMetrics for coalesced RDDs
When calculating input metrics, the code assumed that each task reads
from only one block - this is not true for some operations, including
coalesce. This patch increments the task's input metrics whenever
previous metrics with the same read method already exist.

A limitation of this patch is that if a task reads from two blocks
with different read methods, one will override the other.
Kostas Sakellis committed Jan 14, 2015
1 parent f996909 commit 5a0c770
Showing 4 changed files with 103 additions and 40 deletions.
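
To make the scenario in the commit message concrete, here is a hedged, self-contained driver-side sketch (the object name, path, and partition counts are illustrative, not taken from the patch): with coalesce, one task may compute several Hadoop splits, so a task's input metrics have to accumulate across splits instead of being reset for each split.

import org.apache.spark.{SparkConf, SparkContext}

object CoalesceInputMetricsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("coalesce-metrics"))
    try {
      // Illustrative input path; any sizable text file will do.
      val path = "/tmp/some-input.txt"
      // Four Hadoop splits collapsed into two tasks: each task now reads two splits,
      // and its reported bytesRead should cover both of them rather than only the last one.
      sc.textFile(path, 4).coalesce(2).count()
    } finally {
      sc.stop()
    }
  }
}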
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,7 +44,14 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
val existingMetrics = context.taskMetrics.inputMetrics
val prevBytesRead = existingMetrics
.filter(_.readMethod == blockResult.inputMetrics.readMethod)
.map(_.bytesRead)
.getOrElse(0L)

context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
context.taskMetrics.inputMetrics.get.bytesRead += prevBytesRead
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

case None =>
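
A hedged, self-contained sketch of the CacheManager change above, using hypothetical simplified types rather than Spark's own: bytes already recorded under the same read method are carried over into the metrics reported by the cached block, while bytes recorded under a different read method are dropped - the limitation called out in the commit message.

object CacheMetricsCarryOverSketch {
  // MetricsSketch and the "Memory" read method are stand-ins for Spark's InputMetrics/DataReadMethod.
  case class MetricsSketch(readMethod: String, var bytesRead: Long)

  // Mirrors the hunk above: carry previous bytes forward only when the read methods match.
  def recordCachedRead(current: Option[MetricsSketch], blockMetrics: MetricsSketch): MetricsSketch = {
    val prevBytesRead = current
      .filter(_.readMethod == blockMetrics.readMethod)
      .map(_.bytesRead)
      .getOrElse(0L)
    blockMetrics.bytesRead += prevBytesRead
    blockMetrics
  }

  def main(args: Array[String]): Unit = {
    // Two cached partitions read from memory by the same task accumulate into one total;
    // a block read with a different method would reset the count to its own bytes.
    val first = recordCachedRead(None, MetricsSketch("Memory", 100L))
    val second = recordCachedRead(Some(first), MetricsSketch("Memory", 200L))
    assert(second.bytesRead == 300L)
    println(s"accumulated bytesRead = ${second.bytesRead}")
  }
}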
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,7 +213,11 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()

val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
val readMethod = DataReadMethod.Hadoop
val inputMetrics = context.taskMetrics.inputMetrics
.filter(_.readMethod == readMethod)
.getOrElse(new InputMetrics(readMethod))

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
@@ -239,6 +243,8 @@

var recordsSinceMetricsUpdate = 0

val bytesReadAtStart = inputMetrics.bytesRead

override def getNext() = {
try {
finished = !reader.next(key, value)
@@ -252,7 +258,7 @@
&& bytesReadCallback.isDefined) {
recordsSinceMetricsUpdate = 0
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
inputMetrics.bytesRead = bytesReadFn() + bytesReadAtStart
} else {
recordsSinceMetricsUpdate += 1
}
@@ -264,12 +270,12 @@
reader.close()
if (bytesReadCallback.isDefined) {
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
inputMetrics.bytesRead = bytesReadFn() + bytesReadAtStart
} else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.bytesRead = split.inputSplit.value.getLength
inputMetrics.bytesRead = split.inputSplit.value.getLength + bytesReadAtStart
context.taskMetrics.inputMetrics = Some(inputMetrics)
} catch {
case e: java.io.IOException =>
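
The HadoopRDD change follows the same reuse pattern and adds an offset. Below is a hedged sketch (hypothetical names, not Spark's classes), under the assumption that the FileSystem-statistics callback in the diff reports bytes for the split currently being read: bytesReadAtStart snapshots what the task had already accumulated, so updating bytesRead with callback() + bytesReadAtStart preserves the bytes from splits the task finished earlier.

object HadoopSplitMetricsSketch {
  // Stand-in for Spark's InputMetrics.
  class InputMetricsSketch(var bytesRead: Long = 0L)

  // recordSizes stands in for the bytes reported by the FS-statistics callback while a single
  // split is consumed (assumption: the callback covers only the split currently being read).
  def readSplit(metrics: InputMetricsSketch, recordSizes: Seq[Long]): Unit = {
    val bytesReadAtStart = metrics.bytesRead // bytes from splits this task already finished
    var bytesReadThisSplit = 0L
    for (size <- recordSizes) {
      bytesReadThisSplit += size
      // Periodic update, as in getNext(): add the snapshot so earlier splits are not overwritten.
      metrics.bytesRead = bytesReadThisSplit + bytesReadAtStart
    }
    // Final update on close, same formula as in the hunk above.
    metrics.bytesRead = bytesReadThisSplit + bytesReadAtStart
  }

  def main(args: Array[String]): Unit = {
    // A coalesced task that reads two splits ends up with the sum of both.
    val metrics = new InputMetricsSketch()
    readSplit(metrics, Seq(10L, 20L)) // first split: 30 bytes
    readSplit(metrics, Seq(40L))      // second split: 40 bytes
    assert(metrics.bytesRead == 70L)
    println(s"bytesRead = ${metrics.bytesRead}")
  }
}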
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -109,7 +109,11 @@ class NewHadoopRDD[K, V](
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value

val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
val readMethod = DataReadMethod.Hadoop
val inputMetrics = context.taskMetrics.inputMetrics
.filter(_.readMethod == readMethod)
.getOrElse(new InputMetrics(readMethod))

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
@@ -140,6 +144,8 @@
var finished = false
var recordsSinceMetricsUpdate = 0

val bytesReadAtStart = inputMetrics.bytesRead

override def hasNext: Boolean = {
if (!finished && !havePair) {
finished = !reader.nextKeyValue
@@ -159,7 +165,7 @@
&& bytesReadCallback.isDefined) {
recordsSinceMetricsUpdate = 0
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
inputMetrics.bytesRead = bytesReadFn() + bytesReadAtStart
} else {
recordsSinceMetricsUpdate += 1
}
@@ -174,12 +180,12 @@
// Update metrics with final amount
if (bytesReadCallback.isDefined) {
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
inputMetrics.bytesRead = bytesReadFn() + bytesReadAtStart
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength
inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength + bytesReadAtStart
context.taskMetrics.inputMetrics = Some(inputMetrics)
} catch {
case e: java.io.IOException =>
108 changes: 76 additions & 32 deletions core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -18,65 +18,109 @@
package org.apache.spark.metrics

import java.io.{FileWriter, PrintWriter, File}
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}

import org.apache.spark.util.Utils
import org.apache.spark.SharedSparkContext
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}

import org.scalatest.FunSuite
import org.scalatest.Matchers

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}

import scala.collection.mutable.ArrayBuffer

class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Matchers {
test("input metrics when reading text file with single split") {
val file = new File(getClass.getSimpleName + ".txt")
val pw = new PrintWriter(new FileWriter(file))
pw.println("some stuff")
pw.println("some other stuff")
pw.println("yet more stuff")
pw.println("too much stuff")
class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {

@transient var tmpDir: File = _
@transient var tmpFile: File = _
@transient var tmpFilePath: String = _

override def beforeAll() {
super.beforeAll()

tmpDir = Utils.createTempDir()
val testTempDir = new File(tmpDir, "test")
testTempDir.mkdir()

tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
val pw = new PrintWriter(new FileWriter(tmpFile))
for (x <- 1 to 1000000) {
pw.println("s")
}
pw.close()
file.deleteOnExit()

val taskBytesRead = new ArrayBuffer[Long]()
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
}
})
sc.textFile("file://" + file.getAbsolutePath, 2).count()
// Path to tmpFile
tmpFilePath = "file://" + tmpFile.getAbsolutePath
}

// Wait for task end events to come in
sc.listenerBus.waitUntilEmpty(500)
assert(taskBytesRead.length == 2)
assert(taskBytesRead.sum >= file.length())
override def afterAll() {
super.afterAll()
Utils.deleteRecursively(tmpDir)
}

test("input metrics when reading text file with multiple splits") {
val file = new File(getClass.getSimpleName + ".txt")
val pw = new PrintWriter(new FileWriter(file))
for (i <- 0 until 10000) {
pw.println("some stuff")
test("input metrics for old hadoop with coalesce") {
val bytesRead = runAndReturnBytesRead {
sc.textFile(tmpFilePath, 4).count()
}
pw.close()
file.deleteOnExit()
val bytesRead2 = runAndReturnBytesRead {
sc.textFile(tmpFilePath, 4).coalesce(2).count()
}
assert(bytesRead2 == bytesRead)
assert(bytesRead2 >= tmpFile.length())
}

test("input metrics with cache and coalesce") {
// prime the cache manager
val rdd = sc.textFile(tmpFilePath, 4).cache()
rdd.collect()

val bytesRead = runAndReturnBytesRead {
rdd.count()
}
val bytesRead2 = runAndReturnBytesRead {
rdd.coalesce(4).count()
}

// for count and coalesce, the same bytes should be read.
assert(bytesRead2 >= bytesRead)
}

test("input metrics for new Hadoop API with coalesce") {
val bytesRead = runAndReturnBytesRead {
sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
classOf[Text]).count()
}
val bytesRead2 = runAndReturnBytesRead {
sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
classOf[Text]).coalesce(5).count()
}
assert(bytesRead2 == bytesRead)
assert(bytesRead >= tmpFile.length())
}

test("input metrics when reading text file") {
val bytesRead = runAndReturnBytesRead {
sc.textFile(tmpFilePath, 2).count()
}
assert(bytesRead >= tmpFile.length())
}

private def runAndReturnBytesRead(job: => Unit): Long = {
val taskBytesRead = new ArrayBuffer[Long]()
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead
}
})
sc.textFile("file://" + file.getAbsolutePath, 2).count()

// Wait for task end events to come in
job

sc.listenerBus.waitUntilEmpty(500)
assert(taskBytesRead.length == 2)
assert(taskBytesRead.sum >= file.length())
taskBytesRead.sum
}

test("output metrics when writing text file") {