CR feedback
Kostas Sakellis committed Jan 14, 2015
1 parent 5a0c770 commit a2a36d4
Showing 3 changed files with 13 additions and 12 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -46,7 +46,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
// Partition is already materialized, so just return its values
val existingMetrics = context.taskMetrics.inputMetrics
val prevBytesRead = existingMetrics
.filter( _.readMethod == blockResult.inputMetrics.readMethod)
.filter(_.readMethod == blockResult.inputMetrics.readMethod)
.map(_.bytesRead)
.getOrElse(0L)

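For context on the hunk above: the filter/map/getOrElse chain uses the previously recorded input metrics only when their read method matches the current block's, otherwise the previous byte count defaults to zero. A minimal, self-contained Scala sketch of that Option idiom (ReadMethod and Metrics are illustrative stand-ins, not Spark's actual classes):

// Sketch of the Option idiom used in the hunk above; ReadMethod and Metrics
// are illustrative stand-ins, not Spark's actual classes.
object OptionIdiomSketch {
  sealed trait ReadMethod
  case object Hadoop extends ReadMethod
  case object Memory extends ReadMethod

  final case class Metrics(readMethod: ReadMethod, bytesRead: Long)

  def prevBytesRead(existing: Option[Metrics], current: ReadMethod): Long =
    existing
      .filter(_.readMethod == current) // keep metrics only if the read method matches
      .map(_.bytesRead)                // extract the byte count
      .getOrElse(0L)                   // default to zero when absent or mismatched

  def main(args: Array[String]): Unit = {
    println(prevBytesRead(Some(Metrics(Hadoop, 128L)), Hadoop)) // 128
    println(prevBytesRead(Some(Metrics(Memory, 128L)), Hadoop)) // 0
    println(prevBytesRead(None, Hadoop))                        // 0
  }
}

The whitespace-only change in this hunk does not alter that behavior.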
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -185,7 +185,8 @@ class NewHadoopRDD[K, V](
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength + bytesReadAtStart
inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength +
bytesReadAtStart
context.taskMetrics.inputMetrics = Some(inputMetrics)
} catch {
case e: java.io.IOException =>
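For context on the hunk above: when the filesystem's byte-read statistics are unavailable, bytesRead is approximated with the Hadoop split's length plus the bytes already read, and the catch clause covers splits whose length itself cannot be obtained. A hedged, self-contained sketch of that fallback pattern, where splitLength stands in for split.serializableHadoopSplit.value.getLength:

// Sketch of the fallback shown above: approximate bytesRead with the split
// length plus any bytes already read, and give up if even the length cannot
// be obtained. splitLength is a hypothetical stand-in.
import java.io.IOException

object SplitSizeFallbackSketch {
  def fallbackBytesRead(splitLength: () => Long, bytesReadAtStart: Long): Option[Long] =
    try {
      Some(splitLength() + bytesReadAtStart) // approximation: split size, not actual bytes read
    } catch {
      case _: IOException => None // getLength itself failed; report nothing
    }

  def main(args: Array[String]): Unit = {
    println(fallbackBytesRead(() => 1024L, bytesReadAtStart = 512L))         // Some(1536)
    println(fallbackBytesRead(() => throw new IOException("no length"), 0L)) // None
  }
}

The change in this hunk only re-wraps the assignment to fit line-length limits; the fallback logic is unchanged.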
20 changes: 10 additions & 10 deletions core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -17,20 +17,18 @@

package org.apache.spark.metrics

import java.io.{FileWriter, PrintWriter, File}
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
import java.io.{File, FileWriter, PrintWriter}

import org.apache.spark.util.Utils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
import org.apache.spark.SharedSparkContext
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.util.Utils
import org.scalatest.FunSuite

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}

import scala.collection.mutable.ArrayBuffer

class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
@@ -69,6 +67,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
val bytesRead2 = runAndReturnBytesRead {
sc.textFile(tmpFilePath, 4).coalesce(2).count()
}
assert(bytesRead != 0)
assert(bytesRead2 == bytesRead)
assert(bytesRead2 >= tmpFile.length())
}
@@ -86,7 +85,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
}

// for count and coalesce, the same bytes should be read.
assert(bytesRead2 >= bytesRead2)
assert(bytesRead2 >= bytesRead)
}

test("input metrics for new Hadoop API with coalesce") {
@@ -98,6 +97,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext {
sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable],
classOf[Text]).coalesce(5).count()
}
assert(bytesRead != 0)
assert(bytesRead2 == bytesRead)
assert(bytesRead >= tmpFile.length())
}
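The tests above wrap each job in a runAndReturnBytesRead { ... } helper whose implementation is not part of this diff. One plausible shape for such a helper is a SparkListener that sums bytesRead across task-end events; the sketch below is an assumption for illustration, not necessarily the suite's actual code:

// Hedged sketch of a helper like runAndReturnBytesRead, built on a SparkListener
// that sums input bytes from task-end events. Targets the Spark 1.x API, where
// TaskMetrics.inputMetrics is an Option[InputMetrics].
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

object BytesReadHelperSketch {
  def runAndReturnBytesRead(sc: SparkContext)(job: => Unit): Long = {
    var bytesRead = 0L
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        for (metrics <- Option(taskEnd.taskMetrics); input <- metrics.inputMetrics) {
          bytesRead += input.bytesRead // accumulate input bytes across all tasks
        }
      }
    })
    job // e.g. sc.textFile(tmpFilePath, 4).coalesce(2).count()
    // Task-end events arrive asynchronously; Spark's own tests wait for the
    // listener bus to drain before reading the accumulated value.
    bytesRead
  }
}

With a helper of this shape, the added assert(bytesRead != 0) checks guard against the equality assertions passing vacuously when no input bytes are recorded at all.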
