[SPARK-4874] [CORE] Collect record count metrics #4067

Closed
wants to merge 11 commits
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -47,10 +47,15 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val inputMetrics = blockResult.inputMetrics
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(inputMetrics.readMethod)
existingMetrics.addBytesRead(inputMetrics.bytesRead)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
existingMetrics.incBytesRead(inputMetrics.bytesRead)

val iter = blockResult.data.asInstanceOf[Iterator[T]]
new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
case None =>
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish return its results
54 changes: 40 additions & 14 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -194,18 +194,19 @@ class TaskMetrics extends Serializable {
/**
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
*/
private[spark] def updateShuffleReadMetrics() = synchronized {
private[spark] def updateShuffleReadMetrics(): Unit = synchronized {
val merged = new ShuffleReadMetrics()
for (depMetrics <- depsShuffleReadMetrics) {
merged.incFetchWaitTime(depMetrics.fetchWaitTime)
merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
merged.incRecordsRead(depMetrics.recordsRead)
}
_shuffleReadMetrics = Some(merged)
}

private[spark] def updateInputMetrics() = synchronized {
private[spark] def updateInputMetrics(): Unit = synchronized {
inputMetrics.foreach(_.updateBytesRead())
}
}
@@ -242,27 +243,31 @@ object DataWriteMethod extends Enumeration with Serializable {
@DeveloperApi
case class InputMetrics(readMethod: DataReadMethod.Value) {

private val _bytesRead: AtomicLong = new AtomicLong()
/**
* This is volatile so that it is visible to the updater thread.
*/
@volatile @transient var bytesReadCallback: Option[() => Long] = None

/**
* Total bytes read.
*/
def bytesRead: Long = _bytesRead.get()
@volatile @transient var bytesReadCallback: Option[() => Long] = None
private var _bytesRead: Long = _
def bytesRead: Long = _bytesRead
def incBytesRead(bytes: Long) = _bytesRead += bytes

/**
* Adds additional bytes read for this read method.
* Total records read.
*/
def addBytesRead(bytes: Long) = {
_bytesRead.addAndGet(bytes)
}
private var _recordsRead: Long = _
def recordsRead: Long = _recordsRead
def incRecordsRead(records: Long) = _recordsRead += records

/**
* Invoke the bytesReadCallback and mutate bytesRead.
*/
def updateBytesRead() {
bytesReadCallback.foreach { c =>
_bytesRead.set(c())
_bytesRead = c()
}
}

@@ -287,6 +292,13 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
private var _bytesWritten: Long = _
def bytesWritten = _bytesWritten
private[spark] def setBytesWritten(value : Long) = _bytesWritten = value

/**
* Total records written
*/
private var _recordsWritten: Long = 0L
def recordsWritten = _recordsWritten
private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value
}

/**
@@ -301,16 +313,15 @@ class ShuffleReadMetrics extends Serializable {
private var _remoteBlocksFetched: Int = _
def remoteBlocksFetched = _remoteBlocksFetched
private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
private[spark] def defRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value

/**
* Number of local blocks fetched in this shuffle by this task
*/
private var _localBlocksFetched: Int = _
def localBlocksFetched = _localBlocksFetched
private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
private[spark] def defLocalBlocksFetched(value: Int) = _localBlocksFetched -= value

private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value

/**
* Time the task spent waiting for remote shuffle blocks. This only includes the time
@@ -334,6 +345,14 @@
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched

/**
* Total number of records read from the shuffle by this task
*/
private var _recordsRead: Long = _
Contributor comment: to be more consistent with the presentation in the UI and the other fields here, should this be shuffleRecordsRead?

Contributor comment: @ksakellis mind fixing this one?

def recordsRead = _recordsRead
private[spark] def incRecordsRead(value: Long) = _recordsRead += value
private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
}

/**
@@ -358,5 +377,12 @@ class ShuffleWriteMetrics extends Serializable {
private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value


/**
* Total number of records written to the shuffle by this task
*/
@volatile private var _shuffleRecordsWritten: Long = _
def shuffleRecordsWritten = _shuffleRecordsWritten
private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
}
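
As an aside, here is a hedged sketch of how a driver-side SparkListener might read the record counters added above. It assumes the Option-valued metrics accessors on TaskMetrics as of this PR; the listener name and its logging are illustrative only, not part of the change.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative listener: after each task finishes, read the new record counters.
class RecordCountListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) { // task metrics may be null, e.g. for failed tasks
      val inputRecords = metrics.inputMetrics.map(_.recordsRead).getOrElse(0L)
      val outputRecords = metrics.outputMetrics.map(_.recordsWritten).getOrElse(0L)
      val shuffleRecordsRead = metrics.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L)
      val shuffleRecordsWritten =
        metrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L)
      println(s"task ${taskEnd.taskInfo.taskId}: inputRecords=$inputRecords " +
        s"outputRecords=$outputRecords shuffleRecordsRead=$shuffleRecordsRead " +
        s"shuffleRecordsWritten=$shuffleRecordsWritten")
    }
  }
}

// Usage (illustrative): sc.addSparkListener(new RecordCountListener)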
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -247,7 +247,9 @@ class HadoopRDD[K, V](
case eof: EOFException =>
finished = true
}

if (!finished) {
inputMetrics.incRecordsRead(1)
}
(key, value)
}

@@ -261,7 +263,7 @@
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.addBytesRead(split.inputSplit.value.getLength)
inputMetrics.incBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -151,7 +151,9 @@ class NewHadoopRDD[K, V](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false

if (!finished) {
inputMetrics.incRecordsRead(1)
}
(reader.getCurrentKey, reader.getCurrentValue)
}

@@ -165,7 +167,7 @@
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength)
inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
RecordWriter => NewRecordWriter}
RecordWriter => NewRecordWriter}

import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
@@ -993,8 +993,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)

val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
var recordsWritten = 0L
try {
var recordsWritten = 0L
while (iter.hasNext) {
val pair = iter.next()
writer.write(pair._1, pair._2)
@@ -1008,6 +1008,7 @@
}
committer.commitTask(hadoopContext)
bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
outputMetrics.setRecordsWritten(recordsWritten)
1
} : Int

@@ -1065,8 +1066,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

writer.setup(context.stageId, context.partitionId, taskAttemptId)
writer.open()
var recordsWritten = 0L
try {
var recordsWritten = 0L
while (iter.hasNext) {
val record = iter.next()
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
@@ -1080,6 +1081,7 @@
}
writer.commit()
bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
outputMetrics.setRecordsWritten(recordsWritten)
}

self.context.runJob(self, writeToFile)
@@ -1097,9 +1099,9 @@

private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long],
outputMetrics: OutputMetrics, recordsWritten: Long): Unit = {
if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0
&& bytesWrittenCallback.isDefined) {
if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
outputMetrics.setRecordsWritten(recordsWritten)
}
}

@@ -25,7 +25,7 @@ import org.apache.spark._
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.{CompletionIterator}
Contributor comment: you don't need braces here if it is a single import.
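
For reference, the brace-free form of this single import (the original line this change replaced) reads:

import org.apache.spark.util.CompletionIterator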


private[hash] object BlockStoreShuffleFetcher extends Logging {
def fetch[T](
Expand Down Expand Up @@ -86,6 +86,12 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
context.taskMetrics.updateShuffleReadMetrics()
})

new InterruptibleIterator[T](context, completionIter)
new InterruptibleIterator[T](context, completionIter) {
val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
override def next(): T = {
readMetrics.incRecordsRead(1)
delegate.next()
}
}
}
}
@@ -53,7 +53,7 @@ private[spark] class BlockResult(
readMethod: DataReadMethod.Value,
bytes: Long) {
val inputMetrics = new InputMetrics(readMethod)
inputMetrics.addBytesRead(bytes)
inputMetrics.incBytesRead(bytes)
}

/**
@@ -29,7 +29,8 @@ import org.apache.spark.executor.ShuffleWriteMetrics
* appending data to an existing block, and can guarantee atomicity in the case of faults
* as it allows the caller to revert partial writes.
*
* This interface does not support concurrent writes.
* This interface does not support concurrent writes. Also, once the writer has
* been opened, it cannot be reopened again.
*/
private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {

@@ -95,6 +96,7 @@ private[spark] class DiskBlockObjectWriter(
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private var initialized = false
private var hasBeenClosed = false

/**
* Cursors used to represent positions in the file.
@@ -115,11 +117,16 @@
private var finalPosition: Long = -1
private var reportedPosition = initialPosition

/** Calling channel.position() to update the write metrics can be a little bit expensive, so we
* only call it every N writes */
private var writesSinceMetricsUpdate = 0
/**
* Keep track of number of records written and also use this to periodically
* output bytes written since the latter is expensive to do for each record.
*/
private var numRecordsWritten = 0

override def open(): BlockObjectWriter = {
if (hasBeenClosed) {
throw new IllegalStateException("Writer already closed. Cannot be reopened.")
}
fos = new FileOutputStream(file, true)
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
Expand All @@ -145,6 +152,7 @@ private[spark] class DiskBlockObjectWriter(
ts = null
objOut = null
initialized = false
hasBeenClosed = true
}
}

@@ -168,6 +176,7 @@
override def revertPartialWritesAndClose() {
try {
writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
writeMetrics.decShuffleRecordsWritten(numRecordsWritten)

if (initialized) {
objOut.flush()
@@ -193,12 +202,11 @@
}

objOut.writeObject(value)
numRecordsWritten += 1
Contributor comment: The current use of numRecordsWritten assumes that a BlockObjectWriter will never be closed and re-opened, correct? I'm not sure whether that is enforced anywhere. Should we zero this var out in open()?

Author comment: If the BlockObjectWriter can be reopened, wouldn't we need to reset some other variables too, like finalPosition, initialPosition, and reportedPosition? They are set during construction, and I don't see anything that prevents reopening the BlockObjectWriter. We could either reset all the variables I mentioned in open(), or add a check so that once the BlockObjectWriter has been opened it can't be reopened. I'm leaning towards the latter; what do you think?

Contributor comment: What about just adding a class-level comment that it can't be used after it is closed?

Author comment: Yes, I guess that is the least we can do, but having an explicit check would be better. If we're okay with it, I can add a boolean that tracks whether the BlockObjectWriter has been opened before and, if so, don't allow it to be reopened. Thoughts?

Contributor comment: Sure, you can enforce that it is never re-opened if you want.
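
Below is a self-contained sketch of the open-once guard agreed on in this thread. The class and method names are hypothetical and only illustrate the pattern; the actual change is the hasBeenClosed flag added to DiskBlockObjectWriter earlier in this diff.

// Hypothetical, minimal illustration of the "open once" guard: after close(),
// open() throws instead of silently resetting per-writer state such as the
// record counter, mirroring the hasBeenClosed check added in this PR.
class OnceOpenableWriter {
  private var initialized = false
  private var hasBeenClosed = false
  private var numRecordsWritten = 0L

  def open(): this.type = {
    if (hasBeenClosed) {
      throw new IllegalStateException("Writer already closed. Cannot be reopened.")
    }
    initialized = true
    this
  }

  def write(record: Any): Unit = {
    require(initialized, "open() must be called before write()")
    numRecordsWritten += 1
  }

  def recordsWritten: Long = numRecordsWritten

  def close(): Unit = {
    initialized = false
    hasBeenClosed = true
  }
}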

writeMetrics.incShuffleRecordsWritten(1)

if (writesSinceMetricsUpdate == 32) {
writesSinceMetricsUpdate = 0
if (numRecordsWritten % 32 == 0) {
updateBytesWritten()
} else {
writesSinceMetricsUpdate += 1
}
}

9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -29,14 +29,15 @@ private[spark] object ToolTips {
val SHUFFLE_READ_BLOCKED_TIME =
"Time that the task spent blocked waiting for shuffle data to be read from remote machines."

val INPUT = "Bytes read from Hadoop or from Spark storage."
val INPUT = "Bytes and records read from Hadoop or from Spark storage."

val OUTPUT = "Bytes written to Hadoop."
val OUTPUT = "Bytes and records written to Hadoop."

val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."
val SHUFFLE_WRITE =
"Bytes and records written to disk in order to be read by a shuffle in a future stage."

val SHUFFLE_READ =
"""Bytes read from remote executors. Typically less than shuffle write bytes
"""Bytes and records read from remote executors. Typically less than shuffle write bytes
because this does not include shuffle data read locally."""

val GETTING_RESULT_TIME =
@@ -48,7 +48,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorToTasksFailed = HashMap[String, Int]()
val executorToDuration = HashMap[String, Long]()
val executorToInputBytes = HashMap[String, Long]()
val executorToInputRecords = HashMap[String, Long]()
val executorToOutputBytes = HashMap[String, Long]()
val executorToOutputRecords = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()

@@ -78,10 +80,14 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
metrics.inputMetrics.foreach { inputMetrics =>
executorToInputBytes(eid) =
executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
executorToInputRecords(eid) =
executorToInputRecords.getOrElse(eid, 0L) + inputMetrics.recordsRead
}
metrics.outputMetrics.foreach { outputMetrics =>
executorToOutputBytes(eid) =
executorToOutputBytes.getOrElse(eid, 0L) + outputMetrics.bytesWritten
executorToOutputRecords(eid) =
executorToOutputRecords.getOrElse(eid, 0L) + outputMetrics.recordsWritten
}
metrics.shuffleReadMetrics.foreach { shuffleRead =>
executorToShuffleRead(eid) =