Skip to content
Snippets Groups Projects
Commit 538ee755 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #581 from jerryshao/master

fix [SPARK-740] block manager UI throws exception when enabling Spark Streaming
parents 9abcbcc7 c047f0e3
No related branches found
No related tags found
No related merge requests found
......@@ -4,9 +4,9 @@ import spark.{Utils, SparkContext}
import BlockManagerMasterActor.BlockStatus
private[spark]
// Snapshot of one BlockManager's storage state: its id, its max memory, and the
// status of each block it holds, keyed by block id. (maxMem presumably in bytes —
// consistent with memoryBytesToString usage elsewhere in this file; confirm upstream.)
case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
// NOTE(review): diff artifact — the line above and the line below are the pre-/post-image
// of the same signature line in this scraped diff view.
case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
blocks: Map[String, BlockStatus]) {
// Total memory used by blocks whose id starts with `blockPrefix` ("" matches every block).
// reduceOption keeps this safe when no block matches, falling back to 0L.
def memUsed(blockPrefix: String = "") = {
blocks.filterKeys(_.startsWith(blockPrefix)).values.map(_.memSize).
reduceOption(_+_).getOrElse(0l)
......@@ -22,35 +22,40 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
}
/**
 * Summary of one cached RDD's storage: how many partitions are cached out of the
 * total, and the memory/disk footprint.
 *
 * Extends Ordered so an Array[RDDInfo] can be sorted (ascending RDD id) with
 * scala.util.Sorting.quickSort, as done in StorageUtils.rddInfoFromBlockStatusList.
 */
case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
    numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long)
  extends Ordered[RDDInfo] {

  override def toString = {
    import Utils.memoryBytesToString
    // Human-readable one-line summary (sizes rendered via memoryBytesToString).
    "RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
      storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize))
  }

  // Order by ascending RDD id.
  override def compare(that: RDDInfo) = {
    this.id - that.id
  }
}
/* Helper methods for storage-related objects */
private[spark]
object StorageUtils {
/* Given the current storage status of the BlockManager, returns information for each RDD */
def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus],
/* Given the current storage status of the BlockManager, returns information for each RDD */
def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus],
sc: SparkContext) : Array[RDDInfo] = {
rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc)
rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc)
}
/* Given a list of BlockStatus objects, returns information for each RDD */
def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
/* NOTE(review): diff artifact — the two lines above are the pre-image duplicates of the
   post-image comment/signature below in this scraped diff view. */
/* Given a list of BlockStatus objects, returns information for each RDD */
def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
sc: SparkContext) : Array[RDDInfo] = {
// Group by rddId, ignore the partition name: keys are expected to look like
// "rdd_<id>_<partition>", so trimming at the last '_' leaves "rdd_<id>".
// The post-image line adds filterKeys(_.startsWith("rdd_")) so non-RDD block ids
// (e.g. ones introduced when Spark Streaming is enabled — see the SPARK-740 commit
// message above) no longer reach the substring/grouping logic.
val groupedRddBlocks = infos.groupBy { case(k, v) =>
val groupedRddBlocks = infos.filterKeys(_.startsWith("rdd_")).groupBy { case(k, v) =>
k.substring(0,k.lastIndexOf('_'))
}.mapValues(_.values.toArray)
// For each RDD, generate an RDDInfo object
groupedRddBlocks.map { case(rddKey, rddBlocks) =>
val rddInfos = groupedRddBlocks.map { case(rddKey, rddBlocks) =>
// Add up memory and disk sizes
val memSize = rddBlocks.map(_.memSize).reduce(_ + _)
......@@ -65,10 +70,14 @@ object StorageUtils {
// NOTE(review): the hunk header above elides several lines; diskSize, rddId, rddName,
// rdd and rddStorageLevel used below are defined in the elided code and are not
// visible in this view.
RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.partitions.size, memSize, diskSize)
}.toArray
// Sort by RDDInfo's Ordered instance (ascending RDD id) before returning.
scala.util.Sorting.quickSort(rddInfos)
rddInfos
}
/* Removes all BlockStatus objects whose block id does not start with the given prefix */
def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus],
/* NOTE(review): diff artifact — the two lines above are pre-image duplicates of the
   post-image comment/signature below in this scraped diff view. */
/* Removes all BlockStatus objects whose block id does not start with the given prefix */
def filterStorageStatusByPrefix(storageStatusList: Array[StorageStatus],
prefix: String) : Array[StorageStatus] = {
// Per-status filtering follows, but the body is truncated past this point in this view.
storageStatusList.map { status =>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment