diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
index 3b4ae2ed354b81314997ea0c9f1f80d6a4b89dfd..9cc5c79f67346b16bb9e4a8e4ee20c9c124dbd5f 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
@@ -122,3 +122,7 @@
   stroke: #52C366;
   stroke-width: 2px;
 }
+
+.tooltip-inner {
+  white-space: pre-wrap;
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 96062626b50451ee6eac4b9e88658e2e57d24441..3fa209b924170da3403d07e998a46b8eb179f5d0 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDDOperationScope, RDD}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CallSite, Utils}
 
 @DeveloperApi
 class RDDInfo(
@@ -28,9 +28,20 @@ class RDDInfo(
     val numPartitions: Int,
     var storageLevel: StorageLevel,
     val parentIds: Seq[Int],
+    val callSite: CallSite,
     val scope: Option[RDDOperationScope] = None)
   extends Ordered[RDDInfo] {
 
+  def this(
+      id: Int,
+      name: String,
+      numPartitions: Int,
+      storageLevel: StorageLevel,
+      parentIds: Seq[Int],
+      scope: Option[RDDOperationScope] = None) {
+    this(id, name, numPartitions, storageLevel, parentIds, CallSite.empty, scope)
+  }
+
   var numCachedPartitions = 0
   var memSize = 0L
   var diskSize = 0L
@@ -56,6 +67,7 @@ private[spark] object RDDInfo {
   def fromRdd(rdd: RDD[_]): RDDInfo = {
     val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd))
     val parentIds = rdd.dependencies.map(_.rdd.id)
-    new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, parentIds, rdd.scope)
+    new RDDInfo(rdd.id, rddName, rdd.partitions.length,
+      rdd.getStorageLevel, parentIds, rdd.creationSite, rdd.scope)
   }
 }
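Note on the `RDDInfo` change above: the new auxiliary constructor keeps the old argument list compiling while the primary constructor now requires a `CallSite`, defaulting the missing value to `CallSite.empty`. A minimal sketch of the two construction paths (illustrative only: `CallSite` is `private[spark]`, so this compiles only from within the `org.apache.spark` package, and the names/sites below are made up):

```scala
import org.apache.spark.storage.{RDDInfo, StorageLevel}
import org.apache.spark.util.CallSite

// Old-style call: the auxiliary constructor fills in CallSite.empty.
val legacy = new RDDInfo(0, "words", 4, StorageLevel.MEMORY_ONLY, Seq.empty)

// New-style call: the creation site is threaded through explicitly,
// as RDDInfo.fromRdd now does with rdd.creationSite.
val traced = new RDDInfo(1, "pairs", 4, StorageLevel.NONE, Seq(0),
  CallSite("map at WordCount.scala:12", "org.example.WordCount.main(WordCount.scala:12)"))

assert(legacy.callSite == CallSite.empty)
```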
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 81f168a447ead0e06ae754b5097e5734e9863b1d..24274562657b3f8387b315ebbeba4265783f8b7c 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{StringBuilder, ListBuffer}
 import org.apache.spark.Logging
 import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.CallSite
 
 /**
  * A representation of a generic cluster graph used for storing information on RDD operations.
@@ -38,7 +39,7 @@ private[ui] case class RDDOperationGraph(
     rootCluster: RDDOperationCluster)
 
 /** A node in an RDDOperationGraph. This represents an RDD. */
-private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean)
+private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean, callsite: CallSite)
 
 /**
  * A directed edge connecting two nodes in an RDDOperationGraph.
@@ -104,8 +105,8 @@ private[ui] object RDDOperationGraph extends Logging {
       edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
 
       // TODO: differentiate between the intention to cache an RDD and whether it's actually cached
-      val node = nodes.getOrElseUpdate(
-        rdd.id, RDDOperationNode(rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE))
+      val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(
+        rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite))
 
       if (rdd.scope.isEmpty) {
         // This RDD has no encompassing scope, so we put it directly in the root cluster
@@ -177,7 +178,8 @@ private[ui] object RDDOperationGraph extends Logging {
 
   /** Return the dot representation of a node in an RDDOperationGraph. */
   private def makeDotNode(node: RDDOperationNode): String = {
-    s"""${node.id} [label="${node.name} [${node.id}]"]"""
+    val label = s"${node.name} [${node.id}]\n${node.callsite.shortForm}"
+    s"""${node.id} [label="$label"]"""
  }
 
   /** Update the dot representation of the RDDOperationGraph in cluster to subgraph. */
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ee2eb58cf5e2a5836779ccf74a9d187131b47a21..c9beeb25e05af3b2709452dee4bdf5fc6ecca700 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -398,6 +398,7 @@ private[spark] object JsonProtocol {
     ("RDD ID" -> rddInfo.id) ~
     ("Name" -> rddInfo.name) ~
     ("Scope" -> rddInfo.scope.map(_.toJson)) ~
+    ("Callsite" -> callsiteToJson(rddInfo.callSite)) ~
     ("Parent IDs" -> parentIds) ~
     ("Storage Level" -> storageLevel) ~
     ("Number of Partitions" -> rddInfo.numPartitions) ~
@@ -407,6 +408,11 @@
     ("Disk Size" -> rddInfo.diskSize)
   }
 
+  def callsiteToJson(callsite: CallSite): JValue = {
+    ("Short Form" -> callsite.shortForm) ~
+    ("Long Form" -> callsite.longForm)
+  }
+
   def storageLevelToJson(storageLevel: StorageLevel): JValue = {
     ("Use Disk" -> storageLevel.useDisk) ~
     ("Use Memory" -> storageLevel.useMemory) ~
@@ -851,6 +857,9 @@
     val scope = Utils.jsonOption(json \ "Scope")
       .map(_.extract[String])
       .map(RDDOperationScope.fromJson)
+    val callsite = Utils.jsonOption(json \ "Callsite")
+      .map(callsiteFromJson)
+      .getOrElse(CallSite.empty)
     val parentIds = Utils.jsonOption(json \ "Parent IDs")
       .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
       .getOrElse(Seq.empty)
@@ -863,7 +872,7 @@
       .getOrElse(json \ "Tachyon Size").extract[Long]
     val diskSize = (json \ "Disk Size").extract[Long]
 
-    val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, scope)
+    val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, callsite, scope)
     rddInfo.numCachedPartitions = numCachedPartitions
     rddInfo.memSize = memSize
     rddInfo.externalBlockStoreSize = externalBlockStoreSize
@@ -871,6 +880,12 @@
     rddInfo
   }
 
+  def callsiteFromJson(json: JValue): CallSite = {
+    val shortForm = (json \ "Short Form").extract[String]
+    val longForm = (json \ "Long Form").extract[String]
+    CallSite(shortForm, longForm)
+  }
+
   def storageLevelFromJson(json: JValue): StorageLevel = {
     val useDisk = (json \ "Use Disk").extract[Boolean]
     val useMemory = (json \ "Use Memory").extract[Boolean]
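Note on the `JsonProtocol` change above: `callsiteToJson`/`callsiteFromJson` are deliberately symmetric, and `rddInfoFromJson` treats "Callsite" as optional so event logs written by older Spark versions still parse. A self-contained json4s sketch of the same round trip (the `CallSite` case class here is a stand-in for the `private[spark]` one, and the sample values are made up):

```scala
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

case class CallSite(shortForm: String, longForm: String)

implicit val formats: Formats = DefaultFormats

def callsiteToJson(cs: CallSite): JValue =
  ("Short Form" -> cs.shortForm) ~ ("Long Form" -> cs.longForm)

def callsiteFromJson(json: JValue): CallSite =
  CallSite((json \ "Short Form").extract[String], (json \ "Long Form").extract[String])

val cs = CallSite("count at Demo.scala:10", "org.example.Demo.main(Demo.scala:10)")
println(compact(render(callsiteToJson(cs))))
// {"Short Form":"count at Demo.scala:10","Long Form":"org.example.Demo.main(Demo.scala:10)"}
assert(callsiteFromJson(callsiteToJson(cs)) == cs)

// A pre-callsite record has no "Callsite" field; that missing-field case is what
// the Utils.jsonOption(...).getOrElse(CallSite.empty) fallback in the patch guards against.
```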
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5a976ee839b1e0f0b416196c4eb1e18c471545d6..316c194ff3454552ec0f7474bed6e1742adb79b7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -57,6 +57,7 @@ private[spark] case class CallSite(shortForm: String, longForm: String)
 private[spark] object CallSite {
   val SHORT_FORM = "callSite.short"
   val LONG_FORM = "callSite.long"
+  val empty = CallSite("", "")
 }
 
 /**
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 18eec7da9763e7b025e2fabec2cc1e2f480b9806..ceecfd665bf8703061b1ae491b42b03f0de6864c 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -615,29 +615,29 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       assert(stage0.contains("digraph G {\n  subgraph clusterstage_0 {\n    " +
         "label=&quot;Stage 0&quot;;\n    subgraph "))
       assert(stage0.contains("{\n      label=&quot;parallelize&quot;;\n      " +
-        "0 [label=&quot;ParallelCollectionRDD [0]&quot;];\n    }"))
+        "0 [label=&quot;ParallelCollectionRDD [0]"))
       assert(stage0.contains("{\n      label=&quot;map&quot;;\n      " +
-        "1 [label=&quot;MapPartitionsRDD [1]&quot;];\n    }"))
+        "1 [label=&quot;MapPartitionsRDD [1]"))
       assert(stage0.contains("{\n      label=&quot;groupBy&quot;;\n      " +
-        "2 [label=&quot;MapPartitionsRDD [2]&quot;];\n    }"))
+        "2 [label=&quot;MapPartitionsRDD [2]"))
 
       val stage1 = Source.fromURL(sc.ui.get.appUIAddress +
         "/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString
       assert(stage1.contains("digraph G {\n  subgraph clusterstage_1 {\n    " +
         "label=&quot;Stage 1&quot;;\n    subgraph "))
       assert(stage1.contains("{\n      label=&quot;groupBy&quot;;\n      " +
-        "3 [label=&quot;ShuffledRDD [3]&quot;];\n    }"))
+        "3 [label=&quot;ShuffledRDD [3]"))
       assert(stage1.contains("{\n      label=&quot;map&quot;;\n      " +
-        "4 [label=&quot;MapPartitionsRDD [4]&quot;];\n    }"))
+        "4 [label=&quot;MapPartitionsRDD [4]"))
       assert(stage1.contains("{\n      label=&quot;groupBy&quot;;\n      " +
-        "5 [label=&quot;MapPartitionsRDD [5]&quot;];\n    }"))
+        "5 [label=&quot;MapPartitionsRDD [5]"))
 
       val stage2 = Source.fromURL(sc.ui.get.appUIAddress +
         "/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString
       assert(stage2.contains("digraph G {\n  subgraph clusterstage_2 {\n    " +
         "label=&quot;Stage 2&quot;;\n    subgraph "))
       assert(stage2.contains("{\n      label=&quot;groupBy&quot;;\n      " +
-        "6 [label=&quot;ShuffledRDD [6]&quot;];\n    }"))
+        "6 [label=&quot;ShuffledRDD [6]"))
     }
   }
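Note on the `UISeleniumSuite` change above: the assertions stop pinning the closing `&quot;];` suffix because `makeDotNode` now emits a two-line label whose second line is the callsite; the embedded newline is also why the `.tooltip-inner { white-space: pre-wrap }` CSS rule is needed. A stand-alone re-creation of the new label logic (mirrors the patch; the types are stand-ins for the `private[spark]`/`private[ui]` ones and the sample callsite is made up):

```scala
// Stand-ins for the classes patched above.
case class CallSite(shortForm: String, longForm: String)
case class RDDOperationNode(id: Int, name: String, cached: Boolean, callsite: CallSite)

// Same construction as the patched RDDOperationGraph.makeDotNode.
def makeDotNode(node: RDDOperationNode): String = {
  val label = s"${node.name} [${node.id}]\n${node.callsite.shortForm}"
  s"""${node.id} [label="$label"]"""
}

val node = RDDOperationNode(0, "ParallelCollectionRDD", cached = false,
  CallSite("parallelize at Demo.scala:7", ""))
println(makeDotNode(node))
// 0 [label="ParallelCollectionRDD [0]
// parallelize at Demo.scala:7"]
```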
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 953456c2caa895c61a8e9ad3674c69d208364c7c..3f94ef7041914b13cf8ecf711f668b5b7148d9ca 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -111,6 +111,7 @@ class JsonProtocolSuite extends SparkFunSuite {
   test("Dependent Classes") {
     val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
+    testCallsite(CallSite("happy", "birthday"))
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
     testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
     testTaskMetrics(makeTaskMetrics(
@@ -163,6 +164,10 @@
     testBlockId(StreamBlockId(1, 2L))
   }
 
+  /* ============================== *
+   | Backward compatibility tests |
+   * ============================== */
+
   test("ExceptionFailure backward compatibility") {
     val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace,
       null, None, None)
@@ -334,14 +339,17 @@
     assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
   }
 
-  test("RDDInfo backward compatibility (scope, parent IDs)") {
-    // Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties
-    val rddInfo = new RDDInfo(
-      1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8), Some(new RDDOperationScope("fable")))
+  test("RDDInfo backward compatibility (scope, parent IDs, callsite)") {
+    // "Scope" and "Parent IDs" were introduced in Spark 1.4.0
+    // "Callsite" was introduced in Spark 1.6.0
+    val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8),
+      CallSite("short", "long"), Some(new RDDOperationScope("fable")))
     val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
       .removeField({ _._1 == "Parent IDs"})
       .removeField({ _._1 == "Scope"})
-    val expectedRddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq.empty, scope = None)
+      .removeField({ _._1 == "Callsite"})
+    val expectedRddInfo = new RDDInfo(
+      1, "one", 100, StorageLevel.NONE, Seq.empty, CallSite.empty, scope = None)
     assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson))
   }
 
@@ -389,6 +397,11 @@
     assertEquals(info, newInfo)
   }
 
+  private def testCallsite(callsite: CallSite): Unit = {
+    val newCallsite = JsonProtocol.callsiteFromJson(JsonProtocol.callsiteToJson(callsite))
+    assert(callsite === newCallsite)
+  }
+
   private def testStageInfo(info: StageInfo) {
     val newInfo = JsonProtocol.stageInfoFromJson(JsonProtocol.stageInfoToJson(info))
     assertEquals(info, newInfo)
@@ -713,7 +726,8 @@
   }
 
   private def makeRddInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
-    val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, Seq(1, 4, 7))
+    val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK,
+      Seq(1, 4, 7), CallSite(a.toString, b.toString))
     r.numCachedPartitions = c
     r.memSize = d
     r.diskSize = e
@@ -856,6 +870,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |    {
       |      "RDD ID": 101,
       |      "Name": "mayor",
+      |      "Callsite": {"Short Form": "101", "Long Form": "201"},
       |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
@@ -1258,6 +1273,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |    {
       |      "RDD ID": 1,
       |      "Name": "mayor",
+      |      "Callsite": {"Short Form": "1", "Long Form": "200"},
       |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
@@ -1301,6 +1317,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |    {
       |      "RDD ID": 2,
       |      "Name": "mayor",
+      |      "Callsite": {"Short Form": "2", "Long Form": "400"},
       |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
@@ -1318,6 +1335,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |    {
       |      "RDD ID": 3,
       |      "Name": "mayor",
+      |      "Callsite": {"Short Form": "3", "Long Form": "401"},
       |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
@@ -1361,6 +1379,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |    {
       |      "RDD ID": 3,
       |      "Name": "mayor",
+      |      "Callsite": {"Short Form": "3", "Long Form": "600"},
       |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
@@ -1378,6 +1397,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |    {
       |      "RDD ID": 4,
       |      "Name": "mayor",
+      |      "Callsite": {"Short Form": "4", "Long Form": "601"},
       |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
@@ -1395,6 +1415,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       |    {
       |      "RDD ID": 5,
       |      "Name": "mayor",
+      |      "Callsite": {"Short Form": "5", "Long Form": "602"},
       |      "Parent IDs": [1, 4, 7],
       |      "Storage Level": {
       |        "Use Disk": true,
"Callsite": {"Short Form": "5", "Long Form": "602"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1438,6 +1459,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 4, | "Name": "mayor", + | "Callsite": {"Short Form": "4", "Long Form": "800"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1455,6 +1477,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 5, | "Name": "mayor", + | "Callsite": {"Short Form": "5", "Long Form": "801"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1472,6 +1495,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 6, | "Name": "mayor", + | "Callsite": {"Short Form": "6", "Long Form": "802"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1489,6 +1513,7 @@ class JsonProtocolSuite extends SparkFunSuite { | { | "RDD ID": 7, | "Name": "mayor", + | "Callsite": {"Short Form": "7", "Long Form": "803"}, | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true,