From 3dc55fdf450b4237f7c592fce56d1467fd206366 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sat, 2 Aug 2014 22:00:46 -0700 Subject: [PATCH] [Minor] Fixes on top of #1679 Minor fixes on top of #1679. Author: Andrew Or Closes #1736 from andrewor14/amend-#1679 and squashes the following commits: 3b46f5e [Andrew Or] Minor fixes --- .../org/apache/spark/storage/BlockManagerSource.scala | 5 ++--- .../scala/org/apache/spark/storage/StorageUtils.scala | 11 ++++------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index e939318a029dd..3f14c40ec61cb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -46,9 +46,8 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] { override def getValue: Long = { val storageStatusList = blockManager.master.getStorageStatus - val maxMem = storageStatusList.map(_.maxMem).sum - val remainingMem = storageStatusList.map(_.memRemaining).sum - (maxMem - remainingMem) / 1024 / 1024 + val memUsed = storageStatusList.map(_.memUsed).sum + memUsed / 1024 / 1024 } }) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 0a0a448baa2ef..2bd6b749be261 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -172,16 +172,13 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { def memRemaining: Long = maxMem - memUsed /** Return the memory used by this block manager. */ - def memUsed: Long = - _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum + def memUsed: Long = _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum /** Return the disk space used by this block manager. */ - def diskUsed: Long = - _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum + def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum /** Return the off-heap space used by this block manager. */ - def offHeapUsed: Long = - _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum + def offHeapUsed: Long = _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum /** Return the memory used by the given RDD in this block manager in O(1) time. */ def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L) @@ -246,7 +243,7 @@ private[spark] object StorageUtils { val rddId = rddInfo.id // Assume all blocks belonging to the same RDD have the same storage level val storageLevel = statuses - .map(_.rddStorageLevel(rddId)).flatMap(s => s).headOption.getOrElse(StorageLevel.NONE) + .flatMap(_.rddStorageLevel(rddId)).headOption.getOrElse(StorageLevel.NONE) val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum val memSize = statuses.map(_.memUsedByRdd(rddId)).sum val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum