Commit c8fb776d authored by Sean Zhong, committed by Wenchen Fan


[SPARK-15692][SQL] Improves the explain output of several physical plans by displaying embedded logical plan in tree style

## What changes were proposed in this pull request?

This PR improves the explain output of several physical plans by displaying the embedded logical plan in tree style.

Some physical plans contain an embedded logical plan. For example, `cache tableName query` maps to:

```
case class CacheTableCommand(
    tableName: String,
    plan: Option[LogicalPlan],
    isLazy: Boolean)
  extends RunnableCommand
```

The explain output is much easier to read if we display the embedded `plan` in tree style.
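
The fix applied in the diff below is to surface such embedded plans through `TreeNode.innerChildren`, which the tree renderer draws as an indented nested subtree. For `CacheTableCommand` the change is essentially one line:

```scala
// Excerpt from the CacheTableCommand change in the diff below: expose the
// optional embedded plan as an inner child of the command node, so that
// explain() renders it as a nested tree instead of inlining plan.toString.
override protected def innerChildren: Seq[QueryPlan[_]] = plan.toSeq
```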

**Before change:**

Everything is crammed onto a single line.

```
scala> Seq((1,2)).toDF().createOrReplaceTempView("testView")
scala> spark.sql("cache table testView2 select * from testView").explain()
== Physical Plan ==
ExecutedCommand CacheTableCommand testView2, Some('Project [*]
+- 'UnresolvedRelation `testView`, None
), false
```

**After change:**

The embedded command and its logical plan are now rendered as nested subtrees:

```
scala> spark.sql("cache table testView2 select * from testView").explain()
== Physical Plan ==
ExecutedCommand
:  +- CacheTableCommand testView2, false
:     :  +- 'Project [*]
:     :     +- 'UnresolvedRelation `testView`, None
```
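
Reading the new output: each `:` guide marks an inner child, i.e. a tree embedded inside the node above it rather than one of its regular physical children. `ExecutedCommand` nests the command via `innerChildren = cmd :: Nil`, and `CacheTableCommand` in turn nests its unresolved query via `plan.toSeq`, which is where the two levels of `:` guides come from. Below is a minimal, self-contained sketch of this rendering idea; `MiniNode` and its prefix handling are illustrative inventions, not Spark's actual `TreeNode`/`generateTreeString`, which are considerably more involved:

```scala
// Sketch: regular children are drawn with "+-" branches, while innerChildren
// get an extra ":" guide so the embedded tree is visibly nested inside its
// owner. MiniNode is a hypothetical stand-in for Spark's TreeNode.
object TreeStyleDemo extends App {
  case class MiniNode(
      name: String,
      children: Seq[MiniNode] = Nil,
      innerChildren: Seq[MiniNode] = Nil) {

    def treeString: String = {
      val sb = new StringBuilder
      write(sb, head = "", body = "")
      sb.toString
    }

    // `head` prefixes this node's own line; `body` prefixes its subtree.
    private def write(sb: StringBuilder, head: String, body: String): Unit = {
      sb.append(head).append(name).append('\n')
      innerChildren.foreach(_.write(sb, body + ":  +- ", body + ":     "))
      children.foreach(_.write(sb, body + "+- ", body + "   "))
    }
  }

  val plan =
    MiniNode("ExecutedCommand", innerChildren = Seq(
      MiniNode("CacheTableCommand testView2, false", innerChildren = Seq(
        MiniNode("'Project [*]", children = Seq(
          MiniNode("'UnresolvedRelation `testView`, None")))))))

  print(plan.treeString)
  // ExecutedCommand
  // :  +- CacheTableCommand testView2, false
  // :     :  +- 'Project [*]
  // :     :     +- 'UnresolvedRelation `testView`, None
}
```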

## How was this patch tested?

Manual test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13433 from clockfly/verbose_breakdown_3_2.
parent 8640cdb8
Showing with 30 additions and 8 deletions
```
@@ -264,7 +264,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
     expressions.flatMap(_.collect {case e: SubqueryExpression => e.plan.asInstanceOf[PlanType]})
   }
 
-  override def innerChildren: Seq[PlanType] = subqueries
+  override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
 
   /**
    * Canonicalized copy of this query plan.
```
```
@@ -424,9 +424,12 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
    */
   protected def stringArgs: Iterator[Any] = productIterator
 
+  private lazy val allChildren: Set[TreeNode[_]] = (children ++ innerChildren).toSet[TreeNode[_]]
+
   /** Returns a string representing the arguments to this node, minus any children */
   def argString: String = productIterator.flatMap {
-    case tn: TreeNode[_] if containsChild(tn) => Nil
+    case tn: TreeNode[_] if allChildren.contains(tn) => Nil
+    case Some(tn: TreeNode[_]) if allChildren.contains(tn) => Nil
     case tn: TreeNode[_] => s"${tn.simpleString}" :: Nil
     case seq: Seq[BaseType] if seq.toSet.subsetOf(children.toSet) => Nil
     case seq: Seq[_] => seq.mkString("[", ",", "]") :: Nil
@@ -467,9 +470,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   }
 
   /**
-   * All the nodes that are parts of this node, this is used by subquries.
+   * All the nodes that should be shown as an inner nested tree of this node.
+   * For example, this can be used to show sub-queries.
    */
-  protected def innerChildren: Seq[BaseType] = Nil
+  protected def innerChildren: Seq[TreeNode[_]] = Seq.empty
 
   /**
    * Appends the string representation of this node and its children to the given StringBuilder.
```
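
Without the new `case Some(tn: TreeNode[_])` branch, an `Option`-wrapped inner child (such as `CacheTableCommand.plan`) would fall through to the default handling and be `toString`-ed into the argument string, which is exactly what produced the one-line output shown earlier. Here is a rough, self-contained illustration of the filtering idea; `FakeCommand` and its simplified types are hypothetical, not Spark's actual `TreeNode`:

```scala
// Hypothetical stand-in showing why constructor arguments that already
// appear as (inner) children must be skipped when building the one-line
// argument string, including arguments wrapped in Option.
case class FakeCommand(tableName: String, plan: Option[String], isLazy: Boolean) {
  // Plays the role of `(children ++ innerChildren).toSet`.
  private val allChildren: Set[Any] = plan.toSet

  def argString: String = productIterator.flatMap {
    case Some(p) if allChildren.contains(p) => Nil // rendered as a subtree instead
    case arg => arg.toString :: Nil
  }.mkString(", ")
}

// FakeCommand("testView2", Some("'Project [*] ..."), isLazy = false).argString
// returns "testView2, false": the embedded plan no longer leaks into the line.
```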
```
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -70,6 +71,8 @@ private[sql] case class InMemoryRelation(
     private[sql] var _batchStats: ListAccumulator[InternalRow] = null)
   extends logical.LeafNode with MultiInstanceRelation {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+
   override def producedAttributes: AttributeSet = outputSet
 
   private[sql] val batchStats: ListAccumulator[InternalRow] =
@@ -222,6 +225,8 @@ private[sql] case class InMemoryTableScanExec(
     @transient relation: InMemoryRelation)
   extends LeafExecNode {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
+
   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
```
```
@@ -19,15 +19,19 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 case class CacheTableCommand(
     tableName: String,
     plan: Option[LogicalPlan],
     isLazy: Boolean)
   extends RunnableCommand {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = {
+    plan.toSeq
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     plan.foreach { logicalPlan =>
       Dataset.ofRows(sparkSession, logicalPlan).createOrReplaceTempView(tableName)
```
```
@@ -22,6 +22,7 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
@@ -57,6 +58,8 @@ private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
     cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
   }
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil
+
   override def output: Seq[Attribute] = cmd.output
 
   override def children: Seq[SparkPlan] = Nil
@@ -68,11 +71,8 @@ private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
   protected override def doExecute(): RDD[InternalRow] = {
     sqlContext.sparkContext.parallelize(sideEffectResult, 1)
   }
-
-  override def argString: String = cmd.toString
 }
 
 /**
  * An explain command for users to see how a command will be executed.
  *
```
```
@@ -27,6 +27,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.HiveSerDe
@@ -138,6 +139,8 @@ case class CreateDataSourceTableAsSelectCommand(
     query: LogicalPlan)
   extends RunnableCommand {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     // Since we are saving metadata to metastore, we need to check if metastore supports
     // the table name and database name we have for this query. MetaStoreUtils.validateName
```
```
@@ -23,6 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -50,6 +51,8 @@ case class CreateViewCommand(
     isTemporary: Boolean)
   extends RunnableCommand {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+
   // TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is
   // different from Hive and may not work for some cases like create view on self join.
```
```
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.sources.InsertableRelation
@@ -32,6 +33,8 @@ private[sql] case class InsertIntoDataSourceCommand(
     overwrite: Boolean)
   extends RunnableCommand {
 
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
     val data = Dataset.ofRows(sparkSession, query)
```