diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
index 83dbea40b63f32a264a722fb0a53e8c900f7c88a..4337c42087e79986377d189e73afdc4ada2029f9 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -284,7 +284,7 @@ function renderDot(dot, container, forJob) {
   renderer(container, g);
 
   // Find the stage cluster and mark it for styling and post-processing
-  container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true);
+  container.selectAll("g.cluster[name^=\"Stage \"]").classed("stage", true);
 }
 
 /* -------------------- *
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 06da74f1b6b5f1539c204a14d875c45a7fc65007..003c218aada9c6b429271239f5a47a986cdf8b2b 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -130,7 +130,11 @@ private[ui] object RDDOperationGraph extends Logging {
           }
         }
         // Attach the outermost cluster to the root cluster, and the RDD to the innermost cluster
-        rddClusters.headOption.foreach { cluster => rootCluster.attachChildCluster(cluster) }
+        rddClusters.headOption.foreach { cluster =>
+          if (!rootCluster.childClusters.contains(cluster)) {
+            rootCluster.attachChildCluster(cluster)
+          }
+        }
         rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
       }
     }
diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
index ddd3a91dd8ef80dc1553fadec1a8bb39d1a69111..303f8ebb8814c4f92817736eaeda8f16e494db14 100644
--- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
+++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
@@ -20,6 +20,12 @@
   text-shadow: none;
 }
 
+#plan-viz-graph svg g.cluster rect {
+  fill: #A0DFFF;
+  stroke: #3EC0FF;
+  stroke-width: 1px;
+}
+
 #plan-viz-graph svg g.node rect {
   fill: #C3EBFF;
   stroke: #3EC0FF;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 4f750ad13ab844896857df611fd50f4a5a811b0a..4dd9928244197eafdf2a9d5b332dbc7131243ed6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -36,12 +36,17 @@ class SparkPlanInfo(
 private[sql] object SparkPlanInfo {
 
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
+    val children = plan match {
+      case WholeStageCodegen(child, _) => child :: Nil
+      case InputAdapter(child) => child :: Nil
+      case plan => plan.children
+    }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
         Utils.getFormattedClassName(metric.param))
     }
-    val children = plan.children.map(fromSparkPlan)
 
-    new SparkPlanInfo(plan.nodeName, plan.simpleString, children, plan.metadata, metrics)
+    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
+      plan.metadata, metrics)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index c74ad40406992a9b7967ced6b5f70ee2ccaaeb09..49915adf6cd29c8a264134c0fe58803f8781e6b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -99,7 +99,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution")
   }
 
   private def planVisualization(metrics: Map[Long, String], graph: SparkPlanGraph): Seq[Node] = {
-    val metadata = graph.nodes.flatMap { node =>
+    val metadata = graph.allNodes.flatMap { node =>
       val nodeId = s"plan-meta-data-${node.id}"
       <div id={nodeId}>{node.desc}</div>
     }
@@ -110,7 +110,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution")
         <div class="dot-file">
           {graph.makeDotFile(metrics)}
         </div>
-        <div id="plan-viz-metadata-size">{graph.nodes.size.toString}</div>
+        <div id="plan-viz-metadata-size">{graph.allNodes.size.toString}</div>
         {metadata}
       </div>
       {planVisualizationResources}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index cd56136927088108c367c709f35f9e0e93773a14..83c64f755f90fbb4ce39599a6f94e97cd2673dbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -219,7 +219,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
     case SparkListenerSQLExecutionStart(executionId, description, details,
       physicalPlanDescription, sparkPlanInfo, time) =>
       val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
-      val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
+      val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node =>
         node.metrics.map(metric => metric.accumulatorId -> metric)
       }
       val executionUIData = new SQLExecutionUIData(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 3a6eff93998250c31d72d5068600e2c97e296be9..4eb248569b2810d8ee2f1893d76361b5b5643ef9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.{InputAdapter, SparkPlanInfo, WholeStageCodegen}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
@@ -41,6 +41,16 @@ private[ui] case class SparkPlanGraph(
     dotFile.append("}")
     dotFile.toString()
   }
+
+  /**
+   * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen.
+   */
+  val allNodes: Seq[SparkPlanGraphNode] = {
+    nodes.flatMap {
+      case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster
+      case node => Seq(node)
+    }
+  }
 }
 
 private[sql] object SparkPlanGraph {
@@ -52,7 +62,7 @@ private[sql] object SparkPlanGraph {
     val nodeIdGenerator = new AtomicLong(0)
     val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
     val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
-    buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges)
+    buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, null, null)
     new SparkPlanGraph(nodes, edges)
   }
 
@@ -60,22 +70,40 @@ private[sql] object SparkPlanGraph {
       planInfo: SparkPlanInfo,
       nodeIdGenerator: AtomicLong,
       nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
-      edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
-    val metrics = planInfo.metrics.map { metric =>
-      SQLPlanMetric(metric.name, metric.accumulatorId,
-        SQLMetrics.getMetricParam(metric.metricParam))
+      edges: mutable.ArrayBuffer[SparkPlanGraphEdge],
+      parent: SparkPlanGraphNode,
+      subgraph: SparkPlanGraphCluster): Unit = {
+    if (planInfo.nodeName == classOf[WholeStageCodegen].getSimpleName) {
+      val cluster = new SparkPlanGraphCluster(
+        nodeIdGenerator.getAndIncrement(),
+        planInfo.nodeName,
+        planInfo.simpleString,
+        mutable.ArrayBuffer[SparkPlanGraphNode]())
+      nodes += cluster
+      buildSparkPlanGraphNode(
+        planInfo.children.head, nodeIdGenerator, nodes, edges, parent, cluster)
+    } else if (planInfo.nodeName == classOf[InputAdapter].getSimpleName) {
+      buildSparkPlanGraphNode(planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null)
+    } else {
+      val metrics = planInfo.metrics.map { metric =>
+        SQLPlanMetric(metric.name, metric.accumulatorId,
+          SQLMetrics.getMetricParam(metric.metricParam))
+      }
+      val node = new SparkPlanGraphNode(
+        nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
+        planInfo.simpleString, planInfo.metadata, metrics)
+      if (subgraph == null) {
+        nodes += node
+      } else {
+        subgraph.nodes += node
+      }
+
+      if (parent != null) {
+        edges += SparkPlanGraphEdge(node.id, parent.id)
+      }
+      planInfo.children.foreach(
+        buildSparkPlanGraphNode(_, nodeIdGenerator, nodes, edges, node, subgraph))
     }
-    val node = SparkPlanGraphNode(
-      nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
-      planInfo.simpleString, planInfo.metadata, metrics)
-
-    nodes += node
-    val childrenNodes = planInfo.children.map(
-      child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
-    for (child <- childrenNodes) {
-      edges += SparkPlanGraphEdge(child.id, node.id)
-    }
-    node
   }
 }
 
@@ -86,12 +114,12 @@ private[sql] object SparkPlanGraph {
  * @param name the name of this SparkPlan node
  * @param metrics metrics that this SparkPlan node will track
 */
-private[ui] case class SparkPlanGraphNode(
-    id: Long,
-    name: String,
-    desc: String,
-    metadata: Map[String, String],
-    metrics: Seq[SQLPlanMetric]) {
+private[ui] class SparkPlanGraphNode(
+    val id: Long,
+    val name: String,
+    val desc: String,
+    val metadata: Map[String, String],
+    val metrics: Seq[SQLPlanMetric]) {
 
   def makeDotNode(metricsValue: Map[Long, String]): String = {
     val builder = new mutable.StringBuilder(name)
@@ -117,6 +145,27 @@ private[ui] case class SparkPlanGraphNode(
   }
 }
 
+/**
+ * Represent a tree of SparkPlan for WholeStageCodegen.
+ */
+private[ui] class SparkPlanGraphCluster(
+    id: Long,
+    name: String,
+    desc: String,
+    val nodes: mutable.ArrayBuffer[SparkPlanGraphNode])
+  extends SparkPlanGraphNode(id, name, desc, Map.empty, Nil) {
+
+  override def makeDotNode(metricsValue: Map[Long, String]): String = {
+    s"""
+      | subgraph cluster${id} {
+      |   label=${name};
+      |   ${nodes.map(_.makeDotNode(metricsValue)).mkString(" \n")}
+      | }
+    """.stripMargin
+  }
+}
+
+
 /**
  * Represent an edge in the SparkPlan tree. `fromId` is the parent node id, and `toId` is the child
  * node id.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 51285431a47ed03fa8354f0674593381a8d95080..cbae19ebd269d15cc8ae698a404a69d6ea5ac0ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -86,7 +86,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       // If we can track all jobs, check the metric values
       val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
       val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
-        df.queryExecution.executedPlan)).nodes.filter { node =>
+        df.queryExecution.executedPlan)).allNodes.filter { node =>
         expectedMetrics.contains(node.id)
       }.map { node =>
         val nodeMetrics = node.metrics.map { metric =>
@@ -134,6 +134,14 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     )
   }
 
+  test("WholeStageCodegen metrics") {
+    // Assume the execution plan is
+    // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
+    // TODO: update metrics in generated operators
+    val df = sqlContext.range(10).filter('id < 5)
+    testSparkPlanMetrics(df, 1, Map.empty)
+  }
+
   test("TungstenAggregate metrics") {
     // Assume the execution plan is
     // ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index eef3c1f3e34d9c34ba6c35c6668ab2aff5aaa340..81a159d542c67c10ab62eceecf4cf680af7661b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -83,7 +83,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     val df = createTestDataFrame
     val accumulatorIds =
      SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
-        .nodes.flatMap(_.metrics.map(_.accumulatorId))
+        .allNodes.flatMap(_.metrics.map(_.accumulatorId))
     // Assume all accumulators are long
     var accumulatorValue = 0L
    val accumulatorUpdates = accumulatorIds.map { id =>
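As a rough, self-contained illustration of the two ideas the SparkPlanGraph.scala changes introduce — flattening a cluster's contents the way the new allNodes field does, and rendering a WholeStageCodegen cluster as a DOT subgraph the way SparkPlanGraphCluster.makeDotNode does — here is a minimal Scala sketch. Node, Cluster, and DotSketch are simplified stand-ins rather than the real SparkPlanGraphNode/SparkPlanGraphCluster API, and the plan shape mirrors the Range -> Filter example in the new "WholeStageCodegen metrics" test.

// Simplified stand-ins (hypothetical, not the actual Spark classes) for
// SparkPlanGraphNode and SparkPlanGraphCluster.
import scala.collection.mutable

class Node(val id: Long, val name: String) {
  // A plain operator becomes a single DOT node.
  def makeDotNode(): String = s"""  $id [label="$name"];"""
}

class Cluster(id: Long, name: String, val nodes: mutable.ArrayBuffer[Node])
  extends Node(id, name) {
  // Mirrors the idea of SparkPlanGraphCluster.makeDotNode: wrap the generated
  // operators in a named "subgraph cluster..." so the renderer draws a box around them.
  override def makeDotNode(): String = {
    s"""  subgraph cluster$id {
       |    label="$name";
       |${nodes.map(_.makeDotNode()).mkString("\n")}
       |  }""".stripMargin
  }
}

object DotSketch {
  def main(args: Array[String]): Unit = {
    // Plan shape from the new test: WholeStageCodegen(Range -> Filter).
    val filter = new Node(1, "Filter")
    val range = new Node(2, "Range")
    val codegen = new Cluster(0, "WholeStageCodegen", mutable.ArrayBuffer(range, filter))

    // The cluster hides its children from the top-level node list ...
    val topLevel: Seq[Node] = Seq(codegen)
    // ... so consumers that look nodes up by id need the flattened view,
    // which is what SparkPlanGraph.allNodes provides.
    val allNodes: Seq[Node] = topLevel.flatMap {
      case c: Cluster => c.nodes :+ c
      case n => Seq(n)
    }
    println(allNodes.map(_.name).mkString(", ")) // Range, Filter, WholeStageCodegen

    // Child -> parent edge, matching SparkPlanGraphEdge(node.id, parent.id).
    val dot = s"""digraph G {
                 |${codegen.makeDotNode()}
                 |  2 -> 1;
                 |}""".stripMargin
    println(dot)
  }
}

Because operators placed inside a WholeStageCodegen cluster no longer appear in the graph's top-level nodes sequence, every consumer that resolves metrics or metadata by node id (ExecutionPage, SQLListener, and the two test suites above) switches from graph.nodes to graph.allNodes.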