Commit fb1d06fc authored by zsxwing, committed by Tathagata Das

[SPARK-4072] [CORE] Display Streaming blocks in Streaming UI

Replaces #6634.

This PR adds `SparkListenerBlockUpdated` to SparkListener so that it can monitor all block update infos that are sent to `BlockManagerMasterEndpoint`, and also adds new tables in the Storage tab to display the stream block infos.

![screen shot 2015-07-01 at 5 19 46 pm](https://cloud.githubusercontent.com/assets/1000778/8451562/c291a6ec-2016-11e5-890d-0afc174e1f8c.png)
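
As a rough illustration of the new hook, a custom listener can override `onBlockUpdated` and inspect the `BlockUpdatedInfo` carried by the event. This is a minimal sketch, not part of this PR; the class name and log format are made up, and it assumes an existing `SparkContext` named `sc`:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdated}
import org.apache.spark.storage.StreamBlockId

// Hypothetical listener: log every stream block update the driver receives.
class StreamBlockLogger extends SparkListener {
  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
    val info = blockUpdated.blockUpdatedInfo
    info.blockId match {
      case id: StreamBlockId =>
        println(s"$id on ${info.blockManagerId.hostPort}: " +
          s"level=${info.storageLevel}, mem=${info.memSize}, disk=${info.diskSize}")
      case _ => // this sketch only cares about stream blocks
    }
  }
}

// Registration on the assumed SparkContext:
// sc.addSparkListener(new StreamBlockLogger)
```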

Author: zsxwing <zsxwing@gmail.com>

Closes #6672 from zsxwing/SPARK-4072-2 and squashes the following commits:

df2c1d8 [zsxwing] Use xml query to check the xml elements
54d54af [zsxwing] Add unit tests for StoragePage
e29fb53 [zsxwing] Update as per TD's comments
ccbee07 [zsxwing] Fix the code style
6dc42b4 [zsxwing] Fix the replication level of blocks
450fad1 [zsxwing] Merge branch 'master' into SPARK-4072-2
1e9ef52 [zsxwing] Don't categorize by Executor ID
ca0ab69 [zsxwing] Fix the code style
3de2762 [zsxwing] Make object BlockUpdatedInfo private
e95b594 [zsxwing] Add 'Aggregated Stream Block Metrics by Executor' table
ba5d0d1 [zsxwing] Refactor the unit test to improve the readability
4bbe341 [zsxwing] Revert JsonProtocol and don't log SparkListenerBlockUpdated
b464dd1 [zsxwing] Add onBlockUpdated to EventLoggingListener
5ba014c [zsxwing] Fix the code style
0b1e47b [zsxwing] Add a developer api BlockUpdatedInfo
04838a9 [zsxwing] Fix the code style
2baa161 [zsxwing] Add unit tests
80f6c6d [zsxwing] Address comments
797ee4b [zsxwing] Display Streaming blocks in Streaming UI
parent 0a4071ea
Showing with 684 additions and 28 deletions
@@ -17,23 +17,7 @@
package org.apache.spark;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
import org.apache.spark.scheduler.*;
/**
* Java clients should extend this class instead of implementing
@@ -94,4 +78,8 @@ public class JavaSparkListener implements SparkListener {
@Override
public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }
}
@@ -112,4 +112,10 @@ public class SparkFirehoseListener implements SparkListener {
public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
onEvent(executorRemoved);
}
@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
onEvent(blockUpdated);
}
}
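
Because `SparkFirehoseListener` forwards every callback through its single `onEvent` method, existing subclasses now also see the new event there. A small sketch under that assumption (the counter class is hypothetical, not from this PR):

```scala
import org.apache.spark.SparkFirehoseListener
import org.apache.spark.scheduler.{SparkListenerBlockUpdated, SparkListenerEvent}

// Hypothetical counter: every event, including the new
// SparkListenerBlockUpdated, arrives via the single onEvent callback.
class BlockUpdateCounter extends SparkFirehoseListener {
  @volatile var blockUpdates = 0L
  override def onEvent(event: SparkListenerEvent): Unit = event match {
    case _: SparkListenerBlockUpdated => blockUpdates += 1
    case _ => // ignore all other events in this sketch
  }
}
```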
@@ -199,6 +199,9 @@ private[spark] class EventLoggingListener(
logEvent(event, flushLogger = true)
}
// No-op because logging every update would be overkill
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
// No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
@@ -26,7 +26,7 @@ import org.apache.spark.{Logging, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
import org.apache.spark.util.{Distribution, Utils}
@DeveloperApi
@@ -98,6 +98,9 @@ case class SparkListenerExecutorAdded(time: Long, executorId: String, executorIn
case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String)
extends SparkListenerEvent
@DeveloperApi
case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent
/**
* Periodic updates from executors.
* @param execId executor id
@@ -215,6 +218,11 @@ trait SparkListener {
* Called when the driver removes an executor.
*/
def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved) { }
/**
* Called when the driver receives a block update info.
*/
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }
}
/**
@@ -58,6 +58,8 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
}
}
@@ -60,10 +60,11 @@ class BlockManagerMasterEndpoint(
register(blockManagerId, maxMemSize, slaveEndpoint)
context.reply(true)
case UpdateBlockInfo(
case _updateBlockInfo @ UpdateBlockInfo(
blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
context.reply(updateBlockInfo(
blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
case GetLocations(blockId) =>
context.reply(getLocations(blockId))
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.storage
import scala.collection.mutable
import org.apache.spark.scheduler._
private[spark] case class BlockUIData(
blockId: BlockId,
location: String,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
externalBlockStoreSize: Long)
/**
* The aggregated status of stream blocks in an executor
*/
private[spark] case class ExecutorStreamBlockStatus(
executorId: String,
location: String,
blocks: Seq[BlockUIData]) {
def totalMemSize: Long = blocks.map(_.memSize).sum
def totalDiskSize: Long = blocks.map(_.diskSize).sum
def totalExternalBlockStoreSize: Long = blocks.map(_.externalBlockStoreSize).sum
def numStreamBlocks: Int = blocks.size
}
private[spark] class BlockStatusListener extends SparkListener {
private val blockManagers =
new mutable.HashMap[BlockManagerId, mutable.HashMap[BlockId, BlockUIData]]
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
val blockId = blockUpdated.blockUpdatedInfo.blockId
if (!blockId.isInstanceOf[StreamBlockId]) {
// Now we only monitor StreamBlocks
return
}
val blockManagerId = blockUpdated.blockUpdatedInfo.blockManagerId
val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
val memSize = blockUpdated.blockUpdatedInfo.memSize
val diskSize = blockUpdated.blockUpdatedInfo.diskSize
val externalBlockStoreSize = blockUpdated.blockUpdatedInfo.externalBlockStoreSize
synchronized {
// Drop the update info if the block manager is not registered
blockManagers.get(blockManagerId).foreach { blocksInBlockManager =>
if (storageLevel.isValid) {
blocksInBlockManager.put(blockId,
BlockUIData(
blockId,
blockManagerId.hostPort,
storageLevel,
memSize,
diskSize,
externalBlockStoreSize)
)
} else {
// If isValid is not true, it means we should drop the block.
blocksInBlockManager -= blockId
}
}
}
}
override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
synchronized {
blockManagers.put(blockManagerAdded.blockManagerId, mutable.HashMap())
}
}
override def onBlockManagerRemoved(
blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = synchronized {
blockManagers -= blockManagerRemoved.blockManagerId
}
def allExecutorStreamBlockStatus: Seq[ExecutorStreamBlockStatus] = synchronized {
blockManagers.map { case (blockManagerId, blocks) =>
ExecutorStreamBlockStatus(
blockManagerId.executorId, blockManagerId.hostPort, blocks.values.toSeq)
}.toSeq
}
}
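
For reference, the store-vs-drop decision above hinges on `StorageLevel.isValid`: an update whose level is not valid (for example `StorageLevel.NONE`) means the block was removed. A minimal sketch of that rule:

```scala
import org.apache.spark.storage.StorageLevel

// A block update is kept only while its StorageLevel is "valid";
// StorageLevel.NONE signals that the block was dropped.
assert(StorageLevel.MEMORY_AND_DISK.isValid)
assert(!StorageLevel.NONE.isValid)
```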
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.storage
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.BlockManagerMessages.UpdateBlockInfo
/**
* :: DeveloperApi ::
* Stores information about a block status in a block manager.
*/
@DeveloperApi
case class BlockUpdatedInfo(
blockManagerId: BlockManagerId,
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
externalBlockStoreSize: Long)
private[spark] object BlockUpdatedInfo {
private[spark] def apply(updateBlockInfo: UpdateBlockInfo): BlockUpdatedInfo = {
BlockUpdatedInfo(
updateBlockInfo.blockManagerId,
updateBlockInfo.blockId,
updateBlockInfo.storageLevel,
updateBlockInfo.memSize,
updateBlockInfo.diskSize,
updateBlockInfo.externalBlockStoreSize)
}
}
@@ -27,7 +27,7 @@ import org.apache.spark.ui.scope.RDDOperationGraph
/** Utility functions for generating XML pages with spark content. */
private[spark] object UIUtils extends Logging {
val TABLE_CLASS_NOT_STRIPED = "table table-bordered table-condensed sortable"
val TABLE_CLASS_NOT_STRIPED = "table table-bordered table-condensed"
val TABLE_CLASS_STRIPED = TABLE_CLASS_NOT_STRIPED + " table-striped"
// SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
@@ -267,9 +267,17 @@ private[spark] object UIUtils extends Logging {
fixedWidth: Boolean = false,
id: Option[String] = None,
headerClasses: Seq[String] = Seq.empty,
stripeRowsWithCss: Boolean = true): Seq[Node] = {
stripeRowsWithCss: Boolean = true,
sortable: Boolean = true): Seq[Node] = {
val listingTableClass = if (stripeRowsWithCss) TABLE_CLASS_STRIPED else TABLE_CLASS_NOT_STRIPED
val listingTableClass = {
val _tableClass = if (stripeRowsWithCss) TABLE_CLASS_STRIPED else TABLE_CLASS_NOT_STRIPED
if (sortable) {
_tableClass + " sortable"
} else {
_tableClass
}
}
val colWidth = 100.toDouble / headers.size
val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
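
A sketch of a call site for the new `sortable` flag (hypothetical, and note `UIUtils` is `private[spark]`, so this only compiles inside the `org.apache.spark` package):

```scala
import scala.xml.Node
import org.apache.spark.ui.UIUtils

// A two-column table that the sorttable script leaves alone
// because sortable = false omits the "sortable" CSS class.
val headers = Seq("Name", "Size")
def row(kv: (String, Long)): Seq[Node] =
  <tr><td>{kv._1}</td><td>{kv._2.toString}</td></tr>
val table: Seq[Node] = UIUtils.listingTable(
  headers, row, Seq(("block-1", 100L)),
  id = Some("example-table"), sortable = false)
```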
@@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.storage.RDDInfo
import org.apache.spark.storage._
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.util.Utils
@@ -30,13 +30,25 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
val rdds = listener.rddInfoList
val content = UIUtils.listingTable(rddHeader, rddRow, rdds, id = Some("storage-by-rdd-table"))
val content = rddTable(listener.rddInfoList) ++
receiverBlockTables(listener.allExecutorStreamBlockStatus.sortBy(_.executorId))
UIUtils.headerSparkPage("Storage", content, parent)
}
private[storage] def rddTable(rdds: Seq[RDDInfo]): Seq[Node] = {
if (rdds.isEmpty) {
// Don't show the rdd table if there is no RDD persisted.
Nil
} else {
<div>
<h4>RDDs</h4>
{UIUtils.listingTable(rddHeader, rddRow, rdds, id = Some("storage-by-rdd-table"))}
</div>
}
}
/** Header fields for the RDD table */
private def rddHeader = Seq(
private val rddHeader = Seq(
"RDD Name",
"Storage Level",
"Cached Partitions",
@@ -56,7 +68,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
</td>
<td>{rdd.storageLevel.description}
</td>
<td>{rdd.numCachedPartitions}</td>
<td>{rdd.numCachedPartitions.toString}</td>
<td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
<td sorttable_customkey={rdd.memSize.toString}>{Utils.bytesToString(rdd.memSize)}</td>
<td sorttable_customkey={rdd.externalBlockStoreSize.toString}>{Utils.bytesToString(rdd.externalBlockStoreSize)}</td>
@@ -64,4 +76,130 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
</tr>
// scalastyle:on
}
private[storage] def receiverBlockTables(statuses: Seq[ExecutorStreamBlockStatus]): Seq[Node] = {
if (statuses.map(_.numStreamBlocks).sum == 0) {
// Don't show the tables if there is no stream block
Nil
} else {
val blocks = statuses.flatMap(_.blocks).groupBy(_.blockId).toSeq.sortBy(_._1.toString)
<div>
<h4>Receiver Blocks</h4>
{executorMetricsTable(statuses)}
{streamBlockTable(blocks)}
</div>
}
}
private def executorMetricsTable(statuses: Seq[ExecutorStreamBlockStatus]): Seq[Node] = {
<div>
<h5>Aggregated Block Metrics by Executor</h5>
{UIUtils.listingTable(executorMetricsTableHeader, executorMetricsTableRow, statuses,
id = Some("storage-by-executor-stream-blocks"))}
</div>
}
private val executorMetricsTableHeader = Seq(
"Executor ID",
"Address",
"Total Size in Memory",
"Total Size in ExternalBlockStore",
"Total Size on Disk",
"Stream Blocks")
private def executorMetricsTableRow(status: ExecutorStreamBlockStatus): Seq[Node] = {
<tr>
<td>
{status.executorId}
</td>
<td>
{status.location}
</td>
<td sorttable_customkey={status.totalMemSize.toString}>
{Utils.bytesToString(status.totalMemSize)}
</td>
<td sorttable_customkey={status.totalExternalBlockStoreSize.toString}>
{Utils.bytesToString(status.totalExternalBlockStoreSize)}
</td>
<td sorttable_customkey={status.totalDiskSize.toString}>
{Utils.bytesToString(status.totalDiskSize)}
</td>
<td>
{status.numStreamBlocks.toString}
</td>
</tr>
}
private def streamBlockTable(blocks: Seq[(BlockId, Seq[BlockUIData])]): Seq[Node] = {
if (blocks.isEmpty) {
Nil
} else {
<div>
<h5>Blocks</h5>
{UIUtils.listingTable(
streamBlockTableHeader,
streamBlockTableRow,
blocks,
id = Some("storage-by-block-table"),
sortable = false)}
</div>
}
}
private val streamBlockTableHeader = Seq(
"Block ID",
"Replication Level",
"Location",
"Storage Level",
"Size")
/** Render a stream block */
private def streamBlockTableRow(block: (BlockId, Seq[BlockUIData])): Seq[Node] = {
val replications = block._2
assert(replications.size > 0) // This must be true because it's the result of "groupBy"
if (replications.size == 1) {
streamBlockTableSubrow(block._1, replications.head, replications.size, true)
} else {
streamBlockTableSubrow(block._1, replications.head, replications.size, true) ++
replications.tail.map(streamBlockTableSubrow(block._1, _, replications.size, false)).flatten
}
}
private def streamBlockTableSubrow(
blockId: BlockId, block: BlockUIData, replication: Int, firstSubrow: Boolean): Seq[Node] = {
val (storageLevel, size) = streamBlockStorageLevelDescriptionAndSize(block)
<tr>
{
if (firstSubrow) {
<td rowspan={replication.toString}>
{block.blockId.toString}
</td>
<td rowspan={replication.toString}>
{replication.toString}
</td>
}
}
<td>{block.location}</td>
<td>{storageLevel}</td>
<td>{Utils.bytesToString(size)}</td>
</tr>
}
private[storage] def streamBlockStorageLevelDescriptionAndSize(
block: BlockUIData): (String, Long) = {
if (block.storageLevel.useDisk) {
("Disk", block.diskSize)
} else if (block.storageLevel.useMemory && block.storageLevel.deserialized) {
("Memory", block.memSize)
} else if (block.storageLevel.useMemory && !block.storageLevel.deserialized) {
("Memory Serialized", block.memSize)
} else if (block.storageLevel.useOffHeap) {
("External", block.externalBlockStoreSize)
} else {
throw new IllegalStateException(s"Invalid Storage Level: ${block.storageLevel}")
}
}
}
@@ -39,7 +39,8 @@ private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, "storag
* This class is thread-safe (unlike JobProgressListener)
*/
@DeveloperApi
class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
class StorageListener(storageStatusListener: StorageStatusListener) extends BlockStatusListener {
private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing
def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.storage
import org.apache.spark.SparkFunSuite
import org.apache.spark.scheduler._
class BlockStatusListenerSuite extends SparkFunSuite {
test("basic functions") {
val blockManagerId = BlockManagerId("0", "localhost", 10000)
val listener = new BlockStatusListener()
// Add a block manager and a new block status
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(0, blockManagerId, 0))
listener.onBlockUpdated(SparkListenerBlockUpdated(
BlockUpdatedInfo(
blockManagerId,
StreamBlockId(0, 100),
StorageLevel.MEMORY_AND_DISK,
memSize = 100,
diskSize = 100,
externalBlockStoreSize = 0)))
// The new block status should be added to the listener
val expectedBlock = BlockUIData(
StreamBlockId(0, 100),
"localhost:10000",
StorageLevel.MEMORY_AND_DISK,
memSize = 100,
diskSize = 100,
externalBlockStoreSize = 0
)
val expectedExecutorStreamBlockStatus = Seq(
ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock))
)
assert(listener.allExecutorStreamBlockStatus === expectedExecutorStreamBlockStatus)
// Add the second block manager
val blockManagerId2 = BlockManagerId("1", "localhost", 10001)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(0, blockManagerId2, 0))
// Add a new replication of the same block id from the second manager
listener.onBlockUpdated(SparkListenerBlockUpdated(
BlockUpdatedInfo(
blockManagerId2,
StreamBlockId(0, 100),
StorageLevel.MEMORY_AND_DISK,
memSize = 100,
diskSize = 100,
externalBlockStoreSize = 0)))
val expectedBlock2 = BlockUIData(
StreamBlockId(0, 100),
"localhost:10001",
StorageLevel.MEMORY_AND_DISK,
memSize = 100,
diskSize = 100,
externalBlockStoreSize = 0
)
// Each block manager should contain one block
val expectedExecutorStreamBlockStatus2 = Set(
ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock)),
ExecutorStreamBlockStatus("1", "localhost:10001", Seq(expectedBlock2))
)
assert(listener.allExecutorStreamBlockStatus.toSet === expectedExecutorStreamBlockStatus2)
// Remove a replication of the same block
listener.onBlockUpdated(SparkListenerBlockUpdated(
BlockUpdatedInfo(
blockManagerId2,
StreamBlockId(0, 100),
StorageLevel.NONE, // StorageLevel.NONE means removing it
memSize = 0,
diskSize = 0,
externalBlockStoreSize = 0)))
// Only the first block manager contains a block
val expectedExecutorStreamBlockStatus3 = Set(
ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock)),
ExecutorStreamBlockStatus("1", "localhost:10001", Seq.empty)
)
assert(listener.allExecutorStreamBlockStatus.toSet === expectedExecutorStreamBlockStatus3)
// Remove the second block manager at first but add a new block status
// from this removed block manager
listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(0, blockManagerId2))
listener.onBlockUpdated(SparkListenerBlockUpdated(
BlockUpdatedInfo(
blockManagerId2,
StreamBlockId(0, 100),
StorageLevel.MEMORY_AND_DISK,
memSize = 100,
diskSize = 100,
externalBlockStoreSize = 0)))
// The second block manager is removed so we should not see the new block
val expectedExecutorStreamBlockStatus4 = Seq(
ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock))
)
assert(listener.allExecutorStreamBlockStatus === expectedExecutorStreamBlockStatus4)
// Remove the last block manager
listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(0, blockManagerId))
// No block manager now so we should drop all block managers
assert(listener.allExecutorStreamBlockStatus.isEmpty)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui.storage
import scala.xml.Utility
import org.mockito.Mockito._
import org.apache.spark.SparkFunSuite
import org.apache.spark.storage._
class StoragePageSuite extends SparkFunSuite {
val storageTab = mock(classOf[StorageTab])
when(storageTab.basePath).thenReturn("http://localhost:4040")
val storagePage = new StoragePage(storageTab)
test("rddTable") {
val rdd1 = new RDDInfo(1,
"rdd1",
10,
StorageLevel.MEMORY_ONLY,
Seq.empty)
rdd1.memSize = 100
rdd1.numCachedPartitions = 10
val rdd2 = new RDDInfo(2,
"rdd2",
10,
StorageLevel.DISK_ONLY,
Seq.empty)
rdd2.diskSize = 200
rdd2.numCachedPartitions = 5
val rdd3 = new RDDInfo(3,
"rdd3",
10,
StorageLevel.MEMORY_AND_DISK_SER,
Seq.empty)
rdd3.memSize = 400
rdd3.diskSize = 500
rdd3.numCachedPartitions = 10
val xmlNodes = storagePage.rddTable(Seq(rdd1, rdd2, rdd3))
val headers = Seq(
"RDD Name",
"Storage Level",
"Cached Partitions",
"Fraction Cached",
"Size in Memory",
"Size in ExternalBlockStore",
"Size on Disk")
assert((xmlNodes \\ "th").map(_.text) === headers)
assert((xmlNodes \\ "tr").size === 3)
assert(((xmlNodes \\ "tr")(0) \\ "td").map(_.text.trim) ===
Seq("rdd1", "Memory Deserialized 1x Replicated", "10", "100%", "100.0 B", "0.0 B", "0.0 B"))
// Check the url
assert(((xmlNodes \\ "tr")(0) \\ "td" \ "a")(0).attribute("href").map(_.text) ===
Some("http://localhost:4040/storage/rdd?id=1"))
assert(((xmlNodes \\ "tr")(1) \\ "td").map(_.text.trim) ===
Seq("rdd2", "Disk Serialized 1x Replicated", "5", "50%", "0.0 B", "0.0 B", "200.0 B"))
// Check the url
assert(((xmlNodes \\ "tr")(1) \\ "td" \ "a")(0).attribute("href").map(_.text) ===
Some("http://localhost:4040/storage/rdd?id=2"))
assert(((xmlNodes \\ "tr")(2) \\ "td").map(_.text.trim) ===
Seq("rdd3", "Disk Memory Serialized 1x Replicated", "10", "100%", "400.0 B", "0.0 B",
"500.0 B"))
// Check the url
assert(((xmlNodes \\ "tr")(2) \\ "td" \ "a")(0).attribute("href").map(_.text) ===
Some("http://localhost:4040/storage/rdd?id=3"))
}
test("empty rddTable") {
assert(storagePage.rddTable(Seq.empty).isEmpty)
}
test("streamBlockStorageLevelDescriptionAndSize") {
val memoryBlock = BlockUIData(StreamBlockId(0, 0),
"localhost:1111",
StorageLevel.MEMORY_ONLY,
memSize = 100,
diskSize = 0,
externalBlockStoreSize = 0)
assert(("Memory", 100) === storagePage.streamBlockStorageLevelDescriptionAndSize(memoryBlock))
val memorySerializedBlock = BlockUIData(StreamBlockId(0, 0),
"localhost:1111",
StorageLevel.MEMORY_ONLY_SER,
memSize = 100,
diskSize = 0,
externalBlockStoreSize = 0)
assert(("Memory Serialized", 100) ===
storagePage.streamBlockStorageLevelDescriptionAndSize(memorySerializedBlock))
val diskBlock = BlockUIData(StreamBlockId(0, 0),
"localhost:1111",
StorageLevel.DISK_ONLY,
memSize = 0,
diskSize = 100,
externalBlockStoreSize = 0)
assert(("Disk", 100) === storagePage.streamBlockStorageLevelDescriptionAndSize(diskBlock))
val externalBlock = BlockUIData(StreamBlockId(0, 0),
"localhost:1111",
StorageLevel.OFF_HEAP,
memSize = 0,
diskSize = 0,
externalBlockStoreSize = 100)
assert(("External", 100) ===
storagePage.streamBlockStorageLevelDescriptionAndSize(externalBlock))
}
test("receiverBlockTables") {
val blocksForExecutor0 = Seq(
BlockUIData(StreamBlockId(0, 0),
"localhost:10000",
StorageLevel.MEMORY_ONLY,
memSize = 100,
diskSize = 0,
externalBlockStoreSize = 0),
BlockUIData(StreamBlockId(1, 1),
"localhost:10000",
StorageLevel.DISK_ONLY,
memSize = 0,
diskSize = 100,
externalBlockStoreSize = 0)
)
val executor0 = ExecutorStreamBlockStatus("0", "localhost:10000", blocksForExecutor0)
val blocksForExecutor1 = Seq(
BlockUIData(StreamBlockId(0, 0),
"localhost:10001",
StorageLevel.MEMORY_ONLY,
memSize = 100,
diskSize = 0,
externalBlockStoreSize = 0),
BlockUIData(StreamBlockId(2, 2),
"localhost:10001",
StorageLevel.OFF_HEAP,
memSize = 0,
diskSize = 0,
externalBlockStoreSize = 200),
BlockUIData(StreamBlockId(1, 1),
"localhost:10001",
StorageLevel.MEMORY_ONLY_SER,
memSize = 100,
diskSize = 0,
externalBlockStoreSize = 0)
)
val executor1 = ExecutorStreamBlockStatus("1", "localhost:10001", blocksForExecutor1)
val xmlNodes = storagePage.receiverBlockTables(Seq(executor0, executor1))
val executorTable = (xmlNodes \\ "table")(0)
val executorHeaders = Seq(
"Executor ID",
"Address",
"Total Size in Memory",
"Total Size in ExternalBlockStore",
"Total Size on Disk",
"Stream Blocks")
assert((executorTable \\ "th").map(_.text) === executorHeaders)
assert((executorTable \\ "tr").size === 2)
assert(((executorTable \\ "tr")(0) \\ "td").map(_.text.trim) ===
Seq("0", "localhost:10000", "100.0 B", "0.0 B", "100.0 B", "2"))
assert(((executorTable \\ "tr")(1) \\ "td").map(_.text.trim) ===
Seq("1", "localhost:10001", "200.0 B", "200.0 B", "0.0 B", "3"))
val blockTable = (xmlNodes \\ "table")(1)
val blockHeaders = Seq(
"Block ID",
"Replication Level",
"Location",
"Storage Level",
"Size")
assert((blockTable \\ "th").map(_.text) === blockHeaders)
assert((blockTable \\ "tr").size === 5)
assert(((blockTable \\ "tr")(0) \\ "td").map(_.text.trim) ===
Seq("input-0-0", "2", "localhost:10000", "Memory", "100.0 B"))
// Check "rowspan=2" for the first 2 columns
assert(((blockTable \\ "tr")(0) \\ "td")(0).attribute("rowspan").map(_.text) === Some("2"))
assert(((blockTable \\ "tr")(0) \\ "td")(1).attribute("rowspan").map(_.text) === Some("2"))
assert(((blockTable \\ "tr")(1) \\ "td").map(_.text.trim) ===
Seq("localhost:10001", "Memory", "100.0 B"))
assert(((blockTable \\ "tr")(2) \\ "td").map(_.text.trim) ===
Seq("input-1-1", "2", "localhost:10000", "Disk", "100.0 B"))
// Check "rowspan=2" for the first 2 columns
assert(((blockTable \\ "tr")(2) \\ "td")(0).attribute("rowspan").map(_.text) === Some("2"))
assert(((blockTable \\ "tr")(2) \\ "td")(1).attribute("rowspan").map(_.text) === Some("2"))
assert(((blockTable \\ "tr")(3) \\ "td").map(_.text.trim) ===
Seq("localhost:10001", "Memory Serialized", "100.0 B"))
assert(((blockTable \\ "tr")(4) \\ "td").map(_.text.trim) ===
Seq("input-2-2", "1", "localhost:10001", "External", "200.0 B"))
// Check "rowspan=1" for the first 2 columns
assert(((blockTable \\ "tr")(4) \\ "td")(0).attribute("rowspan").map(_.text) === Some("1"))
assert(((blockTable \\ "tr")(4) \\ "td")(1).attribute("rowspan").map(_.text) === Some("1"))
}
test("empty receiverBlockTables") {
assert(storagePage.receiverBlockTables(Seq.empty).isEmpty)
val executor0 = ExecutorStreamBlockStatus("0", "localhost:10000", Seq.empty)
val executor1 = ExecutorStreamBlockStatus("1", "localhost:10001", Seq.empty)
assert(storagePage.receiverBlockTables(Seq(executor0, executor1)).isEmpty)
}
}