diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 368835a86749389d2370a2c1aefc5285897dca77..9ba21cfcde01a9ec937dfb72e12bbdb9fd365afd 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -48,7 +48,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
-import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
+import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

@@ -843,7 +843,9 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   @DeveloperApi
   def getRDDStorageInfo: Array[RDDInfo] = {
-    StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
+    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
+    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
+    rddInfos.filter(_.isCached)
   }

   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 94f5a4bb2e9cdb41ff808e6834a596f44116196c..bd31e3c5a187f87fce6687a23ff3aa8a2f798227 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -267,9 +267,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   }

   private def storageStatus: Array[StorageStatus] = {
-    blockManagerInfo.map { case(blockManagerId, info) =>
-      val blockMap = mutable.Map[BlockId, BlockStatus](info.blocks.toSeq: _*)
-      new StorageStatus(blockManagerId, info.maxMem, blockMap)
+    blockManagerInfo.map { case (blockManagerId, info) =>
+      new StorageStatus(blockManagerId, info.maxMem, info.blocks)
     }.toArray
   }

@@ -424,7 +423,14 @@ case class BlockStatus(
     storageLevel: StorageLevel,
     memSize: Long,
     diskSize: Long,
-    tachyonSize: Long)
+    tachyonSize: Long) {
+  def isCached: Boolean = memSize + diskSize + tachyonSize > 0
+}
+
+@DeveloperApi
+object BlockStatus {
+  def empty: BlockStatus = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+}

 private[spark] class BlockManagerInfo(
     val blockManagerId: BlockManagerId,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 687586490abfe413f2aeb37f0f258a58cdefa924..e939318a029dd50f7bfdff9d257afae80dd7506b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -30,7 +30,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
   metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+      val maxMem = storageStatusList.map(_.maxMem).sum
       maxMem / 1024 / 1024
     }
   })
@@ -38,7 +38,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
   metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+      val remainingMem = storageStatusList.map(_.memRemaining).sum
       remainingMem / 1024 / 1024
     }
   })
@@ -46,8 +46,8 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
   metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
-      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+      val maxMem = storageStatusList.map(_.maxMem).sum
+      val remainingMem = storageStatusList.map(_.memRemaining).sum
       (maxMem - remainingMem) / 1024 / 1024
     }
   })
@@ -55,11 +55,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
   metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val diskSpaceUsed = storageStatusList
-        .flatMap(_.blocks.values.map(_.diskSize))
-        .reduceOption(_ + _)
-        .getOrElse(0L)
-
+      val diskSpaceUsed = storageStatusList.map(_.diskUsed).sum
       diskSpaceUsed / 1024 / 1024
     }
   })
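Note: `getRDDStorageInfo` is now driven by the driver's own `persistentRdds` map, with live sizes filled in from the executor storage statuses. A minimal usage sketch (the job below is hypothetical; only the `getRDDStorageInfo` call and the `RDDInfo` fields come from this patch):

```scala
// Hypothetical driver-side snippet exercising the refactored API
val rdd = sc.parallelize(1 to 1000).cache()
rdd.count()  // materialize the RDD so its blocks are actually stored

sc.getRDDStorageInfo.foreach { info =>
  // Only RDDs that pass the isCached filter are reported
  println(s"RDD ${info.id}: ${info.numCachedPartitions} cached partitions, " +
    s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
}
```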
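Note: the `.reduce(_ + _)` to `.sum` changes here are behavioral as well as cosmetic: `reduce` throws on an empty collection, while `sum` returns zero, which is what a metrics gauge should report before any block manager has registered. A quick illustration of the standard-library behavior being relied on:

```scala
// Plain Scala behavior, independent of Spark
val mems = Seq.empty[Long]   // e.g. no storage statuses reported yet
// mems.reduce(_ + _)        // would throw UnsupportedOperationException
assert(mems.sum == 0L)       // sum of an empty collection is 0
```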
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 5a72e216872a6d4cb47544bc664ef1f43d74a889..120c327a7e580cc81fb3ba8315783200d02e94f9 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -34,6 +34,8 @@ class RDDInfo(
   var diskSize = 0L
   var tachyonSize = 0L

+  def isCached: Boolean = (memSize + diskSize + tachyonSize > 0) && numCachedPartitions > 0
+
   override def toString = {
     import Utils.bytesToString
     ("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 41c960c867e2ef75780f51fea701dcee41f7a1b6..d9066f766476e60e31e01b15b6c3b4ca27f47c3e 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -35,13 +35,12 @@ class StorageStatusListener extends SparkListener {

   /** Update storage status list to reflect updated block statuses */
   private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
-    val filteredStatus = executorIdToStorageStatus.get(execId)
-    filteredStatus.foreach { storageStatus =>
+    executorIdToStorageStatus.get(execId).foreach { storageStatus =>
       updatedBlocks.foreach { case (blockId, updatedStatus) =>
         if (updatedStatus.storageLevel == StorageLevel.NONE) {
-          storageStatus.blocks.remove(blockId)
+          storageStatus.removeBlock(blockId)
         } else {
-          storageStatus.blocks(blockId) = updatedStatus
+          storageStatus.updateBlock(blockId, updatedStatus)
         }
       }
     }
@@ -50,9 +49,8 @@ class StorageStatusListener extends SparkListener {
   /** Update storage status list to reflect the removal of an RDD from the cache */
   private def updateStorageStatus(unpersistedRDDId: Int) {
     storageStatusList.foreach { storageStatus =>
-      val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
-      unpersistedBlocksIds.foreach { blockId =>
-        storageStatus.blocks.remove(blockId)
+      storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) =>
+        storageStatus.removeBlock(blockId)
       }
     }
   }
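Note: `RDDInfo.isCached` deliberately requires both a positive total size and a positive cached partition count, so an RDD whose blocks were all dropped no longer counts as cached. A small sketch of the predicate (the values are made up):

```scala
val info = new RDDInfo(0, "example", 10, StorageLevel.MEMORY_ONLY)
assert(!info.isCached)          // fresh info: no sizes, no cached partitions
info.memSize = 1000L
assert(!info.isCached)          // a size alone is not enough...
info.numCachedPartitions = 2
assert(info.isCached)           // ...both conditions must hold
```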
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 177281f66336751b29a57eddb611f20816f5a930..0a0a448baa2ef296137c65ab2511c03de81858fe 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -20,122 +20,258 @@ package org.apache.spark.storage
 import scala.collection.Map
 import scala.collection.mutable

-import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi

 /**
  * :: DeveloperApi ::
  * Storage information for each BlockManager.
+ *
+ * This class assumes BlockId and BlockStatus are immutable, such that the consumers of this
+ * class cannot mutate the source of the information. Accesses are not thread-safe.
  */
 @DeveloperApi
-class StorageStatus(
-    val blockManagerId: BlockManagerId,
-    val maxMem: Long,
-    val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {
+class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {

-  def memUsed = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
+  /**
+   * Internal representation of the blocks stored in this block manager.
+   *
+   * We store RDD blocks and non-RDD blocks separately to allow quick retrievals of RDD blocks.
+   * These collections should only be mutated through the add/update/removeBlock methods.
+   */
+  private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
+  private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]

-  def memUsedByRDD(rddId: Int) =
-    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
+  /**
+   * Storage information of the blocks that entails memory, disk, and off-heap memory usage.
+   *
+   * As with the block maps, we store the storage information separately for RDD blocks and
+   * non-RDD blocks for the same reason. In particular, RDD storage information is stored
+   * in a map indexed by the RDD ID to the following 4-tuple:
+   *
+   *   (memory size, disk size, off-heap size, storage level)
+   *
+   * We assume that all the blocks that belong to the same RDD have the same storage level.
+   * This field is not relevant to non-RDD blocks, however, so the storage information for
+   * non-RDD blocks contains only the first 3 fields (in the same order).
+   */
+  private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, Long, StorageLevel)]
+  private var _nonRddStorageInfo: (Long, Long, Long) = (0L, 0L, 0L)

-  def diskUsed = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+  /** Create a storage status with an initial set of blocks, leaving the source unmodified. */
+  def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) {
+    this(bmid, maxMem)
+    initialBlocks.foreach { case (bid, bstatus) => addBlock(bid, bstatus) }
+  }

-  def diskUsedByRDD(rddId: Int) =
-    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+  /**
+   * Return the blocks stored in this block manager.
+   *
+   * Note that this is somewhat expensive, as it involves cloning the underlying maps and then
+   * concatenating them together. Much faster alternatives exist for common operations such as
+   * contains, get, and size.
+   */
+  def blocks: Map[BlockId, BlockStatus] = _nonRddBlocks ++ rddBlocks

-  def memRemaining: Long = maxMem - memUsed
+  /**
+   * Return the RDD blocks stored in this block manager.
+   *
+   * Note that this is somewhat expensive, as it involves cloning the underlying maps and then
+   * concatenating them together. Much faster alternatives exist for common operations such as
+   * getting the memory, disk, and off-heap memory sizes occupied by this RDD.
+   */
+  def rddBlocks: Map[BlockId, BlockStatus] = _rddBlocks.flatMap { case (_, blocks) => blocks }

-  def rddBlocks = blocks.collect { case (rdd: RDDBlockId, status) => (rdd, status) }
-}
+  /** Return the blocks that belong to the given RDD stored in this block manager. */
+  def rddBlocksById(rddId: Int): Map[BlockId, BlockStatus] = {
+    _rddBlocks.get(rddId).getOrElse(Map.empty)
+  }

-/** Helper methods for storage-related objects. */
-private[spark] object StorageUtils {
+  /** Add the given block to this storage status. If it already exists, overwrite it. */
+  private[spark] def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
+    updateStorageInfo(blockId, blockStatus)
+    blockId match {
+      case RDDBlockId(rddId, _) =>
+        _rddBlocks.getOrElseUpdate(rddId, new mutable.HashMap)(blockId) = blockStatus
+      case _ =>
+        _nonRddBlocks(blockId) = blockStatus
+    }
+  }
+
+  /** Update the given block in this storage status. If it doesn't already exist, add it. */
+  private[spark] def updateBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
+    addBlock(blockId, blockStatus)
+  }
+
+  /** Remove the given block from this storage status. */
+  private[spark] def removeBlock(blockId: BlockId): Option[BlockStatus] = {
+    updateStorageInfo(blockId, BlockStatus.empty)
+    blockId match {
+      case RDDBlockId(rddId, _) =>
+        // Actually remove the block, if it exists
+        if (_rddBlocks.contains(rddId)) {
+          val removed = _rddBlocks(rddId).remove(blockId)
+          // If the given RDD has no more blocks left, remove the RDD
+          if (_rddBlocks(rddId).isEmpty) {
+            _rddBlocks.remove(rddId)
+          }
+          removed
+        } else {
+          None
+        }
+      case _ =>
+        _nonRddBlocks.remove(blockId)
+    }
+  }

   /**
-   * Returns basic information of all RDDs persisted in the given SparkContext. This does not
-   * include storage information.
+   * Return whether the given block is stored in this block manager in O(1) time.
+   * Note that this is much faster than `this.blocks.contains`, which is O(blocks) time.
    */
-  def rddInfoFromSparkContext(sc: SparkContext): Array[RDDInfo] = {
-    sc.persistentRdds.values.map { rdd =>
-      val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
-      val rddNumPartitions = rdd.partitions.size
-      val rddStorageLevel = rdd.getStorageLevel
-      val rddInfo = new RDDInfo(rdd.id, rddName, rddNumPartitions, rddStorageLevel)
-      rddInfo
-    }.toArray
+  def containsBlock(blockId: BlockId): Boolean = {
+    blockId match {
+      case RDDBlockId(rddId, _) =>
+        _rddBlocks.get(rddId).exists(_.contains(blockId))
+      case _ =>
+        _nonRddBlocks.contains(blockId)
+    }
   }

-  /** Returns storage information of all RDDs persisted in the given SparkContext. */
-  def rddInfoFromStorageStatus(
-      storageStatuses: Seq[StorageStatus],
-      sc: SparkContext): Array[RDDInfo] = {
-    rddInfoFromStorageStatus(storageStatuses, rddInfoFromSparkContext(sc))
+  /**
+   * Return the given block stored in this block manager in O(1) time.
+   * Note that this is much faster than `this.blocks.get`, which is O(blocks) time.
+   */
+  def getBlock(blockId: BlockId): Option[BlockStatus] = {
+    blockId match {
+      case RDDBlockId(rddId, _) =>
+        _rddBlocks.get(rddId).map(_.get(blockId)).flatten
+      case _ =>
+        _nonRddBlocks.get(blockId)
+    }
   }

-  /** Returns storage information of all RDDs in the given list. */
-  def rddInfoFromStorageStatus(
-      storageStatuses: Seq[StorageStatus],
-      rddInfos: Seq[RDDInfo],
-      updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = {
-
-    // Mapping from a block ID -> its status
-    val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
-
-    // Record updated blocks, if any
-    updatedBlocks
-      .collect { case (id: RDDBlockId, status) => (id, status) }
-      .foreach { case (id, status) => blockMap(id) = status }
-
-    // Mapping from RDD ID -> an array of associated BlockStatuses
-    val rddBlockMap = blockMap
-      .groupBy { case (k, _) => k.rddId }
-      .mapValues(_.values.toArray)
-
-    // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
-    val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
-
-    val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) =>
-      // Add up memory, disk and Tachyon sizes
-      val persistedBlocks =
-        blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
-      val _storageLevel =
-        if (persistedBlocks.length > 0) persistedBlocks(0).storageLevel else StorageLevel.NONE
-      val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
-      val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
-      val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
-      rddInfoMap.get(rddId).map { rddInfo =>
-        rddInfo.storageLevel = _storageLevel
-        rddInfo.numCachedPartitions = persistedBlocks.length
-        rddInfo.memSize = memSize
-        rddInfo.diskSize = diskSize
-        rddInfo.tachyonSize = tachyonSize
-        rddInfo
-      }
-    }.toArray
+  /**
+   * Return the number of blocks stored in this block manager in O(RDDs) time.
+   * Note that this is much faster than `this.blocks.size`, which is O(blocks) time.
+   */
+  def numBlocks: Int = _nonRddBlocks.size + numRddBlocks
+
+  /**
+   * Return the number of RDD blocks stored in this block manager in O(RDDs) time.
+   * Note that this is much faster than `this.rddBlocks.size`, which is O(RDD blocks) time.
+   */
+  def numRddBlocks: Int = _rddBlocks.values.map(_.size).sum

-    scala.util.Sorting.quickSort(rddStorageInfos)
-    rddStorageInfos
+  /**
+   * Return the number of blocks that belong to the given RDD in O(1) time.
+   * Note that this is much faster than `this.rddBlocksById(rddId).size`, which is
+   * O(blocks in this RDD) time.
+   */
+  def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0)
+
+  /** Return the memory remaining in this block manager. */
+  def memRemaining: Long = maxMem - memUsed
+
+  /** Return the memory used by this block manager. */
+  def memUsed: Long =
+    _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
+
+  /** Return the disk space used by this block manager. */
+  def diskUsed: Long =
+    _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
+
+  /** Return the off-heap space used by this block manager. */
+  def offHeapUsed: Long =
+    _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum
+
+  /** Return the memory used by the given RDD in this block manager in O(1) time. */
+  def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
+
+  /** Return the disk space used by the given RDD in this block manager in O(1) time. */
+  def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)
+
+  /** Return the off-heap space used by the given RDD in this block manager in O(1) time. */
+  def offHeapUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._3).getOrElse(0L)
+
+  /** Return the storage level, if any, used by the given RDD in this block manager. */
+  def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._4)
+
+  /**
+   * Update the relevant storage info, taking into account any existing status for this block.
+   */
+  private def updateStorageInfo(blockId: BlockId, newBlockStatus: BlockStatus): Unit = {
+    val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus.empty)
+    val changeInMem = newBlockStatus.memSize - oldBlockStatus.memSize
+    val changeInDisk = newBlockStatus.diskSize - oldBlockStatus.diskSize
+    val changeInTachyon = newBlockStatus.tachyonSize - oldBlockStatus.tachyonSize
+    val level = newBlockStatus.storageLevel
+
+    // Compute new info from old info
+    val (oldMem, oldDisk, oldTachyon) = blockId match {
+      case RDDBlockId(rddId, _) =>
+        _rddStorageInfo.get(rddId)
+          .map { case (mem, disk, tachyon, _) => (mem, disk, tachyon) }
+          .getOrElse((0L, 0L, 0L))
+      case _ =>
+        _nonRddStorageInfo
+    }
+    val newMem = math.max(oldMem + changeInMem, 0L)
+    val newDisk = math.max(oldDisk + changeInDisk, 0L)
+    val newTachyon = math.max(oldTachyon + changeInTachyon, 0L)
+
+    // Set the correct info
+    blockId match {
+      case RDDBlockId(rddId, _) =>
+        // If this RDD is no longer persisted, remove it
+        if (newMem + newDisk + newTachyon == 0) {
+          _rddStorageInfo.remove(rddId)
+        } else {
+          _rddStorageInfo(rddId) = (newMem, newDisk, newTachyon, level)
+        }
+      case _ =>
+        _nonRddStorageInfo = (newMem, newDisk, newTachyon)
+    }
   }

-  /** Returns a mapping from BlockId to the locations of the associated block. */
-  def blockLocationsFromStorageStatus(
-      storageStatuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = {
-    val blockLocationPairs = storageStatuses.flatMap { storageStatus =>
-      storageStatus.blocks.map { case (bid, _) => (bid, storageStatus.blockManagerId.hostPort) }
+}
+
+/** Helper methods for storage-related objects. */
+private[spark] object StorageUtils {
+
+  /**
+   * Update the given list of RDDInfo with the given list of storage statuses.
+   * This method overwrites the old values stored in the RDDInfos.
+   */
+  def updateRddInfo(rddInfos: Seq[RDDInfo], statuses: Seq[StorageStatus]): Unit = {
+    rddInfos.foreach { rddInfo =>
+      val rddId = rddInfo.id
+      // Assume all blocks belonging to the same RDD have the same storage level
+      val storageLevel = statuses
+        .map(_.rddStorageLevel(rddId)).flatMap(s => s).headOption.getOrElse(StorageLevel.NONE)
+      val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
+      val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
+      val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
+      val tachyonSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum
+
+      rddInfo.storageLevel = storageLevel
+      rddInfo.numCachedPartitions = numCachedPartitions
+      rddInfo.memSize = memSize
+      rddInfo.diskSize = diskSize
+      rddInfo.tachyonSize = tachyonSize
     }
-    blockLocationPairs.toMap
-      .groupBy { case (blockId, _) => blockId }
-      .mapValues(_.values.toSeq)
   }

-  /** Filters the given list of StorageStatus by the given RDD ID. */
-  def filterStorageStatusByRDD(
-      storageStatuses: Seq[StorageStatus],
-      rddId: Int): Array[StorageStatus] = {
-    storageStatuses.map { status =>
-      val filteredBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toSeq
-      val filteredBlockMap = mutable.Map[BlockId, BlockStatus](filteredBlocks: _*)
-      new StorageStatus(status.blockManagerId, status.maxMem, filteredBlockMap)
-    }.toArray
+  /**
+   * Return a mapping from block ID to its locations for each block that belongs to the given RDD.
+   */
+  def getRddBlockLocations(rddId: Int, statuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = {
+    val blockLocations = new mutable.HashMap[BlockId, mutable.ListBuffer[String]]
+    statuses.foreach { status =>
+      status.rddBlocksById(rddId).foreach { case (bid, _) =>
+        val location = status.blockManagerId.hostPort
+        blockLocations.getOrElseUpdate(bid, mutable.ListBuffer.empty) += location
+      }
+    }
+    blockLocations
   }
+
 }
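Note: the rewritten `StorageStatus` replaces the single public mutable block map with add/update/remove methods that maintain per-RDD aggregates incrementally, so the common queries become O(1) reads of those aggregates. A sketch of the new API (IDs and sizes are made up; the mutators are `private[spark]`, so this only compiles from within the `org.apache.spark` package):

```scala
val status = new StorageStatus(BlockManagerId("exec1", "host1", 7077, 0), 10000L)
status.addBlock(RDDBlockId(0, 0), BlockStatus(StorageLevel.MEMORY_AND_DISK, 100L, 200L, 0L))
status.addBlock(RDDBlockId(0, 1), BlockStatus(StorageLevel.MEMORY_AND_DISK, 100L, 200L, 0L))
status.addBlock(TestBlockId("b"), BlockStatus(StorageLevel.MEMORY_ONLY, 50L, 0L, 0L))

assert(status.numRddBlocksById(0) == 2)  // O(1), no map concatenation
assert(status.memUsedByRdd(0) == 200L)   // aggregate maintained on addBlock
assert(status.diskUsedByRdd(0) == 400L)
assert(status.memUsed == 250L)           // RDD + non-RDD memory
assert(status.containsBlock(TestBlockId("b")))
```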
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index b358c855e1c88e15c884c856aa85df4485a785d6..b814b0e6b85090ad8a20f3a6b73d908200cfafb0 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -49,9 +49,9 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {

   def render(request: HttpServletRequest): Seq[Node] = {
     val storageStatusList = listener.storageStatusList
-    val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
-    val memUsed = storageStatusList.map(_.memUsed).fold(0L)(_ + _)
-    val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
+    val maxMem = storageStatusList.map(_.maxMem).sum
+    val memUsed = storageStatusList.map(_.memUsed).sum
+    val diskUsed = storageStatusList.map(_.diskUsed).sum
     val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
     val execInfoSorted = execInfo.sortBy(_.id)

@@ -80,7 +80,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
         </th>
       </thead>
       <tbody>
-        {execInfoSorted.map(execRow(_))}
+        {execInfoSorted.map(execRow)}
       </tbody>
     </table>

@@ -91,7 +91,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
           <li><strong>Memory:</strong>
             {Utils.bytesToString(memUsed)} Used
             ({Utils.bytesToString(maxMem)} Total) </li>
-          <li><strong>Disk:</strong> {Utils.bytesToString(diskSpaceUsed)} Used </li>
+          <li><strong>Disk:</strong> {Utils.bytesToString(diskUsed)} Used </li>
         </ul>
       </div>
     </div>
@@ -145,7 +145,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
     val status = listener.storageStatusList(statusId)
     val execId = status.blockManagerId.executorId
     val hostPort = status.blockManagerId.hostPort
-    val rddBlocks = status.blocks.size
+    val rddBlocks = status.numBlocks
     val memUsed = status.memUsed
     val maxMem = status.maxMem
     val diskUsed = status.diskUsed
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 2155633b8096fba3d2fe8bd4b6968159392e2e80..84ac53da4755287a73408b050e0aa7ed6eb444ab 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -45,12 +45,13 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
     val workerTable = UIUtils.listingTable(workerHeader, workerRow, workers)

     // Block table
-    val filteredStorageStatusList = StorageUtils.filterStorageStatusByRDD(storageStatusList, rddId)
-    val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).sortWith(_._1.name < _._1.name)
-    val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList)
-    val blocks = blockStatuses.map { case (blockId, status) =>
-      (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
-    }
+    val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
+    val blocks = storageStatusList
+      .flatMap(_.rddBlocksById(rddId))
+      .sortWith(_._1.name < _._1.name)
+      .map { case (blockId, status) =>
+        (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
+      }
     val blockTable = UIUtils.listingTable(blockHeader, blockRow, blocks)

     val content =
@@ -119,10 +120,10 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
     <tr>
       <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
       <td>
-        {Utils.bytesToString(status.memUsedByRDD(rddId))}
+        {Utils.bytesToString(status.memUsedByRdd(rddId))}
         ({Utils.bytesToString(status.memRemaining)} Remaining)
       </td>
-      <td>{Utils.bytesToString(status.diskUsedByRDD(rddId))}</td>
+      <td>{Utils.bytesToString(status.diskUsedByRdd(rddId))}</td>
     </tr>
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 0cc0cf3117173cfd46167fa77509abfc02e5b16f..5f6740d49552190511e175b56596ee6ef509566f 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -41,19 +41,18 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage"
  */
 @DeveloperApi
 class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
-  private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
+  private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing

   def storageStatusList = storageStatusListener.storageStatusList

   /** Filter RDD info to include only those with cached partitions */
   def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq

-  /** Update each RDD's info to reflect any updates to the RDD's storage status */
-  private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) {
-    val rddInfos = _rddInfoMap.values.toSeq
-    val updatedRddInfos =
-      StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks)
-    updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
+  /** Update the storage info of the RDDs whose blocks are among the given updated blocks */
+  private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
+    val rddIdsToUpdate = updatedBlocks.flatMap { case (bid, _) => bid.asRDDId.map(_.rddId) }.toSet
+    val rddInfosToUpdate = _rddInfoMap.values.toSeq.filter { s => rddIdsToUpdate.contains(s.id) }
+    StorageUtils.updateRddInfo(rddInfosToUpdate, storageStatusList)
   }

   /**
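Note: `updateRDDInfo` now restricts the refresh to RDDs that actually appear among the task's updated blocks, using `BlockId.asRDDId` to recover RDD IDs. A sketch of that extraction (the block IDs below are made up):

```scala
val updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq(
  (RDDBlockId(1, 0), BlockStatus(StorageLevel.MEMORY_ONLY, 100L, 0L, 0L)),
  (ShuffleBlockId(0, 0, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L)),
  (RDDBlockId(1, 3), BlockStatus(StorageLevel.MEMORY_ONLY, 100L, 0L, 0L))
)
val rddIdsToUpdate = updatedBlocks.flatMap { case (bid, _) => bid.asRDDId.map(_.rddId) }.toSet
assert(rddIdsToUpdate == Set(1))  // the shuffle block contributes no RDD ID
```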
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index fb18c3ebfe46f71c5bb88ac9daf9a218b2aca48b..e6ab538d77bcc89c7c39954a56455bbdce14883e 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark

 import org.scalatest.{Assertions, FunSuite}
+import org.apache.spark.storage.StorageLevel

 class SparkContextInfoSuite extends FunSuite with LocalSparkContext {
   test("getPersistentRDDs only returns RDDs that are marked as cached") {
@@ -35,26 +36,33 @@ class SparkContextInfoSuite extends FunSuite with LocalSparkContext {
   test("getPersistentRDDs returns an immutable map") {
     sc = new SparkContext("local", "test")
     val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-
     val myRdds = sc.getPersistentRDDs
     assert(myRdds.size === 1)
-    assert(myRdds.values.head === rdd1)
+    assert(myRdds(0) === rdd1)
+    assert(myRdds(0).getStorageLevel === StorageLevel.MEMORY_ONLY)

+    // myRdds2 should have 2 RDDs, but myRdds should not change
     val rdd2 = sc.makeRDD(Array(5, 6, 7, 8), 1).cache()
-
-    // getPersistentRDDs should have 2 RDDs, but myRdds should not change
-    assert(sc.getPersistentRDDs.size === 2)
+    val myRdds2 = sc.getPersistentRDDs
+    assert(myRdds2.size === 2)
+    assert(myRdds2(0) === rdd1)
+    assert(myRdds2(1) === rdd2)
+    assert(myRdds2(0).getStorageLevel === StorageLevel.MEMORY_ONLY)
+    assert(myRdds2(1).getStorageLevel === StorageLevel.MEMORY_ONLY)
     assert(myRdds.size === 1)
+    assert(myRdds(0) === rdd1)
+    assert(myRdds(0).getStorageLevel === StorageLevel.MEMORY_ONLY)
   }

   test("getRDDStorageInfo only reports on RDDs that actually persist data") {
     sc = new SparkContext("local", "test")
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-
     assert(sc.getRDDStorageInfo.size === 0)
-
     rdd.collect()
     assert(sc.getRDDStorageInfo.size === 1)
+    assert(sc.getRDDStorageInfo.head.isCached)
+    assert(sc.getRDDStorageInfo.head.memSize > 0)
+    assert(sc.getRDDStorageInfo.head.storageLevel === StorageLevel.MEMORY_ONLY)
   }

   test("call sites report correct locations") {
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 2179c6dd3302e44c3a1c8e77215e7852c8d4d60f..51fb646a3cb617bf843c806d3fd35ed2d6835b6b 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -41,13 +41,13 @@ class StorageStatusListenerSuite extends FunSuite {
     assert(listener.executorIdToStorageStatus.get("big").isDefined)
     assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
     assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
-    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
     assert(listener.executorIdToStorageStatus.size === 2)
     assert(listener.executorIdToStorageStatus.get("fat").isDefined)
     assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
     assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
-    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)

     // Block manager remove
     listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
@@ -67,14 +67,14 @@ class StorageStatusListenerSuite extends FunSuite {
     val taskMetrics = new TaskMetrics

     // Task end with no updated blocks
-    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
-    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics))
-    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
-    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics))
-    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
-    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
   }

   test("task end with updated blocks") {
@@ -90,20 +90,20 @@ class StorageStatusListenerSuite extends FunSuite {
     taskMetrics2.updatedBlocks = Some(Seq(block3))

     // Task end with new blocks
-    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
-    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
-    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
-    assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
-    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
-    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
-    assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
-    assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))

     // Task end with dropped blocks
     val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
@@ -112,17 +112,17 @@ class StorageStatusListenerSuite extends FunSuite {
     taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
     taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
-    assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
-    assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
-    assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
-    assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
+    assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
-    assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
-    assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
-    assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
-    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
+    assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
   }

   test("unpersist RDD") {
@@ -137,16 +137,16 @@ class StorageStatusListenerSuite extends FunSuite {
     taskMetrics2.updatedBlocks = Some(Seq(block3))
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2))
-    assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 3)

     // Unpersist RDD
     listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090))
-    assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
     listener.onUnpersistRDD(SparkListenerUnpersistRDD(4))
-    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
     listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
-    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..38678bbd1dd28c3749b32e67ca7c652662d56866
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+
+/**
+ * Test various functionalities in StorageUtils and StorageStatus.
+ */
+class StorageSuite extends FunSuite {
+  private val memAndDisk = StorageLevel.MEMORY_AND_DISK
+
+  // For testing add, update, and remove (for non-RDD blocks)
+  private def storageStatus1: StorageStatus = {
+    val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
+    assert(status.blocks.isEmpty)
+    assert(status.rddBlocks.isEmpty)
+    assert(status.memUsed === 0L)
+    assert(status.memRemaining === 1000L)
+    assert(status.diskUsed === 0L)
+    assert(status.offHeapUsed === 0L)
+    status.addBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 10L, 20L, 1L))
+    status.addBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 10L, 20L, 1L))
+    status.addBlock(TestBlockId("faa"), BlockStatus(memAndDisk, 10L, 20L, 1L))
+    status
+  }
+
+  test("storage status add non-RDD blocks") {
+    val status = storageStatus1
+    assert(status.blocks.size === 3)
+    assert(status.blocks.contains(TestBlockId("foo")))
+    assert(status.blocks.contains(TestBlockId("fee")))
+    assert(status.blocks.contains(TestBlockId("faa")))
+    assert(status.rddBlocks.isEmpty)
+    assert(status.memUsed === 30L)
+    assert(status.memRemaining === 970L)
+    assert(status.diskUsed === 60L)
+    assert(status.offHeapUsed === 3L)
+  }
+
+  test("storage status update non-RDD blocks") {
+    val status = storageStatus1
+    status.updateBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 50L, 100L, 1L))
+    status.updateBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 100L, 20L, 0L))
+    assert(status.blocks.size === 3)
+    assert(status.memUsed === 160L)
+    assert(status.memRemaining === 840L)
+    assert(status.diskUsed === 140L)
+    assert(status.offHeapUsed === 2L)
+  }
+
+  test("storage status remove non-RDD blocks") {
+    val status = storageStatus1
+    status.removeBlock(TestBlockId("foo"))
+    status.removeBlock(TestBlockId("faa"))
+    assert(status.blocks.size === 1)
+    assert(status.blocks.contains(TestBlockId("fee")))
+    assert(status.memUsed === 10L)
+    assert(status.memRemaining === 990L)
+    assert(status.diskUsed === 20L)
+    assert(status.offHeapUsed === 1L)
+  }
+
+  // For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks
+  private def storageStatus2: StorageStatus = {
+    val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
+    assert(status.rddBlocks.isEmpty)
+    status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L, 0L))
+    status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L, 0L))
+    status.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 10L, 20L, 1L))
+    status.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 100L, 200L, 1L))
+    status.addBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 10L, 20L, 1L))
+    status.addBlock(RDDBlockId(2, 3), BlockStatus(memAndDisk, 10L, 20L, 0L))
+    status.addBlock(RDDBlockId(2, 4), BlockStatus(memAndDisk, 10L, 40L, 0L))
+    status
+  }
+
+  test("storage status add RDD blocks") {
+    val status = storageStatus2
+    assert(status.blocks.size === 7)
+    assert(status.rddBlocks.size === 5)
+    assert(status.rddBlocks.contains(RDDBlockId(0, 0)))
+    assert(status.rddBlocks.contains(RDDBlockId(1, 1)))
+    assert(status.rddBlocks.contains(RDDBlockId(2, 2)))
+    assert(status.rddBlocks.contains(RDDBlockId(2, 3)))
+    assert(status.rddBlocks.contains(RDDBlockId(2, 4)))
+    assert(status.rddBlocksById(0).size === 1)
+    assert(status.rddBlocksById(0).contains(RDDBlockId(0, 0)))
+    assert(status.rddBlocksById(1).size === 1)
+    assert(status.rddBlocksById(1).contains(RDDBlockId(1, 1)))
+    assert(status.rddBlocksById(2).size === 3)
+    assert(status.rddBlocksById(2).contains(RDDBlockId(2, 2)))
+    assert(status.rddBlocksById(2).contains(RDDBlockId(2, 3)))
+    assert(status.rddBlocksById(2).contains(RDDBlockId(2, 4)))
+    assert(status.memUsedByRdd(0) === 10L)
+    assert(status.memUsedByRdd(1) === 100L)
+    assert(status.memUsedByRdd(2) === 30L)
+    assert(status.diskUsedByRdd(0) === 20L)
+    assert(status.diskUsedByRdd(1) === 200L)
+    assert(status.diskUsedByRdd(2) === 80L)
+    assert(status.offHeapUsedByRdd(0) === 1L)
+    assert(status.offHeapUsedByRdd(1) === 1L)
+    assert(status.offHeapUsedByRdd(2) === 1L)
+    assert(status.rddStorageLevel(0) === Some(memAndDisk))
+    assert(status.rddStorageLevel(1) === Some(memAndDisk))
+    assert(status.rddStorageLevel(2) === Some(memAndDisk))
+
+    // Verify default values for RDDs that don't exist
+    assert(status.rddBlocksById(10).isEmpty)
+    assert(status.memUsedByRdd(10) === 0L)
+    assert(status.diskUsedByRdd(10) === 0L)
+    assert(status.offHeapUsedByRdd(10) === 0L)
+    assert(status.rddStorageLevel(10) === None)
+  }
+
+  test("storage status update RDD blocks") {
+    val status = storageStatus2
+    status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 5000L, 0L, 0L))
+    status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L, 0L))
+    status.updateBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 0L, 1000L, 0L))
+    assert(status.blocks.size === 7)
+    assert(status.rddBlocks.size === 5)
+    assert(status.rddBlocksById(0).size === 1)
+    assert(status.rddBlocksById(1).size === 1)
+    assert(status.rddBlocksById(2).size === 3)
+    assert(status.memUsedByRdd(0) === 0L)
+    assert(status.memUsedByRdd(1) === 100L)
+    assert(status.memUsedByRdd(2) === 20L)
+    assert(status.diskUsedByRdd(0) === 0L)
+    assert(status.diskUsedByRdd(1) === 200L)
+    assert(status.diskUsedByRdd(2) === 1060L)
+    assert(status.offHeapUsedByRdd(0) === 0L)
+    assert(status.offHeapUsedByRdd(1) === 1L)
+    assert(status.offHeapUsedByRdd(2) === 0L)
+  }
+
+  test("storage status remove RDD blocks") {
+    val status = storageStatus2
+    status.removeBlock(TestBlockId("man"))
+    status.removeBlock(RDDBlockId(1, 1))
+    status.removeBlock(RDDBlockId(2, 2))
+    status.removeBlock(RDDBlockId(2, 4))
+    assert(status.blocks.size === 3)
+    assert(status.rddBlocks.size === 2)
+    assert(status.rddBlocks.contains(RDDBlockId(0, 0)))
+    assert(status.rddBlocks.contains(RDDBlockId(2, 3)))
+    assert(status.rddBlocksById(0).size === 1)
+    assert(status.rddBlocksById(0).contains(RDDBlockId(0, 0)))
+    assert(status.rddBlocksById(1).size === 0)
+    assert(status.rddBlocksById(2).size === 1)
+    assert(status.rddBlocksById(2).contains(RDDBlockId(2, 3)))
+    assert(status.memUsedByRdd(0) === 10L)
+    assert(status.memUsedByRdd(1) === 0L)
+    assert(status.memUsedByRdd(2) === 10L)
+    assert(status.diskUsedByRdd(0) === 20L)
+    assert(status.diskUsedByRdd(1) === 0L)
+    assert(status.diskUsedByRdd(2) === 20L)
+    assert(status.offHeapUsedByRdd(0) === 1L)
+    assert(status.offHeapUsedByRdd(1) === 0L)
+    assert(status.offHeapUsedByRdd(2) === 0L)
+  }
+
+  test("storage status containsBlock") {
+    val status = storageStatus2
+    // blocks that actually exist
+    assert(status.blocks.contains(TestBlockId("dan")) === status.containsBlock(TestBlockId("dan")))
+    assert(status.blocks.contains(TestBlockId("man")) === status.containsBlock(TestBlockId("man")))
+    assert(status.blocks.contains(RDDBlockId(0, 0)) === status.containsBlock(RDDBlockId(0, 0)))
+    assert(status.blocks.contains(RDDBlockId(1, 1)) === status.containsBlock(RDDBlockId(1, 1)))
+    assert(status.blocks.contains(RDDBlockId(2, 2)) === status.containsBlock(RDDBlockId(2, 2)))
+    assert(status.blocks.contains(RDDBlockId(2, 3)) === status.containsBlock(RDDBlockId(2, 3)))
+    assert(status.blocks.contains(RDDBlockId(2, 4)) === status.containsBlock(RDDBlockId(2, 4)))
+    // blocks that don't exist
+    assert(status.blocks.contains(TestBlockId("fan")) === status.containsBlock(TestBlockId("fan")))
+    assert(status.blocks.contains(RDDBlockId(100, 0)) === status.containsBlock(RDDBlockId(100, 0)))
+  }
+
+  test("storage status getBlock") {
+    val status = storageStatus2
+    // blocks that actually exist
+    assert(status.blocks.get(TestBlockId("dan")) === status.getBlock(TestBlockId("dan")))
+    assert(status.blocks.get(TestBlockId("man")) === status.getBlock(TestBlockId("man")))
+    assert(status.blocks.get(RDDBlockId(0, 0)) === status.getBlock(RDDBlockId(0, 0)))
+    assert(status.blocks.get(RDDBlockId(1, 1)) === status.getBlock(RDDBlockId(1, 1)))
+    assert(status.blocks.get(RDDBlockId(2, 2)) === status.getBlock(RDDBlockId(2, 2)))
+    assert(status.blocks.get(RDDBlockId(2, 3)) === status.getBlock(RDDBlockId(2, 3)))
+    assert(status.blocks.get(RDDBlockId(2, 4)) === status.getBlock(RDDBlockId(2, 4)))
+    // blocks that don't exist
+    assert(status.blocks.get(TestBlockId("fan")) === status.getBlock(TestBlockId("fan")))
+    assert(status.blocks.get(RDDBlockId(100, 0)) === status.getBlock(RDDBlockId(100, 0)))
+  }
+
+  test("storage status num[Rdd]Blocks") {
+    val status = storageStatus2
+    assert(status.blocks.size === status.numBlocks)
+    assert(status.rddBlocks.size === status.numRddBlocks)
+    status.addBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 0L, 100L))
+    status.addBlock(RDDBlockId(4, 4), BlockStatus(memAndDisk, 0L, 0L, 100L))
+    status.addBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L, 100L))
+    assert(status.blocks.size === status.numBlocks)
+    assert(status.rddBlocks.size === status.numRddBlocks)
+    assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
+    assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
+    status.updateBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 10L, 400L))
+    status.updateBlock(RDDBlockId(4, 0), BlockStatus(memAndDisk, 0L, 0L, 100L))
+    status.updateBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L, 100L))
+    status.updateBlock(RDDBlockId(10, 10), BlockStatus(memAndDisk, 0L, 0L, 100L))
+    assert(status.blocks.size === status.numBlocks)
+    assert(status.rddBlocks.size === status.numRddBlocks)
+    assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
+    assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
+    assert(status.rddBlocksById(100).size === status.numRddBlocksById(100))
+    status.removeBlock(RDDBlockId(4, 0))
+    status.removeBlock(RDDBlockId(10, 10))
+    assert(status.blocks.size === status.numBlocks)
+    assert(status.rddBlocks.size === status.numRddBlocks)
+    assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
+    assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
+    // remove a block that doesn't exist
+    status.removeBlock(RDDBlockId(1000, 999))
+    assert(status.blocks.size === status.numBlocks)
+    assert(status.rddBlocks.size === status.numRddBlocks)
+    assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
+    assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
+    assert(status.rddBlocksById(1000).size === status.numRddBlocksById(1000))
+  }
+
+  test("storage status memUsed, diskUsed, tachyonUsed") {
+    val status = storageStatus2
+    def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum
+    def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum
+    def actualOffHeapUsed: Long = status.blocks.values.map(_.tachyonSize).sum
+    assert(status.memUsed === actualMemUsed)
+    assert(status.diskUsed === actualDiskUsed)
+    assert(status.offHeapUsed === actualOffHeapUsed)
+    status.addBlock(TestBlockId("fire"), BlockStatus(memAndDisk, 4000L, 5000L, 6000L))
+    status.addBlock(TestBlockId("wire"), BlockStatus(memAndDisk, 400L, 500L, 600L))
+    status.addBlock(RDDBlockId(25, 25), BlockStatus(memAndDisk, 40L, 50L, 60L))
+    assert(status.memUsed === actualMemUsed)
+    assert(status.diskUsed === actualDiskUsed)
+    assert(status.offHeapUsed === actualOffHeapUsed)
+    status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 4L, 5L, 6L))
+    status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 4L, 5L, 6L))
+    status.updateBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 4L, 5L, 6L))
+    assert(status.memUsed === actualMemUsed)
+    assert(status.diskUsed === actualDiskUsed)
+    assert(status.offHeapUsed === actualOffHeapUsed)
+    status.removeBlock(TestBlockId("fire"))
+    status.removeBlock(TestBlockId("man"))
+    status.removeBlock(RDDBlockId(2, 2))
+    status.removeBlock(RDDBlockId(2, 3))
+    assert(status.memUsed === actualMemUsed)
+    assert(status.diskUsed === actualDiskUsed)
+    assert(status.offHeapUsed === actualOffHeapUsed)
+  }
+
+  // For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations
+  private def stockStorageStatuses: Seq[StorageStatus] = {
+    val status1 = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
+    val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2, 2), 2000L)
+    val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3, 3), 3000L)
+    status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
+    status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L, 0L))
+    status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L, 0L))
+    status2.addBlock(RDDBlockId(0, 3), BlockStatus(memAndDisk, 1L, 2L, 0L))
+    status2.addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
+    status2.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 1L, 2L, 0L))
+    status3.addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L, 0L))
+    status3.addBlock(RDDBlockId(1, 2), BlockStatus(memAndDisk, 1L, 2L, 0L))
+    Seq(status1, status2, status3)
+  }
+
+  // For testing StorageUtils.updateRddInfo
+  private def stockRDDInfos: Seq[RDDInfo] = {
+    val info0 = new RDDInfo(0, "0", 10, memAndDisk)
+    val info1 = new RDDInfo(1, "1", 3, memAndDisk)
+    Seq(info0, info1)
+  }
+
+  test("StorageUtils.updateRddInfo") {
+    val storageStatuses = stockStorageStatuses
+    val rddInfos = stockRDDInfos
+    StorageUtils.updateRddInfo(rddInfos, storageStatuses)
+    assert(rddInfos(0).storageLevel === memAndDisk)
+    assert(rddInfos(0).numCachedPartitions === 5)
+    assert(rddInfos(0).memSize === 5L)
+    assert(rddInfos(0).diskSize === 10L)
+    assert(rddInfos(0).tachyonSize === 0L)
+    assert(rddInfos(1).storageLevel === memAndDisk)
+    assert(rddInfos(1).numCachedPartitions === 3)
+    assert(rddInfos(1).memSize === 3L)
+    assert(rddInfos(1).diskSize === 6L)
+    assert(rddInfos(1).tachyonSize === 0L)
+  }
+
+  test("StorageUtils.getRddBlockLocations") {
+    val storageStatuses = stockStorageStatuses
+    val blockLocations0 = StorageUtils.getRddBlockLocations(0, storageStatuses)
+    val blockLocations1 = StorageUtils.getRddBlockLocations(1, storageStatuses)
+    assert(blockLocations0.size === 5)
+    assert(blockLocations1.size === 3)
+    assert(blockLocations0.contains(RDDBlockId(0, 0)))
+    assert(blockLocations0.contains(RDDBlockId(0, 1)))
+    assert(blockLocations0.contains(RDDBlockId(0, 2)))
+    assert(blockLocations0.contains(RDDBlockId(0, 3)))
+    assert(blockLocations0.contains(RDDBlockId(0, 4)))
+    assert(blockLocations1.contains(RDDBlockId(1, 0)))
+    assert(blockLocations1.contains(RDDBlockId(1, 1)))
+    assert(blockLocations1.contains(RDDBlockId(1, 2)))
+    assert(blockLocations0(RDDBlockId(0, 0)) === Seq("dog:1"))
+    assert(blockLocations0(RDDBlockId(0, 1)) === Seq("dog:1"))
+    assert(blockLocations0(RDDBlockId(0, 2)) === Seq("duck:2"))
+    assert(blockLocations0(RDDBlockId(0, 3)) === Seq("duck:2"))
+    assert(blockLocations0(RDDBlockId(0, 4)) === Seq("cat:3"))
+    assert(blockLocations1(RDDBlockId(1, 0)) === Seq("duck:2"))
+    assert(blockLocations1(RDDBlockId(1, 1)) === Seq("duck:2"))
+    assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3"))
+  }
+
+  test("StorageUtils.getRddBlockLocations with multiple locations") {
+    val storageStatuses = stockStorageStatuses
+    storageStatuses(0).addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
+    storageStatuses(0).addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L, 0L))
+    storageStatuses(2).addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
+    val blockLocations0 = StorageUtils.getRddBlockLocations(0, storageStatuses)
+    val blockLocations1 = StorageUtils.getRddBlockLocations(1, storageStatuses)
+    assert(blockLocations0.size === 5)
+    assert(blockLocations1.size === 3)
+    assert(blockLocations0(RDDBlockId(0, 0)) === Seq("dog:1", "cat:3"))
+    assert(blockLocations0(RDDBlockId(0, 1)) === Seq("dog:1"))
+    assert(blockLocations0(RDDBlockId(0, 2)) === Seq("duck:2"))
+    assert(blockLocations0(RDDBlockId(0, 3)) === Seq("duck:2"))
+    assert(blockLocations0(RDDBlockId(0, 4)) === Seq("dog:1", "cat:3"))
+    assert(blockLocations1(RDDBlockId(1, 0)) === Seq("dog:1", "duck:2"))
+    assert(blockLocations1(RDDBlockId(1, 1)) === Seq("duck:2"))
+    assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3"))
+  }
+
+}
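Note: the expected values in the `StorageUtils.updateRddInfo` test above follow directly from aggregating the per-status accessors, which is exactly what `updateRddInfo` does internally. Cross-checking by hand (using the `stockStorageStatuses` fixture defined in the suite):

```scala
// RDD 0 owns 2 + 2 + 1 = 5 blocks across the three statuses,
// each sized (1L mem, 2L disk, 0L off-heap)
val statuses = stockStorageStatuses
assert(statuses.map(_.numRddBlocksById(0)).sum == 5)  // numCachedPartitions
assert(statuses.map(_.memUsedByRdd(0)).sum == 5L)     // memSize
assert(statuses.map(_.diskUsedByRdd(0)).sum == 10L)   // diskSize
```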
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..6e68dcb3425aa09c84dfc47fb53132e1462f91fb
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.Success
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler._
+import org.apache.spark.storage._
+
+/**
+ * Test various functionality in the StorageListener that supports the StorageTab.
+ */
+class StorageTabSuite extends FunSuite with BeforeAndAfter {
+  private var bus: LiveListenerBus = _
+  private var storageStatusListener: StorageStatusListener = _
+  private var storageListener: StorageListener = _
+  private val memAndDisk = StorageLevel.MEMORY_AND_DISK
+  private val memOnly = StorageLevel.MEMORY_ONLY
+  private val none = StorageLevel.NONE
+  private val taskInfo = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
+  private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly)
+  private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly)
+  private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk)
+  private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk)
+  private val bm1 = BlockManagerId("big", "dog", 1, 1)
+
+  before {
+    bus = new LiveListenerBus
+    storageStatusListener = new StorageStatusListener
+    storageListener = new StorageListener(storageStatusListener)
+    bus.addListener(storageStatusListener)
+    bus.addListener(storageListener)
+  }
+
+  test("stage submitted / completed") {
+    assert(storageListener._rddInfoMap.isEmpty)
+    assert(storageListener.rddInfoList.isEmpty)
+
+    // 2 RDDs are known, but none are cached
+    val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0, rddInfo1), "details")
+    bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
+    assert(storageListener._rddInfoMap.size === 2)
+    assert(storageListener.rddInfoList.isEmpty)
+
+    // 4 RDDs are known, but only 2 are cached
+    val rddInfo2Cached = rddInfo2
+    val rddInfo3Cached = rddInfo3
+    rddInfo2Cached.numCachedPartitions = 1
+    rddInfo3Cached.numCachedPartitions = 1
+    val stageInfo1 = new StageInfo(1, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details")
+    bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
+    assert(storageListener._rddInfoMap.size === 4)
+    assert(storageListener.rddInfoList.size === 2)
+
+    // Submitting RDDInfos with duplicate IDs does nothing
+    val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY)
+    rddInfo0Cached.numCachedPartitions = 1
+    val stageInfo0Cached = new StageInfo(0, "0", 100, Seq(rddInfo0), "details")
+    bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached))
+    assert(storageListener._rddInfoMap.size === 4)
+    assert(storageListener.rddInfoList.size === 2)
+
+    // We only keep around the RDDs that are cached
+    bus.postToAll(SparkListenerStageCompleted(stageInfo0))
+    assert(storageListener._rddInfoMap.size === 2)
+    assert(storageListener.rddInfoList.size === 2)
+  }
+
+  test("unpersist") {
+    val rddInfo0Cached = rddInfo0
+    val rddInfo1Cached = rddInfo1
+    rddInfo0Cached.numCachedPartitions = 1
+    rddInfo1Cached.numCachedPartitions = 1
+    val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details")
+    bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
+    assert(storageListener._rddInfoMap.size === 2)
+    assert(storageListener.rddInfoList.size === 2)
+    bus.postToAll(SparkListenerUnpersistRDD(0))
+    assert(storageListener._rddInfoMap.size === 1)
+    assert(storageListener.rddInfoList.size === 1)
+    bus.postToAll(SparkListenerUnpersistRDD(4)) // doesn't exist
+    assert(storageListener._rddInfoMap.size === 1)
+    assert(storageListener.rddInfoList.size === 1)
+    bus.postToAll(SparkListenerUnpersistRDD(1))
+    assert(storageListener._rddInfoMap.size === 0)
+    assert(storageListener.rddInfoList.size === 0)
+  }
+
+  test("task end") {
+    val myRddInfo0 = rddInfo0
+    val myRddInfo1 = rddInfo1
+    val myRddInfo2 = rddInfo2
+    val stageInfo0 = new StageInfo(0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
+    bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
+    bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
+    assert(storageListener._rddInfoMap.size === 3)
+    assert(storageListener.rddInfoList.size === 0) // not cached
+    assert(!storageListener._rddInfoMap(0).isCached)
+    assert(!storageListener._rddInfoMap(1).isCached)
+    assert(!storageListener._rddInfoMap(2).isCached)
+
+    // Task end with no updated blocks. This should not change anything.
+    bus.postToAll(SparkListenerTaskEnd(0, "obliteration", Success, taskInfo, new TaskMetrics))
+    assert(storageListener._rddInfoMap.size === 3)
+    assert(storageListener.rddInfoList.size === 0)
+
+    // Task end with a few new persisted blocks, some from the same RDD
+    val metrics1 = new TaskMetrics
+    metrics1.updatedBlocks = Some(Seq(
+      (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L, 0L)),
+      (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L, 0L)),
+      (RDDBlockId(0, 102), BlockStatus(memAndDisk, 400L, 0L, 200L)),
+      (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L, 0L))
+    ))
+    bus.postToAll(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo, metrics1))
+    assert(storageListener._rddInfoMap(0).memSize === 800L)
+    assert(storageListener._rddInfoMap(0).diskSize === 400L)
+    assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
+    assert(storageListener._rddInfoMap(0).numCachedPartitions === 3)
+    assert(storageListener._rddInfoMap(0).isCached)
+    assert(storageListener._rddInfoMap(1).memSize === 0L)
+    assert(storageListener._rddInfoMap(1).diskSize === 240L)
+    assert(storageListener._rddInfoMap(1).tachyonSize === 0L)
+    assert(storageListener._rddInfoMap(1).numCachedPartitions === 1)
+    assert(storageListener._rddInfoMap(1).isCached)
+    assert(!storageListener._rddInfoMap(2).isCached)
+    assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
+
+    // Task end with a few dropped blocks
+    val metrics2 = new TaskMetrics
+    metrics2.updatedBlocks = Some(Seq(
+      (RDDBlockId(0, 100), BlockStatus(none, 0L, 0L, 0L)),
+      (RDDBlockId(1, 20), BlockStatus(none, 0L, 0L, 0L)),
+      (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L, 0L)), // doesn't actually exist
+      (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L, 0L)) // doesn't actually exist
+    ))
+    bus.postToAll(SparkListenerTaskEnd(2, "obliteration", Success, taskInfo, metrics2))
+    assert(storageListener._rddInfoMap(0).memSize === 400L)
+    assert(storageListener._rddInfoMap(0).diskSize === 400L)
+    assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
+    assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
+    assert(storageListener._rddInfoMap(0).isCached)
+    assert(!storageListener._rddInfoMap(1).isCached)
+    assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
+    assert(!storageListener._rddInfoMap(2).isCached)
+    assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
+  }
+
+}