[SPARK-19937] Collect metrics for remote bytes read to disk during shuffle.
jinxing committed Jun 9, 2017
1 parent 0ca69c4 commit ae859fb
Showing 24 changed files with 233 additions and 12 deletions.
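
The change threads a new shuffle-read counter, remoteBytesReadToDisk, from the block fetcher through TaskMetrics, the JSON event log, the REST API, and the UI. As a minimal sketch (not part of this commit) of what the new field enables, a listener could report it per task once the change is in place; the accessor path matches the one added in ShuffleReadMetrics below, while the listener itself is illustrative:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative listener: logs how many remote shuffle bytes each task had to
// read via local disk (versus straight from in-memory buffers).
class RemoteBytesToDiskListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      val read = metrics.shuffleReadMetrics
      println(s"task ${taskEnd.taskInfo.taskId}: " +
        s"remoteBytesRead=${read.remoteBytesRead}, " +
        s"remoteBytesReadToDisk=${read.remoteBytesReadToDisk}")
    }
  }
}
// Registered as usual with sc.addSparkListener(new RemoteBytesToDiskListener).
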
@@ -50,6 +50,7 @@ private[spark] object InternalAccumulator {
val REMOTE_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "remoteBlocksFetched"
val LOCAL_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "localBlocksFetched"
val REMOTE_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "remoteBytesRead"
val REMOTE_BYTES_READ_TO_DISK = SHUFFLE_READ_METRICS_PREFIX + "remoteBytesReadToDisk"
val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
@@ -31,6 +31,7 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
private[executor] val _remoteBlocksFetched = new LongAccumulator
private[executor] val _localBlocksFetched = new LongAccumulator
private[executor] val _remoteBytesRead = new LongAccumulator
private[executor] val _remoteBytesReadToDisk = new LongAccumulator
private[executor] val _localBytesRead = new LongAccumulator
private[executor] val _fetchWaitTime = new LongAccumulator
private[executor] val _recordsRead = new LongAccumulator
@@ -50,6 +51,11 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
*/
def remoteBytesRead: Long = _remoteBytesRead.sum

/**
* Total number of remote bytes read to disk from the shuffle by this task.
*/
def remoteBytesReadToDisk: Long = _remoteBytesReadToDisk.sum

/**
* Shuffle data that was read from the local disk (as opposed to from a remote executor).
*/
@@ -80,13 +86,15 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
private[spark] def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched.add(v)
private[spark] def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched.add(v)
private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v)
private[spark] def incRemoteBytesReadToDisk(v: Long): Unit = _remoteBytesReadToDisk.add(v)
private[spark] def incLocalBytesRead(v: Long): Unit = _localBytesRead.add(v)
private[spark] def incFetchWaitTime(v: Long): Unit = _fetchWaitTime.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)

private[spark] def setRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.setValue(v)
private[spark] def setLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.setValue(v)
private[spark] def setRemoteBytesRead(v: Long): Unit = _remoteBytesRead.setValue(v)
private[spark] def setRemoteBytesReadToDisk(v: Long): Unit = _remoteBytesReadToDisk.setValue(v)
private[spark] def setLocalBytesRead(v: Long): Unit = _localBytesRead.setValue(v)
private[spark] def setFetchWaitTime(v: Long): Unit = _fetchWaitTime.setValue(v)
private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
@@ -99,13 +107,15 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
_remoteBlocksFetched.setValue(0)
_localBlocksFetched.setValue(0)
_remoteBytesRead.setValue(0)
_remoteBytesReadToDisk.setValue(0)
_localBytesRead.setValue(0)
_fetchWaitTime.setValue(0)
_recordsRead.setValue(0)
metrics.foreach { metric =>
_remoteBlocksFetched.add(metric.remoteBlocksFetched)
_localBlocksFetched.add(metric.localBlocksFetched)
_remoteBytesRead.add(metric.remoteBytesRead)
_remoteBytesReadToDisk.add(metric.remoteBytesReadToDisk)
_localBytesRead.add(metric.localBytesRead)
_fetchWaitTime.add(metric.fetchWaitTime)
_recordsRead.add(metric.recordsRead)
@@ -122,20 +132,23 @@ private[spark] class TempShuffleReadMetrics {
private[this] var _remoteBlocksFetched = 0L
private[this] var _localBlocksFetched = 0L
private[this] var _remoteBytesRead = 0L
private[this] var _remoteBytesReadToDisk = 0L
private[this] var _localBytesRead = 0L
private[this] var _fetchWaitTime = 0L
private[this] var _recordsRead = 0L

def incRemoteBlocksFetched(v: Long): Unit = _remoteBlocksFetched += v
def incLocalBlocksFetched(v: Long): Unit = _localBlocksFetched += v
def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead += v
def incRemoteBytesReadToDisk(v: Long): Unit = _remoteBytesReadToDisk += v
def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
def incRecordsRead(v: Long): Unit = _recordsRead += v

def remoteBlocksFetched: Long = _remoteBlocksFetched
def localBlocksFetched: Long = _localBlocksFetched
def remoteBytesRead: Long = _remoteBytesRead
def remoteBytesReadToDisk: Long = _remoteBytesReadToDisk
def localBytesRead: Long = _localBytesRead
def fetchWaitTime: Long = _fetchWaitTime
def recordsRead: Long = _recordsRead
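
Each fetch records its counters into a per-read TempShuffleReadMetrics, and those values are later folded into the task-level ShuffleReadMetrics by the merge loop shown above. A small sketch of the accounting, using only the methods in this diff (TempShuffleReadMetrics is spark-internal, so this is illustrative rather than user-facing code):

// Hypothetical accounting for two remote blocks: one kept in memory, one
// streamed to local disk by the fetcher. Bytes read to disk count toward
// both remoteBytesRead and remoteBytesReadToDisk.
val temp = new TempShuffleReadMetrics
temp.incRemoteBlocksFetched(1)
temp.incRemoteBytesRead(4096L)            // in-memory remote block

temp.incRemoteBlocksFetched(1)
temp.incRemoteBytesRead(1048576L)         // remote block fetched to disk
temp.incRemoteBytesReadToDisk(1048576L)

assert(temp.remoteBytesRead == 4096L + 1048576L)
assert(temp.remoteBytesReadToDisk == 1048576L)
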
@@ -215,6 +215,7 @@ class TaskMetrics private[spark] () extends Serializable {
shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
shuffleRead.REMOTE_BYTES_READ_TO_DISK -> shuffleReadMetrics._remoteBytesReadToDisk,
shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
@@ -200,6 +200,7 @@ private[v1] object AllStagesResource {
readBytes = submetricQuantiles(_.totalBytesRead),
readRecords = submetricQuantiles(_.recordsRead),
remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
remoteBytesReadToDisk = submetricQuantiles(_.remoteBytesReadToDisk),
remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched),
@@ -281,6 +282,7 @@ private[v1] object AllStagesResource {
localBlocksFetched = internal.localBlocksFetched,
fetchWaitTime = internal.fetchWaitTime,
remoteBytesRead = internal.remoteBytesRead,
remoteBytesReadToDisk = internal.remoteBytesReadToDisk,
localBytesRead = internal.localBytesRead,
recordsRead = internal.recordsRead
)
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -208,6 +208,7 @@ class ShuffleReadMetrics private[spark](
val localBlocksFetched: Long,
val fetchWaitTime: Long,
val remoteBytesRead: Long,
val remoteBytesReadToDisk: Long,
val localBytesRead: Long,
val recordsRead: Long)

@@ -249,6 +250,7 @@ class ShuffleReadMetricDistributions private[spark](
val localBlocksFetched: IndexedSeq[Double],
val fetchWaitTime: IndexedSeq[Double],
val remoteBytesRead: IndexedSeq[Double],
val remoteBytesReadToDisk: IndexedSeq[Double],
val totalBlocksFetched: IndexedSeq[Double])

class ShuffleWriteMetricDistributions private[spark](
@@ -165,6 +165,9 @@ final class ShuffleBlockFetcherIterator(
case SuccessFetchResult(_, address, _, buf, _) =>
if (address != blockManager.blockManagerId) {
shuffleMetrics.incRemoteBytesRead(buf.size)
if (buf.isInstanceOf[FileSegmentManagedBuffer]) {
shuffleMetrics.incRemoteBytesReadToDisk(buf.size)
}
shuffleMetrics.incRemoteBlocksFetched(1)
}
buf.release()
@@ -363,6 +366,9 @@ final class ShuffleBlockFetcherIterator(
case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) =>
if (address != blockManager.blockManagerId) {
shuffleMetrics.incRemoteBytesRead(buf.size)
if (buf.isInstanceOf[FileSegmentManagedBuffer]) {
shuffleMetrics.incRemoteBytesReadToDisk(buf.size)
}
shuffleMetrics.incRemoteBlocksFetched(1)
}
bytesInFlight -= size
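
The buffer type carries the whole signal here: when ShuffleBlockFetcherIterator writes a remote block to local disk instead of holding it in memory, the fetched data comes back as a FileSegmentManagedBuffer, so its size is counted toward the new metric in addition to the overall remote bytes. The helper below is an illustrative restatement of that accounting, not part of the commit:

import org.apache.spark.executor.TempShuffleReadMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}

// Every successful remote fetch counts toward remoteBytesRead; only
// file-backed buffers (blocks that landed on local disk while being fetched)
// also count toward remoteBytesReadToDisk.
def recordRemoteRead(buf: ManagedBuffer, metrics: TempShuffleReadMetrics): Unit = {
  metrics.incRemoteBytesRead(buf.size)
  if (buf.isInstanceOf[FileSegmentManagedBuffer]) {
    metrics.incRemoteBytesReadToDisk(buf.size)
  }
  metrics.incRemoteBlocksFetched(1)
}
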
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -251,6 +251,7 @@ private[spark] object UIData {
remoteBlocksFetched: Long,
localBlocksFetched: Long,
remoteBytesRead: Long,
remoteBytesReadToDisk: Long,
localBytesRead: Long,
fetchWaitTime: Long,
recordsRead: Long,
@@ -274,6 +275,7 @@ private[spark] object UIData {
remoteBlocksFetched = metrics.remoteBlocksFetched,
localBlocksFetched = metrics.localBlocksFetched,
remoteBytesRead = metrics.remoteBytesRead,
remoteBytesReadToDisk = metrics.remoteBytesReadToDisk,
localBytesRead = metrics.localBytesRead,
fetchWaitTime = metrics.fetchWaitTime,
recordsRead = metrics.recordsRead,
@@ -282,7 +284,7 @@ private[spark] object UIData {
)
}
}
private val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0)
private val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0, 0)
}

case class ShuffleWriteMetricsUIData(
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -339,6 +339,7 @@ private[spark] object JsonProtocol {
("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> taskMetrics.shuffleReadMetrics.fetchWaitTime) ~
("Remote Bytes Read" -> taskMetrics.shuffleReadMetrics.remoteBytesRead) ~
("Remote Bytes Read To Disk" -> taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk) ~
("Local Bytes Read" -> taskMetrics.shuffleReadMetrics.localBytesRead) ~
("Total Records Read" -> taskMetrics.shuffleReadMetrics.recordsRead)
val shuffleWriteMetrics: JValue =
@@ -804,6 +805,8 @@ private[spark] object JsonProtocol {
readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int])
readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int])
readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long])
Utils.jsonOption(readJson \ "Remote Bytes Read To Disk")
.foreach { v => readMetrics.incRemoteBytesReadToDisk(v.extract[Long])}
readMetrics.incLocalBytesRead(
Utils.jsonOption(readJson \ "Local Bytes Read").map(_.extract[Long]).getOrElse(0L))
readMetrics.incFetchWaitTime((readJson \ "Fetch Wait Time").extract[Long])
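
Event logs written before this change carry no "Remote Bytes Read To Disk" entry, so the field is read optionally and the metric simply stays at its default of 0 when the key is missing. A standalone json4s sketch of that backward-compatible read (the key names mirror the JSON above; the snippet is illustrative, not the JsonProtocol code itself):

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats: Formats = DefaultFormats

// Old-format shuffle-read metrics: the new key is absent, so default to 0.
val readJson = parse("""{"Remote Blocks Fetched": 2, "Remote Bytes Read": 4096}""")
val toDisk = (readJson \ "Remote Bytes Read To Disk") match {
  case JNothing => 0L                     // field absent in logs from older Spark versions
  case v => v.extract[Long]
}
assert(toDisk == 0L)
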
@@ -60,6 +60,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -105,6 +106,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -150,6 +152,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -195,6 +198,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -240,6 +244,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -285,6 +290,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -330,6 +336,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -375,6 +382,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -60,6 +60,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -105,6 +106,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -150,6 +152,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -195,6 +198,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -240,6 +244,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -285,6 +290,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -330,6 +336,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},
@@ -375,6 +382,7 @@
"localBlocksFetched" : 0,
"fetchWaitTime" : 0,
"remoteBytesRead" : 0,
"remoteBytesReadToDisk" : 0,
"localBytesRead" : 0,
"recordsRead" : 0
},