From 45df77b8418873a00d770e435358bf603765595f Mon Sep 17 00:00:00 2001
From: Cheng Hao <hao.cheng@intel.com>
Date: Tue, 10 Feb 2015 19:40:51 -0800
Subject: [PATCH] [SPARK-5709] [SQL] Add EXPLAIN support in DataFrame API for
 debugging purpose

Author: Cheng Hao <hao.cheng@intel.com>

Closes #4496 from chenghao-intel/df_explain and squashes the following commits:

552aa58 [Cheng Hao] Add explain support for DF
---
 .../main/scala/org/apache/spark/sql/Column.scala    |  8 ++++++++
 .../main/scala/org/apache/spark/sql/DataFrame.scala |  6 ++++++
 .../scala/org/apache/spark/sql/DataFrameImpl.scala  | 13 ++++++++++---
 .../org/apache/spark/sql/execution/commands.scala   |  7 +++++--
 .../scala/org/apache/spark/sql/hive/HiveQl.scala    |  8 +++-----
 5 files changed, 32 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 1011bf0bb5..b0e95908ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -600,6 +600,14 @@ trait Column extends DataFrame {
   def desc: Column = exprToColumn(SortOrder(expr, Descending), computable = false)
 
   def asc: Column = exprToColumn(SortOrder(expr, Ascending), computable = false)
+
+  override def explain(extended: Boolean): Unit = {
+    if (extended) {
+      println(expr)
+    } else {
+      println(expr.prettyString)
+    }
+  }
 }
 
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index ca8d552c5f..17900c5ee3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -124,6 +124,12 @@ trait DataFrame extends RDDApi[Row] {
   /** Prints the schema to the console in a nice tree format. */
   def printSchema(): Unit
 
+  /** Prints the plans (logical and physical) to the console for debugging purpose. */
+  def explain(extended: Boolean): Unit
+
+  /** Only prints the physical plan to the console for debugging purpose. */
+  def explain(): Unit = explain(false)
+
   /**
    * Returns true if the `collect` and `take` methods can be run locally
    * (without any Spark executors).
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 0134b038f3..9638ce0865 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -30,12 +30,11 @@ import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.sql.catalyst.{SqlParser, ScalaReflection}
-import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, ResolvedStar, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedStar, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.sideBySide
-import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
+import org.apache.spark.sql.execution.{ExplainCommand, LogicalRDD, EvaluatePython}
 import org.apache.spark.sql.json.JsonRDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{NumericType, StructType}
@@ -115,6 +114,14 @@ private[sql] class DataFrameImpl protected[sql](
 
   override def printSchema(): Unit = println(schema.treeString)
 
+  override def explain(extended: Boolean): Unit = {
+    ExplainCommand(
+      logicalPlan,
+      extended = extended).queryExecution.executedPlan.executeCollect().map {
+      r => println(r.getString(0))
+    }
+  }
+
   override def isLocal: Boolean = {
     logicalPlan.isInstanceOf[LocalRelation]
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 335757087d..2b1726ad4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.execution
 import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import scala.collection.mutable.ArrayBuffer
@@ -116,7 +117,9 @@ case class SetCommand(
 @DeveloperApi
 case class ExplainCommand(
     logicalPlan: LogicalPlan,
-    override val output: Seq[Attribute], extended: Boolean = false) extends RunnableCommand {
+    override val output: Seq[Attribute] =
+      Seq(AttributeReference("plan", StringType, nullable = false)()),
+    extended: Boolean = false) extends RunnableCommand {
 
   // Run through the optimizer to generate the physical plan.
   override def run(sqlContext: SQLContext) = try {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 8618301ba8..f3c9e63652 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -466,23 +466,21 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
     // Just fake explain for any of the native commands.
     case Token("TOK_EXPLAIN", explainArgs)
       if noExplainCommands.contains(explainArgs.head.getText) =>
-      ExplainCommand(NoRelation, Seq(AttributeReference("plan", StringType, nullable = false)()))
+      ExplainCommand(NoRelation)
     case Token("TOK_EXPLAIN", explainArgs)
       if "TOK_CREATETABLE" == explainArgs.head.getText =>
       val Some(crtTbl) :: _ :: extended :: Nil =
         getClauses(Seq("TOK_CREATETABLE", "FORMATTED", "EXTENDED"), explainArgs)
       ExplainCommand(
         nodeToPlan(crtTbl),
-        Seq(AttributeReference("plan", StringType,nullable = false)()),
-        extended != None)
+        extended = extended.isDefined)
     case Token("TOK_EXPLAIN", explainArgs) =>
       // Ignore FORMATTED if present.
       val Some(query) :: _ :: extended :: Nil =
         getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs)
       ExplainCommand(
         nodeToPlan(query),
-        Seq(AttributeReference("plan", StringType, nullable = false)()),
-        extended != None)
+        extended = extended.isDefined)
 
     case Token("TOK_DESCTABLE", describeArgs) =>
       // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
-- 
GitLab