From c8fb776d4a0134c47f90272c4bd5e4bba902aae5 Mon Sep 17 00:00:00 2001
From: Sean Zhong <seanzhong@databricks.com>
Date: Wed, 1 Jun 2016 17:03:39 -0700
Subject: [PATCH] [SPARK-15692][SQL] Improves the explain output of several
 physical plans by displaying embedded logical plan in tree style

## What changes were proposed in this pull request?

Improves the explain output of several physical plans by displaying the embedded logical plan in tree style.

Some physical plans contain an embedded logical plan. For example, `cache tableName query` maps to:

```
case class CacheTableCommand(
    tableName: String,
    plan: Option[LogicalPlan],
    isLazy: Boolean)
  extends RunnableCommand
```

It is easier to read the explain output if we can display the `plan` in tree style.
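
The fix is to let such commands expose the embedded plan through `TreeNode.innerChildren`, so the generic tree-string rendering prints it as a nested subtree instead of flattening it into `argString`. A minimal sketch of the pattern (the actual overrides are in the diff below; imports omitted as in the snippet above):

```
case class CacheTableCommand(
    tableName: String,
    plan: Option[LogicalPlan],
    isLazy: Boolean)
  extends RunnableCommand {

  // Expose the embedded logical plan as an inner child so that the
  // explain output renders it as a nested tree.
  override protected def innerChildren: Seq[QueryPlan[_]] = plan.toSeq
}
```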

**Before change:**

Everything is crammed into one line.

```
scala> Seq((1,2)).toDF().createOrReplaceTempView("testView")
scala> spark.sql("cache table testView2 select * from testView").explain()
== Physical Plan ==
ExecutedCommand CacheTableCommand testView2, Some('Project [*]
+- 'UnresolvedRelation `testView`, None
), false
```

**After change:**

```
scala> spark.sql("cache table testView2 select * from testView").explain()
== Physical Plan ==
ExecutedCommand
:  +- CacheTableCommand testView2, false
:     :  +- 'Project [*]
:     :     +- 'UnresolvedRelation `testView`, None
```
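
Two pieces make this work (both shown in the diff below): `ExecutedCommandExec` now reports `cmd` via `innerChildren` instead of dumping `cmd.toString` as its `argString`, and `TreeNode.argString` skips any constructor argument that is also an inner child, including one wrapped in `Some`, so the `Option[LogicalPlan]` is no longer flattened into a single line. A simplified sketch of that filtering (the real method has more cases, and the `", "` separator is assumed from the existing code):

```
// Arguments that are also (inner) children are rendered as a nested
// subtree elsewhere, so drop them from the one-line argument string.
private lazy val allChildren: Set[TreeNode[_]] =
  (children ++ innerChildren).toSet[TreeNode[_]]

def argString: String = productIterator.flatMap {
  case tn: TreeNode[_] if allChildren.contains(tn) => Nil
  case Some(tn: TreeNode[_]) if allChildren.contains(tn) => Nil
  case other => other.toString :: Nil
}.mkString(", ")
```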

## How was this patch tested?

Manual test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13433 from clockfly/verbose_breakdown_3_2.
---
 .../apache/spark/sql/catalyst/plans/QueryPlan.scala    |  2 +-
 .../org/apache/spark/sql/catalyst/trees/TreeNode.scala | 10 +++++++---
 .../sql/execution/columnar/InMemoryTableScanExec.scala |  5 +++++
 .../org/apache/spark/sql/execution/command/cache.scala |  6 +++++-
 .../apache/spark/sql/execution/command/commands.scala  |  6 +++---
 .../sql/execution/command/createDataSourceTables.scala |  3 +++
 .../org/apache/spark/sql/execution/command/views.scala |  3 +++
 .../datasources/InsertIntoDataSourceCommand.scala      |  3 +++
 8 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index d4447ca32d..6784c3ae1d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -264,7 +264,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     expressions.flatMap(_.collect {case e: SubqueryExpression => e.plan.asInstanceOf[PlanType]})
   }
 
-  override def innerChildren: Seq[PlanType] = subqueries
+  override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
 
   /**
    * Canonicalized copy of this query plan.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index d87e6c76ed..3ebd815dce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -424,9 +424,12 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
    */
   protected def stringArgs: Iterator[Any] = productIterator
 
+  private lazy val allChildren: Set[TreeNode[_]] = (children ++ innerChildren).toSet[TreeNode[_]]
+
   /** Returns a string representing the arguments to this node, minus any children */
   def argString: String = productIterator.flatMap {
-    case tn: TreeNode[_] if containsChild(tn) => Nil
+    case tn: TreeNode[_] if allChildren.contains(tn) => Nil
+    case Some(tn: TreeNode[_]) if allChildren.contains(tn) => Nil
     case tn: TreeNode[_] => s"${tn.simpleString}" :: Nil
     case seq: Seq[BaseType] if seq.toSet.subsetOf(children.toSet) => Nil
     case seq: Seq[_] => seq.mkString("[", ",", "]") :: Nil
@@ -467,9 +470,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   }
 
   /**
-   * All the nodes that are parts of this node, this is used by subquries.
+   * All the nodes that should be shown as an inner nested tree of this node.
+   * For example, this can be used to show sub-queries.
    */
-  protected def innerChildren: Seq[BaseType] = Nil
+  protected def innerChildren: Seq[TreeNode[_]] = Seq.empty
 
   /**
    * Appends the string represent of this node and its children to the given StringBuilder.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index ba61940b3d..7ccc9de9db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -70,6 +71,8 @@ private[sql] case class InMemoryRelation(
     private[sql] var _batchStats: ListAccumulator[InternalRow] = null)
   extends logical.LeafNode with MultiInstanceRelation {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+
   override def producedAttributes: AttributeSet = outputSet
 
   private[sql] val batchStats: ListAccumulator[InternalRow] =
@@ -222,6 +225,8 @@ private[sql] case class InMemoryTableScanExec(
     @transient relation: InMemoryRelation)
   extends LeafExecNode {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
+
   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index b1290a4759..3e5eed2efa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -19,15 +19,19 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
-
 case class CacheTableCommand(
   tableName: String,
   plan: Option[LogicalPlan],
   isLazy: Boolean)
   extends RunnableCommand {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = {
+    plan.toSeq
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     plan.foreach { logicalPlan =>
       Dataset.ofRows(sparkSession, logicalPlan).createOrReplaceTempView(tableName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 642a95a992..38bb6e412f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
@@ -57,6 +58,8 @@ private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkP
     cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
   }
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil
+
   override def output: Seq[Attribute] = cmd.output
 
   override def children: Seq[SparkPlan] = Nil
@@ -68,11 +71,8 @@ private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkP
   protected override def doExecute(): RDD[InternalRow] = {
     sqlContext.sparkContext.parallelize(sideEffectResult, 1)
   }
-
-  override def argString: String = cmd.toString
 }
 
-
 /**
  * An explain command for users to see how a command will be executed.
  *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 9956c5b092..66753fa7f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.HiveSerDe
@@ -138,6 +139,8 @@ case class CreateDataSourceTableAsSelectCommand(
     query: LogicalPlan)
   extends RunnableCommand {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     // Since we are saving metadata to metastore, we need to check if metastore supports
     // the table name and database name we have for this query. MetaStoreUtils.validateName
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 20c02786ec..b56c200e9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 
 
@@ -50,6 +51,8 @@ case class CreateViewCommand(
     isTemporary: Boolean)
   extends RunnableCommand {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+
   // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is
   // different from Hive and may not work for some cases like create view on self join.
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index 25b901f2db..8549ae96e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.sources.InsertableRelation
@@ -32,6 +33,8 @@ private[sql] case class InsertIntoDataSourceCommand(
     overwrite: Boolean)
   extends RunnableCommand {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
     val data = Dataset.ofRows(sparkSession, query)
-- 
GitLab