diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java index 646496f3135076bfcf1de3312acf54ff67ee94b9..fa9acf0a15b88566ee152b80b36b39c3f1668f2b 100644 --- a/core/src/main/java/org/apache/spark/JavaSparkListener.java +++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java @@ -17,23 +17,7 @@ package org.apache.spark; -import org.apache.spark.scheduler.SparkListener; -import org.apache.spark.scheduler.SparkListenerApplicationEnd; -import org.apache.spark.scheduler.SparkListenerApplicationStart; -import org.apache.spark.scheduler.SparkListenerBlockManagerAdded; -import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved; -import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate; -import org.apache.spark.scheduler.SparkListenerExecutorAdded; -import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; -import org.apache.spark.scheduler.SparkListenerExecutorRemoved; -import org.apache.spark.scheduler.SparkListenerJobEnd; -import org.apache.spark.scheduler.SparkListenerJobStart; -import org.apache.spark.scheduler.SparkListenerStageCompleted; -import org.apache.spark.scheduler.SparkListenerStageSubmitted; -import org.apache.spark.scheduler.SparkListenerTaskEnd; -import org.apache.spark.scheduler.SparkListenerTaskGettingResult; -import org.apache.spark.scheduler.SparkListenerTaskStart; -import org.apache.spark.scheduler.SparkListenerUnpersistRDD; +import org.apache.spark.scheduler.*; /** * Java clients should extend this class instead of implementing @@ -94,4 +78,8 @@ public class JavaSparkListener implements SparkListener { @Override public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { } + + @Override + public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { } + } diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java index fbc566695905571fcc12aa678ed52cced248d344..1214d05ba6063a82bada4ed33179a481bcb40286 100644 --- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java +++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java @@ -112,4 +112,10 @@ public class SparkFirehoseListener implements SparkListener { public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { onEvent(executorRemoved); } + + @Override + public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { + onEvent(blockUpdated); + } + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 62b05033a928193a0149ababb9efc0f14e8abec7..5a06ef02f5c57482f14395b239eaf12eeaee06b7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -199,6 +199,9 @@ private[spark] class EventLoggingListener( logEvent(event, flushLogger = true) } + // No-op because logging every update would be overkill + override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {} + // No-op because logging every update would be overkill override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 9620915f495abd2a1321d6b3707d655e2a15d642..896f1743332f1a9a3aa6dce92085c69bbb3085be 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -26,7 +26,7 @@ import org.apache.spark.{Logging, TaskEndReason} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.cluster.ExecutorInfo -import org.apache.spark.storage.BlockManagerId +import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo} import org.apache.spark.util.{Distribution, Utils} @DeveloperApi @@ -98,6 +98,9 @@ case class SparkListenerExecutorAdded(time: Long, executorId: String, executorIn case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String) extends SparkListenerEvent +@DeveloperApi +case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent + /** * Periodic updates from executors. * @param execId executor id @@ -215,6 +218,11 @@ trait SparkListener { * Called when the driver removes an executor. */ def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved) { } + + /** + * Called when the driver receives a block update info. + */ + def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 61e69ecc083871af0a1e18a77bed980fb505261a..04afde33f5aad7170be6469b098f4d64d799932a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -58,6 +58,8 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi listener.onExecutorAdded(executorAdded) case executorRemoved: SparkListenerExecutorRemoved => listener.onExecutorRemoved(executorRemoved) + case blockUpdated: SparkListenerBlockUpdated => + listener.onBlockUpdated(blockUpdated) case logStart: SparkListenerLogStart => // ignore event log metadata } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 68ed9096731c513c57336f7e199eaf481e62b061..5dc0c537cbb62829cebf64bede4c179ed68a64d0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -60,10 +60,11 @@ class BlockManagerMasterEndpoint( register(blockManagerId, maxMemSize, slaveEndpoint) context.reply(true) - case UpdateBlockInfo( + case _updateBlockInfo @ UpdateBlockInfo( blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) => context.reply(updateBlockInfo( blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize)) + listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo))) case GetLocations(blockId) => context.reply(getLocations(blockId)) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala new file mode 100644 index 0000000000000000000000000000000000000000..2789e25b8d3ab2d0949d7d14fc2e50b79c203583 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import scala.collection.mutable + +import org.apache.spark.scheduler._ + +private[spark] case class BlockUIData( + blockId: BlockId, + location: String, + storageLevel: StorageLevel, + memSize: Long, + diskSize: Long, + externalBlockStoreSize: Long) + +/** + * The aggregated status of stream blocks in an executor + */ +private[spark] case class ExecutorStreamBlockStatus( + executorId: String, + location: String, + blocks: Seq[BlockUIData]) { + + def totalMemSize: Long = blocks.map(_.memSize).sum + + def totalDiskSize: Long = blocks.map(_.diskSize).sum + + def totalExternalBlockStoreSize: Long = blocks.map(_.externalBlockStoreSize).sum + + def numStreamBlocks: Int = blocks.size + +} + +private[spark] class BlockStatusListener extends SparkListener { + + private val blockManagers = + new mutable.HashMap[BlockManagerId, mutable.HashMap[BlockId, BlockUIData]] + + override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { + val blockId = blockUpdated.blockUpdatedInfo.blockId + if (!blockId.isInstanceOf[StreamBlockId]) { + // Now we only monitor StreamBlocks + return + } + val blockManagerId = blockUpdated.blockUpdatedInfo.blockManagerId + val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel + val memSize = blockUpdated.blockUpdatedInfo.memSize + val diskSize = blockUpdated.blockUpdatedInfo.diskSize + val externalBlockStoreSize = blockUpdated.blockUpdatedInfo.externalBlockStoreSize + + synchronized { + // Drop the update info if the block manager is not registered + blockManagers.get(blockManagerId).foreach { blocksInBlockManager => + if (storageLevel.isValid) { + blocksInBlockManager.put(blockId, + BlockUIData( + blockId, + blockManagerId.hostPort, + storageLevel, + memSize, + diskSize, + externalBlockStoreSize) + ) + } else { + // If isValid is not true, it means we should drop the block. + blocksInBlockManager -= blockId + } + } + } + } + + override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { + synchronized { + blockManagers.put(blockManagerAdded.blockManagerId, mutable.HashMap()) + } + } + + override def onBlockManagerRemoved( + blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = synchronized { + blockManagers -= blockManagerRemoved.blockManagerId + } + + def allExecutorStreamBlockStatus: Seq[ExecutorStreamBlockStatus] = synchronized { + blockManagers.map { case (blockManagerId, blocks) => + ExecutorStreamBlockStatus( + blockManagerId.executorId, blockManagerId.hostPort, blocks.values.toSeq) + }.toSeq + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala new file mode 100644 index 0000000000000000000000000000000000000000..a5790e4454a894f6047e2133f343642327785766 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.storage.BlockManagerMessages.UpdateBlockInfo + +/** + * :: DeveloperApi :: + * Stores information about a block status in a block manager. + */ +@DeveloperApi +case class BlockUpdatedInfo( + blockManagerId: BlockManagerId, + blockId: BlockId, + storageLevel: StorageLevel, + memSize: Long, + diskSize: Long, + externalBlockStoreSize: Long) + +private[spark] object BlockUpdatedInfo { + + private[spark] def apply(updateBlockInfo: UpdateBlockInfo): BlockUpdatedInfo = { + BlockUpdatedInfo( + updateBlockInfo.blockManagerId, + updateBlockInfo.blockId, + updateBlockInfo.storageLevel, + updateBlockInfo.memSize, + updateBlockInfo.diskSize, + updateBlockInfo.externalBlockStoreSize) + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 78980395192017de30b7d06d25af888799dcb9de..718aea7e1dc22877802511cb992d5462ee806253 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -27,7 +27,7 @@ import org.apache.spark.ui.scope.RDDOperationGraph /** Utility functions for generating XML pages with spark content. */ private[spark] object UIUtils extends Logging { - val TABLE_CLASS_NOT_STRIPED = "table table-bordered table-condensed sortable" + val TABLE_CLASS_NOT_STRIPED = "table table-bordered table-condensed" val TABLE_CLASS_STRIPED = TABLE_CLASS_NOT_STRIPED + " table-striped" // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. @@ -267,9 +267,17 @@ private[spark] object UIUtils extends Logging { fixedWidth: Boolean = false, id: Option[String] = None, headerClasses: Seq[String] = Seq.empty, - stripeRowsWithCss: Boolean = true): Seq[Node] = { + stripeRowsWithCss: Boolean = true, + sortable: Boolean = true): Seq[Node] = { - val listingTableClass = if (stripeRowsWithCss) TABLE_CLASS_STRIPED else TABLE_CLASS_NOT_STRIPED + val listingTableClass = { + val _tableClass = if (stripeRowsWithCss) TABLE_CLASS_STRIPED else TABLE_CLASS_NOT_STRIPED + if (sortable) { + _tableClass + " sortable" + } else { + _tableClass + } + } val colWidth = 100.toDouble / headers.size val colWidthAttr = if (fixedWidth) colWidth + "%" else "" diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala index 07db783c572cf26417296d8454eee349cb99921d..04f584621e71e07c3dd254ca4dd9d6a8f7cb7d73 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala @@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.storage.RDDInfo +import org.apache.spark.storage._ import org.apache.spark.ui.{UIUtils, WebUIPage} import org.apache.spark.util.Utils @@ -30,13 +30,25 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { private val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { - val rdds = listener.rddInfoList - val content = UIUtils.listingTable(rddHeader, rddRow, rdds, id = Some("storage-by-rdd-table")) + val content = rddTable(listener.rddInfoList) ++ + receiverBlockTables(listener.allExecutorStreamBlockStatus.sortBy(_.executorId)) UIUtils.headerSparkPage("Storage", content, parent) } + private[storage] def rddTable(rdds: Seq[RDDInfo]): Seq[Node] = { + if (rdds.isEmpty) { + // Don't show the rdd table if there is no RDD persisted. + Nil + } else { + <div> + <h4>RDDs</h4> + {UIUtils.listingTable(rddHeader, rddRow, rdds, id = Some("storage-by-rdd-table"))} + </div> + } + } + /** Header fields for the RDD table */ - private def rddHeader = Seq( + private val rddHeader = Seq( "RDD Name", "Storage Level", "Cached Partitions", @@ -56,7 +68,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { </td> <td>{rdd.storageLevel.description} </td> - <td>{rdd.numCachedPartitions}</td> + <td>{rdd.numCachedPartitions.toString}</td> <td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td> <td sorttable_customkey={rdd.memSize.toString}>{Utils.bytesToString(rdd.memSize)}</td> <td sorttable_customkey={rdd.externalBlockStoreSize.toString}>{Utils.bytesToString(rdd.externalBlockStoreSize)}</td> @@ -64,4 +76,130 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { </tr> // scalastyle:on } + + private[storage] def receiverBlockTables(statuses: Seq[ExecutorStreamBlockStatus]): Seq[Node] = { + if (statuses.map(_.numStreamBlocks).sum == 0) { + // Don't show the tables if there is no stream block + Nil + } else { + val blocks = statuses.flatMap(_.blocks).groupBy(_.blockId).toSeq.sortBy(_._1.toString) + + <div> + <h4>Receiver Blocks</h4> + {executorMetricsTable(statuses)} + {streamBlockTable(blocks)} + </div> + } + } + + private def executorMetricsTable(statuses: Seq[ExecutorStreamBlockStatus]): Seq[Node] = { + <div> + <h5>Aggregated Block Metrics by Executor</h5> + {UIUtils.listingTable(executorMetricsTableHeader, executorMetricsTableRow, statuses, + id = Some("storage-by-executor-stream-blocks"))} + </div> + } + + private val executorMetricsTableHeader = Seq( + "Executor ID", + "Address", + "Total Size in Memory", + "Total Size in ExternalBlockStore", + "Total Size on Disk", + "Stream Blocks") + + private def executorMetricsTableRow(status: ExecutorStreamBlockStatus): Seq[Node] = { + <tr> + <td> + {status.executorId} + </td> + <td> + {status.location} + </td> + <td sorttable_customkey={status.totalMemSize.toString}> + {Utils.bytesToString(status.totalMemSize)} + </td> + <td sorttable_customkey={status.totalExternalBlockStoreSize.toString}> + {Utils.bytesToString(status.totalExternalBlockStoreSize)} + </td> + <td sorttable_customkey={status.totalDiskSize.toString}> + {Utils.bytesToString(status.totalDiskSize)} + </td> + <td> + {status.numStreamBlocks.toString} + </td> + </tr> + } + + private def streamBlockTable(blocks: Seq[(BlockId, Seq[BlockUIData])]): Seq[Node] = { + if (blocks.isEmpty) { + Nil + } else { + <div> + <h5>Blocks</h5> + {UIUtils.listingTable( + streamBlockTableHeader, + streamBlockTableRow, + blocks, + id = Some("storage-by-block-table"), + sortable = false)} + </div> + } + } + + private val streamBlockTableHeader = Seq( + "Block ID", + "Replication Level", + "Location", + "Storage Level", + "Size") + + /** Render a stream block */ + private def streamBlockTableRow(block: (BlockId, Seq[BlockUIData])): Seq[Node] = { + val replications = block._2 + assert(replications.size > 0) // This must be true because it's the result of "groupBy" + if (replications.size == 1) { + streamBlockTableSubrow(block._1, replications.head, replications.size, true) + } else { + streamBlockTableSubrow(block._1, replications.head, replications.size, true) ++ + replications.tail.map(streamBlockTableSubrow(block._1, _, replications.size, false)).flatten + } + } + + private def streamBlockTableSubrow( + blockId: BlockId, block: BlockUIData, replication: Int, firstSubrow: Boolean): Seq[Node] = { + val (storageLevel, size) = streamBlockStorageLevelDescriptionAndSize(block) + + <tr> + { + if (firstSubrow) { + <td rowspan={replication.toString}> + {block.blockId.toString} + </td> + <td rowspan={replication.toString}> + {replication.toString} + </td> + } + } + <td>{block.location}</td> + <td>{storageLevel}</td> + <td>{Utils.bytesToString(size)}</td> + </tr> + } + + private[storage] def streamBlockStorageLevelDescriptionAndSize( + block: BlockUIData): (String, Long) = { + if (block.storageLevel.useDisk) { + ("Disk", block.diskSize) + } else if (block.storageLevel.useMemory && block.storageLevel.deserialized) { + ("Memory", block.memSize) + } else if (block.storageLevel.useMemory && !block.storageLevel.deserialized) { + ("Memory Serialized", block.memSize) + } else if (block.storageLevel.useOffHeap) { + ("External", block.externalBlockStoreSize) + } else { + throw new IllegalStateException(s"Invalid Storage Level: ${block.storageLevel}") + } + } + } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala index 0351749700962bd443503c1083c31724f78c96c0..22e2993b3b5bd48d67bd2e6431c7f60be6756b33 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala @@ -39,7 +39,8 @@ private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, "storag * This class is thread-safe (unlike JobProgressListener) */ @DeveloperApi -class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener { +class StorageListener(storageStatusListener: StorageStatusListener) extends BlockStatusListener { + private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList diff --git a/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..d7ffde1e7864e7a12d2901c6914cda754017cecd --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import org.apache.spark.SparkFunSuite +import org.apache.spark.scheduler._ + +class BlockStatusListenerSuite extends SparkFunSuite { + + test("basic functions") { + val blockManagerId = BlockManagerId("0", "localhost", 10000) + val listener = new BlockStatusListener() + + // Add a block manager and a new block status + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(0, blockManagerId, 0)) + listener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo( + blockManagerId, + StreamBlockId(0, 100), + StorageLevel.MEMORY_AND_DISK, + memSize = 100, + diskSize = 100, + externalBlockStoreSize = 0))) + // The new block status should be added to the listener + val expectedBlock = BlockUIData( + StreamBlockId(0, 100), + "localhost:10000", + StorageLevel.MEMORY_AND_DISK, + memSize = 100, + diskSize = 100, + externalBlockStoreSize = 0 + ) + val expectedExecutorStreamBlockStatus = Seq( + ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock)) + ) + assert(listener.allExecutorStreamBlockStatus === expectedExecutorStreamBlockStatus) + + // Add the second block manager + val blockManagerId2 = BlockManagerId("1", "localhost", 10001) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(0, blockManagerId2, 0)) + // Add a new replication of the same block id from the second manager + listener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo( + blockManagerId2, + StreamBlockId(0, 100), + StorageLevel.MEMORY_AND_DISK, + memSize = 100, + diskSize = 100, + externalBlockStoreSize = 0))) + val expectedBlock2 = BlockUIData( + StreamBlockId(0, 100), + "localhost:10001", + StorageLevel.MEMORY_AND_DISK, + memSize = 100, + diskSize = 100, + externalBlockStoreSize = 0 + ) + // Each block manager should contain one block + val expectedExecutorStreamBlockStatus2 = Set( + ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock)), + ExecutorStreamBlockStatus("1", "localhost:10001", Seq(expectedBlock2)) + ) + assert(listener.allExecutorStreamBlockStatus.toSet === expectedExecutorStreamBlockStatus2) + + // Remove a replication of the same block + listener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo( + blockManagerId2, + StreamBlockId(0, 100), + StorageLevel.NONE, // StorageLevel.NONE means removing it + memSize = 0, + diskSize = 0, + externalBlockStoreSize = 0))) + // Only the first block manager contains a block + val expectedExecutorStreamBlockStatus3 = Set( + ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock)), + ExecutorStreamBlockStatus("1", "localhost:10001", Seq.empty) + ) + assert(listener.allExecutorStreamBlockStatus.toSet === expectedExecutorStreamBlockStatus3) + + // Remove the second block manager at first but add a new block status + // from this removed block manager + listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(0, blockManagerId2)) + listener.onBlockUpdated(SparkListenerBlockUpdated( + BlockUpdatedInfo( + blockManagerId2, + StreamBlockId(0, 100), + StorageLevel.MEMORY_AND_DISK, + memSize = 100, + diskSize = 100, + externalBlockStoreSize = 0))) + // The second block manager is removed so we should not see the new block + val expectedExecutorStreamBlockStatus4 = Seq( + ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock)) + ) + assert(listener.allExecutorStreamBlockStatus === expectedExecutorStreamBlockStatus4) + + // Remove the last block manager + listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(0, blockManagerId)) + // No block manager now so we should dop all block managers + assert(listener.allExecutorStreamBlockStatus.isEmpty) + } + +} diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..3dab15a9d46912dce3751dbc3aada973f086dc68 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.storage + +import scala.xml.Utility + +import org.mockito.Mockito._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.storage._ + +class StoragePageSuite extends SparkFunSuite { + + val storageTab = mock(classOf[StorageTab]) + when(storageTab.basePath).thenReturn("http://localhost:4040") + val storagePage = new StoragePage(storageTab) + + test("rddTable") { + val rdd1 = new RDDInfo(1, + "rdd1", + 10, + StorageLevel.MEMORY_ONLY, + Seq.empty) + rdd1.memSize = 100 + rdd1.numCachedPartitions = 10 + + val rdd2 = new RDDInfo(2, + "rdd2", + 10, + StorageLevel.DISK_ONLY, + Seq.empty) + rdd2.diskSize = 200 + rdd2.numCachedPartitions = 5 + + val rdd3 = new RDDInfo(3, + "rdd3", + 10, + StorageLevel.MEMORY_AND_DISK_SER, + Seq.empty) + rdd3.memSize = 400 + rdd3.diskSize = 500 + rdd3.numCachedPartitions = 10 + + val xmlNodes = storagePage.rddTable(Seq(rdd1, rdd2, rdd3)) + + val headers = Seq( + "RDD Name", + "Storage Level", + "Cached Partitions", + "Fraction Cached", + "Size in Memory", + "Size in ExternalBlockStore", + "Size on Disk") + assert((xmlNodes \\ "th").map(_.text) === headers) + + assert((xmlNodes \\ "tr").size === 3) + assert(((xmlNodes \\ "tr")(0) \\ "td").map(_.text.trim) === + Seq("rdd1", "Memory Deserialized 1x Replicated", "10", "100%", "100.0 B", "0.0 B", "0.0 B")) + // Check the url + assert(((xmlNodes \\ "tr")(0) \\ "td" \ "a")(0).attribute("href").map(_.text) === + Some("http://localhost:4040/storage/rdd?id=1")) + + assert(((xmlNodes \\ "tr")(1) \\ "td").map(_.text.trim) === + Seq("rdd2", "Disk Serialized 1x Replicated", "5", "50%", "0.0 B", "0.0 B", "200.0 B")) + // Check the url + assert(((xmlNodes \\ "tr")(1) \\ "td" \ "a")(0).attribute("href").map(_.text) === + Some("http://localhost:4040/storage/rdd?id=2")) + + assert(((xmlNodes \\ "tr")(2) \\ "td").map(_.text.trim) === + Seq("rdd3", "Disk Memory Serialized 1x Replicated", "10", "100%", "400.0 B", "0.0 B", + "500.0 B")) + // Check the url + assert(((xmlNodes \\ "tr")(2) \\ "td" \ "a")(0).attribute("href").map(_.text) === + Some("http://localhost:4040/storage/rdd?id=3")) + } + + test("empty rddTable") { + assert(storagePage.rddTable(Seq.empty).isEmpty) + } + + test("streamBlockStorageLevelDescriptionAndSize") { + val memoryBlock = BlockUIData(StreamBlockId(0, 0), + "localhost:1111", + StorageLevel.MEMORY_ONLY, + memSize = 100, + diskSize = 0, + externalBlockStoreSize = 0) + assert(("Memory", 100) === storagePage.streamBlockStorageLevelDescriptionAndSize(memoryBlock)) + + val memorySerializedBlock = BlockUIData(StreamBlockId(0, 0), + "localhost:1111", + StorageLevel.MEMORY_ONLY_SER, + memSize = 100, + diskSize = 0, + externalBlockStoreSize = 0) + assert(("Memory Serialized", 100) === + storagePage.streamBlockStorageLevelDescriptionAndSize(memorySerializedBlock)) + + val diskBlock = BlockUIData(StreamBlockId(0, 0), + "localhost:1111", + StorageLevel.DISK_ONLY, + memSize = 0, + diskSize = 100, + externalBlockStoreSize = 0) + assert(("Disk", 100) === storagePage.streamBlockStorageLevelDescriptionAndSize(diskBlock)) + + val externalBlock = BlockUIData(StreamBlockId(0, 0), + "localhost:1111", + StorageLevel.OFF_HEAP, + memSize = 0, + diskSize = 0, + externalBlockStoreSize = 100) + assert(("External", 100) === + storagePage.streamBlockStorageLevelDescriptionAndSize(externalBlock)) + } + + test("receiverBlockTables") { + val blocksForExecutor0 = Seq( + BlockUIData(StreamBlockId(0, 0), + "localhost:10000", + StorageLevel.MEMORY_ONLY, + memSize = 100, + diskSize = 0, + externalBlockStoreSize = 0), + BlockUIData(StreamBlockId(1, 1), + "localhost:10000", + StorageLevel.DISK_ONLY, + memSize = 0, + diskSize = 100, + externalBlockStoreSize = 0) + ) + val executor0 = ExecutorStreamBlockStatus("0", "localhost:10000", blocksForExecutor0) + + val blocksForExecutor1 = Seq( + BlockUIData(StreamBlockId(0, 0), + "localhost:10001", + StorageLevel.MEMORY_ONLY, + memSize = 100, + diskSize = 0, + externalBlockStoreSize = 0), + BlockUIData(StreamBlockId(2, 2), + "localhost:10001", + StorageLevel.OFF_HEAP, + memSize = 0, + diskSize = 0, + externalBlockStoreSize = 200), + BlockUIData(StreamBlockId(1, 1), + "localhost:10001", + StorageLevel.MEMORY_ONLY_SER, + memSize = 100, + diskSize = 0, + externalBlockStoreSize = 0) + ) + val executor1 = ExecutorStreamBlockStatus("1", "localhost:10001", blocksForExecutor1) + val xmlNodes = storagePage.receiverBlockTables(Seq(executor0, executor1)) + + val executorTable = (xmlNodes \\ "table")(0) + val executorHeaders = Seq( + "Executor ID", + "Address", + "Total Size in Memory", + "Total Size in ExternalBlockStore", + "Total Size on Disk", + "Stream Blocks") + assert((executorTable \\ "th").map(_.text) === executorHeaders) + + assert((executorTable \\ "tr").size === 2) + assert(((executorTable \\ "tr")(0) \\ "td").map(_.text.trim) === + Seq("0", "localhost:10000", "100.0 B", "0.0 B", "100.0 B", "2")) + assert(((executorTable \\ "tr")(1) \\ "td").map(_.text.trim) === + Seq("1", "localhost:10001", "200.0 B", "200.0 B", "0.0 B", "3")) + + val blockTable = (xmlNodes \\ "table")(1) + val blockHeaders = Seq( + "Block ID", + "Replication Level", + "Location", + "Storage Level", + "Size") + assert((blockTable \\ "th").map(_.text) === blockHeaders) + + assert((blockTable \\ "tr").size === 5) + assert(((blockTable \\ "tr")(0) \\ "td").map(_.text.trim) === + Seq("input-0-0", "2", "localhost:10000", "Memory", "100.0 B")) + // Check "rowspan=2" for the first 2 columns + assert(((blockTable \\ "tr")(0) \\ "td")(0).attribute("rowspan").map(_.text) === Some("2")) + assert(((blockTable \\ "tr")(0) \\ "td")(1).attribute("rowspan").map(_.text) === Some("2")) + + assert(((blockTable \\ "tr")(1) \\ "td").map(_.text.trim) === + Seq("localhost:10001", "Memory", "100.0 B")) + + assert(((blockTable \\ "tr")(2) \\ "td").map(_.text.trim) === + Seq("input-1-1", "2", "localhost:10000", "Disk", "100.0 B")) + // Check "rowspan=2" for the first 2 columns + assert(((blockTable \\ "tr")(2) \\ "td")(0).attribute("rowspan").map(_.text) === Some("2")) + assert(((blockTable \\ "tr")(2) \\ "td")(1).attribute("rowspan").map(_.text) === Some("2")) + + assert(((blockTable \\ "tr")(3) \\ "td").map(_.text.trim) === + Seq("localhost:10001", "Memory Serialized", "100.0 B")) + + assert(((blockTable \\ "tr")(4) \\ "td").map(_.text.trim) === + Seq("input-2-2", "1", "localhost:10001", "External", "200.0 B")) + // Check "rowspan=1" for the first 2 columns + assert(((blockTable \\ "tr")(4) \\ "td")(0).attribute("rowspan").map(_.text) === Some("1")) + assert(((blockTable \\ "tr")(4) \\ "td")(1).attribute("rowspan").map(_.text) === Some("1")) + } + + test("empty receiverBlockTables") { + assert(storagePage.receiverBlockTables(Seq.empty).isEmpty) + + val executor0 = ExecutorStreamBlockStatus("0", "localhost:10000", Seq.empty) + val executor1 = ExecutorStreamBlockStatus("1", "localhost:10001", Seq.empty) + assert(storagePage.receiverBlockTables(Seq(executor0, executor1)).isEmpty) + } +}