Skip to content
Snippets Groups Projects
Commit 3dc55fdf authored by Andrew Or's avatar Andrew Or Committed by Patrick Wendell
Browse files

[Minor] Fixes on top of #1679

Minor fixes on top of #1679.

Author: Andrew Or <andrewor14@gmail.com>

Closes #1736 from andrewor14/amend-#1679 and squashes the following commits:

3b46f5e [Andrew Or] Minor fixes
parent 9cf429aa
No related branches found
No related tags found
No related merge requests found
......@@ -46,9 +46,8 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val maxMem = storageStatusList.map(_.maxMem).sum
val remainingMem = storageStatusList.map(_.memRemaining).sum
(maxMem - remainingMem) / 1024 / 1024
val memUsed = storageStatusList.map(_.memUsed).sum
memUsed / 1024 / 1024
}
})
......
......@@ -172,16 +172,13 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
def memRemaining: Long = maxMem - memUsed
/** Return the memory used by this block manager. */
def memUsed: Long =
_nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
def memUsed: Long = _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
/** Return the disk space used by this block manager. */
def diskUsed: Long =
_nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
/** Return the off-heap space used by this block manager. */
def offHeapUsed: Long =
_nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum
def offHeapUsed: Long = _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum
/** Return the memory used by the given RDD in this block manager in O(1) time. */
def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
......@@ -246,7 +243,7 @@ private[spark] object StorageUtils {
val rddId = rddInfo.id
// Assume all blocks belonging to the same RDD have the same storage level
val storageLevel = statuses
.map(_.rddStorageLevel(rddId)).flatMap(s => s).headOption.getOrElse(StorageLevel.NONE)
.flatMap(_.rddStorageLevel(rddId)).headOption.getOrElse(StorageLevel.NONE)
val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment