Commit 88ee6163 authored by Matei Zaharia

Merge pull request #422 from squito/blockmanager_info

RDDInfo available from SparkContext
Parents: cd4ca936, c6190067
spark/SparkContext.scala

@@ -46,6 +46,7 @@ import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, C
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import storage.BlockManagerUI
 import util.{MetadataCleaner, TimeStampedHashMap}
+import storage.{StorageStatus, StorageUtils, RDDInfo}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -468,12 +469,27 @@ class SparkContext(
   * Return a map from the slave to the max memory available for caching and the remaining
   * memory available for caching.
   */
-  def getSlavesMemoryStatus: Map[String, (Long, Long)] = {
+  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
     env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
       (blockManagerId.ip + ":" + blockManagerId.port, mem)
     }
   }
 
+  /**
+   * Return information about what RDDs are cached, if they are in mem or on disk, how much space
+   * they take, etc.
+   */
+  def getRDDStorageInfo : Array[RDDInfo] = {
+    StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
+  }
+
+  /**
+   * Return information about blocks stored in all of the slaves
+   */
+  def getExecutorStorageStatus : Array[StorageStatus] = {
+    env.blockManager.master.getStorageStatus
+  }
+
   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
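These three accessors expose to driver programs the storage telemetry that previously only backed the web UI. A minimal sketch of how they might be called from a driver program (the local-mode setup and the cached RDD are illustrative, not part of this change):

```scala
import spark.SparkContext

object StorageInfoDemo {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "storage-info-demo")

    // Cache something so there is storage state to report.
    val data = sc.parallelize(1 to 1000, 4).cache()
    data.count() // Materialize the cache.

    // Per-executor memory status: (max memory for caching, remaining memory).
    sc.getExecutorMemoryStatus.foreach { case (host, (maxMem, remaining)) =>
      println(host + ": " + remaining + " of " + maxMem + " bytes free")
    }

    // Per-RDD cache info; each RDDInfo prints a one-line summary
    // (see the new toString in StorageUtils.scala below).
    sc.getRDDStorageInfo.foreach(println)

    sc.stop()
  }
}
```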
spark/storage/BlockManagerMaster.scala

@@ -115,6 +115,10 @@ private[spark] class BlockManagerMaster(
     askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
   }
 
+  def getStorageStatus: Array[StorageStatus] = {
+    askDriverWithReply[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray
+  }
+
   /** Stop the driver actor, called only on the Spark driver node */
   def stop() {
     if (driverActor != null) {
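The new getStorageStatus mirrors the existing getMemoryStatus: it sends a GetStorageStatus message to the driver actor and blocks for a typed reply. askDriverWithReply itself is outside this diff; a rough sketch of the ask-and-block pattern it wraps, under the Akka 2.0-era API this code uses (the helper name and the 10-second timeout are assumptions):

```scala
import akka.actor.ActorRef
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._

// Hypothetical stand-in for askDriverWithReply: send a message to the
// driver actor and block until a reply of the expected type arrives.
def askDriver[T](driverActor: ActorRef, message: Any): T = {
  implicit val timeout = Timeout(10 seconds)
  val future = driverActor.ask(message)(timeout)
  Await.result(future, timeout.duration).asInstanceOf[T]
}
```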
spark/storage/BlockManagerUI.scala

@@ -1,13 +1,10 @@
 package spark.storage
 
 import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.ask
 import akka.util.Timeout
 import akka.util.duration._
-import cc.spray.directives._
 import cc.spray.typeconversion.TwirlSupport._
 import cc.spray.Directives
-import scala.collection.mutable.ArrayBuffer
 import spark.{Logging, SparkContext}
 import spark.util.AkkaUtils
 import spark.Utils
@@ -48,32 +45,26 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
     path("") {
       completeWith {
         // Request the current storage status from the Master
-        val future = blockManagerMaster ? GetStorageStatus
-        future.map { status =>
-          // Calculate macro-level statistics
-          val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
-          val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
-          val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
-          val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
-            .reduceOption(_+_).getOrElse(0L)
-          val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
-          spark.storage.html.index.
-            render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
-        }
+        val storageStatusList = sc.getExecutorStorageStatus
+        // Calculate macro-level statistics
+        val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+        val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+        val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+          .reduceOption(_+_).getOrElse(0L)
+        val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+        spark.storage.html.index.
+          render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
       }
     } ~
     path("rdd") {
       parameter("id") { id =>
         completeWith {
-          val future = blockManagerMaster ? GetStorageStatus
-          future.map { status =>
-            val prefix = "rdd_" + id.toString
-            val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
-            val filteredStorageStatusList = StorageUtils.
-              filterStorageStatusByPrefix(storageStatusList, prefix)
-            val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
-            spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
-          }
+          val prefix = "rdd_" + id.toString
+          val storageStatusList = sc.getExecutorStorageStatus
+          val filteredStorageStatusList = StorageUtils.
+            filterStorageStatusByPrefix(storageStatusList, prefix)
+          val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+          spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
        }
      }
    } ~
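With the accessor available on SparkContext, both routes drop the ask-the-master future and become straight-line code: completeWith now receives a rendered page rather than a Future, and the ArrayBuffer cast disappears. One detail in the retained statistics code is worth noting: maxMem and remainingMem use reduce, which throws on an empty collection, whereas diskSpaceUsed uses reduceOption with a default. A small self-contained illustration of the difference:

```scala
object ReduceOptionDemo {
  def main(args: Array[String]) {
    val sizes: Seq[Long] = Seq()

    // reduceOption yields None on an empty collection instead of throwing,
    // so an explicit default can be supplied.
    println(sizes.reduceOption(_ + _).getOrElse(0L)) // prints 0

    // Seq[Long]().reduce(_ + _) would instead throw
    // java.lang.UnsupportedOperationException: empty.reduce
  }
}
```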
spark/storage/StorageUtils.scala

@@ -1,6 +1,6 @@
 package spark.storage
 
-import spark.SparkContext
+import spark.{Utils, SparkContext}
 import BlockManagerMasterActor.BlockStatus
 
 private[spark]
@@ -22,8 +22,13 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
 }
 
 case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
-  numPartitions: Int, memSize: Long, diskSize: Long)
+  numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long) {
+  override def toString = {
+    import Utils.memoryBytesToString
+    "RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
+      storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize))
+  }
+}
 
 /* Helper methods for storage-related objects */
 private[spark]
@@ -38,8 +43,6 @@ object StorageUtils {
   /* Given a list of BlockStatus objets, returns information for each RDD */
   def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
     sc: SparkContext) : Array[RDDInfo] = {
-    // Find all RDD Blocks (ignore broadcast variables)
-    val rddBlocks = infos.filterKeys(_.startsWith("rdd"))
 
     // Group by rddId, ignore the partition name
     val groupedRddBlocks = infos.groupBy { case(k, v) =>
@@ -56,10 +59,11 @@ object StorageUtils {
       // Find the id of the RDD, e.g. rdd_1 => 1
       val rddId = rddKey.split("_").last.toInt
       // Get the friendly name for the rdd, if available.
-      val rddName = Option(sc.persistentRdds(rddId).name).getOrElse(rddKey)
-      val rddStorageLevel = sc.persistentRdds(rddId).getStorageLevel
-      RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize)
+      val rdd = sc.persistentRdds(rddId)
+      val rddName = Option(rdd.name).getOrElse(rddKey)
+      val rddStorageLevel = rdd.getStorageLevel
+      RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.splits.size, memSize, diskSize)
     }.toArray
   }
@@ -75,4 +79,4 @@ object StorageUtils {
   }
-}
\ No newline at end of file
+}
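The extended RDDInfo now distinguishes cached partitions (counted from the block list) from total partitions (rdd.splits.size), and its new toString gives a compact one-line summary. Constructed by hand purely for illustration (real instances come from rddInfoFromStorageStatus; all values here are made up), it prints roughly as follows:

```scala
import spark.storage.{RDDInfo, StorageLevel}

object RDDInfoToStringDemo {
  def main(args: Array[String]) {
    val info = RDDInfo(1, "my-cached-rdd", StorageLevel.MEMORY_ONLY,
      numCachedPartitions = 3, numPartitions = 4,
      memSize = 3L * 1024 * 1024, diskSize = 0L)

    // Expected shape of the output (exact StorageLevel rendering may differ):
    // RDD "my-cached-rdd" (1) Storage: StorageLevel(false, true, true, 1);
    //   CachedPartitions: 3; TotalPartitions: 4; MemorySize: 3.0 MB; DiskSize: 0.0 B
    println(info)
  }
}
```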
spark/storage/rdd.scala.html

@@ -11,7 +11,11 @@
       <strong>Storage Level:</strong>
       @(rddInfo.storageLevel.description)
     <li>
-      <strong>Partitions:</strong>
+      <strong>Cached Partitions:</strong>
+      @(rddInfo.numCachedPartitions)
+    </li>
+    <li>
+      <strong>Total Partitions:</strong>
       @(rddInfo.numPartitions)
     </li>
     <li>
spark/storage/index.scala.html

@@ -6,7 +6,8 @@
     <tr>
       <th>RDD Name</th>
       <th>Storage Level</th>
-      <th>Partitions</th>
+      <th>Cached Partitions</th>
+      <th>Fraction Partitions Cached</th>
       <th>Size in Memory</th>
       <th>Size on Disk</th>
     </tr>
@@ -21,7 +22,8 @@
       </td>
       <td>@(rdd.storageLevel.description)
       </td>
-      <td>@rdd.numPartitions</td>
+      <td>@rdd.numCachedPartitions</td>
+      <td>@(rdd.numCachedPartitions / rdd.numPartitions.toDouble)</td>
       <td>@{Utils.memoryBytesToString(rdd.memSize)}</td>
       <td>@{Utils.memoryBytesToString(rdd.diskSize)}</td>
     </tr>
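The new "Fraction Partitions Cached" column renders the raw Double (so 3 of 4 cached partitions displays as 0.75). If a percentage were preferred, a small formatting helper along these lines could back the template instead (hypothetical, not part of this change):

```scala
// Hypothetical helper: format the cached fraction as a percentage string
// instead of the raw Double the template currently emits.
def formatCachedFraction(numCachedPartitions: Int, numPartitions: Int): String = {
  if (numPartitions == 0) "0%"
  else "%.0f%%".format(100.0 * numCachedPartitions / numPartitions)
}

// formatCachedFraction(3, 4) == "75%"
```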