CR Feedback
Kostas Sakellis committed Feb 6, 2015
1 parent 1bb78b1 commit 1aa273c
Showing 3 changed files with 10 additions and 10 deletions.
@@ -194,7 +194,7 @@ class TaskMetrics extends Serializable {
/**
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
*/
- private[spark] def updateShuffleReadMetrics() = synchronized {
+ private[spark] def updateShuffleReadMetrics(): Unit = synchronized {
val merged = new ShuffleReadMetrics()
for (depMetrics <- depsShuffleReadMetrics) {
merged.incFetchWaitTime(depMetrics.fetchWaitTime)
@@ -206,7 +206,7 @@ class TaskMetrics extends Serializable {
_shuffleReadMetrics = Some(merged)
}

- private[spark] def updateInputMetrics() = synchronized {
+ private[spark] def updateInputMetrics(): Unit = synchronized {
inputMetrics.foreach(_.updateBytesRead())
}
}
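
The TaskMetrics change above only adds an explicit ": Unit" return type to the two side-effecting methods. A minimal sketch, not Spark code (the Counter class is invented for illustration), of why the annotation matters: without it, Scala infers the result type from the last expression inside the synchronized block, which can quietly expose an internal value through the method's signature.

// Minimal sketch, assuming nothing beyond plain Scala; names are invented.
class Counter {
  private var count = 0

  // Inferred result type is Int here: callers can start depending on the
  // returned value, and the signature silently changes if the body changes.
  def incrementInferred() = synchronized {
    count += 1
    count
  }

  // Explicit Unit keeps the signature stable and states the intent:
  // this method is called for its side effect only.
  def increment(): Unit = synchronized {
    count += 1
  }
}
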
@@ -32,7 +32,7 @@ class BlockObjectWriterSuite extends FunSuite {

writer.write(Long.box(20))
// Record metrics update on every write
- assert(writeMetrics.recordsWritten == 1)
+ assert(writeMetrics.recordsWritten === 1)
// Metrics don't update on every write
assert(writeMetrics.shuffleBytesWritten == 0)
// After 32 writes, metrics should update
@@ -41,7 +41,7 @@ class BlockObjectWriterSuite extends FunSuite {
writer.write(Long.box(i))
}
assert(writeMetrics.shuffleBytesWritten > 0)
- assert(writeMetrics.recordsWritten == 33)
+ assert(writeMetrics.recordsWritten === 33)
writer.commitAndClose()
assert(file.length() == writeMetrics.shuffleBytesWritten)
}
@@ -55,7 +55,7 @@ class BlockObjectWriterSuite extends FunSuite {

writer.write(Long.box(20))
// Record metrics update on every write
- assert(writeMetrics.recordsWritten == 1)
+ assert(writeMetrics.recordsWritten === 1)
// Metrics don't update on every write
assert(writeMetrics.shuffleBytesWritten == 0)
// After 32 writes, metrics should update
@@ -64,7 +64,7 @@ class BlockObjectWriterSuite extends FunSuite {
writer.write(Long.box(i))
}
assert(writeMetrics.shuffleBytesWritten > 0)
- assert(writeMetrics.recordsWritten == 33)
+ assert(writeMetrics.recordsWritten === 33)
writer.revertPartialWritesAndClose()
assert(writeMetrics.shuffleBytesWritten == 0)
assert(writeMetrics.recordsWritten == 0)
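
The BlockObjectWriterSuite changes swap == for === inside assert. A small sketch, assuming the same ScalaTest FunSuite style as the suite above: === comes from ScalaTest's Assertions and, when the comparison fails, reports both operands (for example "1 did not equal 33"), whereas a plain boolean == has historically produced only a generic assertion failure.

import org.scalatest.FunSuite

// Minimal sketch, assuming ScalaTest's FunSuite as used in the suite above;
// the test name and values are invented for illustration.
class TripleEqualsExampleSuite extends FunSuite {

  test("=== reports both operands when an assertion fails") {
    val recordsWritten = 1
    assert(recordsWritten === 1)
    // assert(recordsWritten === 33) would fail with a message along the
    // lines of "1 did not equal 33" rather than a bare assertion failure.
  }
}
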
@@ -684,27 +684,27 @@ class JsonProtocolSuite extends FunSuite {
if (hasHadoopInput) {
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
inputMetrics.addBytesRead(d + e + f)
- inputMetrics.addRecordsRead(if(hasRecords) (d + e + f) / 100 else -1)
+ inputMetrics.addRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
t.setInputMetrics(Some(inputMetrics))
} else {
val sr = new ShuffleReadMetrics
sr.incRemoteBytesRead(b + d)
sr.incLocalBlocksFetched(e)
sr.incFetchWaitTime(a + d)
sr.incRemoteBlocksFetched(f)
- sr.recordsRead = if(hasRecords) (b + d) / 100 else -1
+ sr.recordsRead = if (hasRecords) (b + d) / 100 else -1
t.setShuffleReadMetrics(Some(sr))
}
if (hasOutput) {
val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
outputMetrics.setBytesWritten(a + b + c)
- outputMetrics.recordsWritten = if(hasRecords) (a + b + c)/100 else -1
+ outputMetrics.recordsWritten = if (hasRecords) (a + b + c)/100 else -1
t.outputMetrics = Some(outputMetrics)
} else {
val sw = new ShuffleWriteMetrics
sw.incShuffleBytesWritten(a + b + c)
sw.incShuffleWriteTime(b + c + d)
- sw.recordsWritten = if(hasRecords) (a + b + c) / 100 else -1
+ sw.recordsWritten = if (hasRecords) (a + b + c) / 100 else -1
t.shuffleWriteMetrics = Some(sw)
}
// Make at most 6 blocks
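
The JsonProtocolSuite changes are purely stylistic, adding the space after if that the project's Scala style conventions call for. For context, if/else is an expression in Scala, which is why each changed line can assign its result directly to a metrics field. A minimal standalone sketch with invented values:

// Minimal standalone sketch; the values are invented and this is not Spark
// code. It only demonstrates that if/else evaluates to a value in Scala.
object IfExpressionExample extends App {
  val hasRecords = true
  val bytesRead = 300L
  val recordsRead = if (hasRecords) bytesRead / 100 else -1L
  println(recordsRead) // prints 3
}
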
