diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 1011bf0bb5ef4acc56dad6a3a9a6c3dc7f14735c..b0e95908ee71a570b838e92dc50faeb2bc80609e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -600,6 +600,14 @@ trait Column extends DataFrame {
   def desc: Column = exprToColumn(SortOrder(expr, Descending), computable = false)
 
   def asc: Column = exprToColumn(SortOrder(expr, Ascending), computable = false)
+
+  override def explain(extended: Boolean): Unit = {
+    if (extended) {
+      println(expr)              // verbose form (keeps expression IDs)
+    } else {
+      println(expr.prettyString) // user-facing form, expression IDs stripped
+    }
+  }
 }
 
 
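The new `Column.explain` mirrors the `DataFrame` variant at expression granularity. A minimal usage sketch, assuming a Spark 1.3-era frame `df` with an integer column `id` (both names are illustrative):

```scala
val col = df("id") + 1

col.explain(true)   // prints the raw expression via expr.toString
col.explain(false)  // prints the compact expr.prettyString form
```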
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index ca8d552c5febfb8496960ae0976fe4baeddb65fb..17900c5ee38920b62c2282bda8f5677493ac7187 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -124,6 +124,12 @@ trait DataFrame extends RDDApi[Row] {
   /** Prints the schema to the console in a nice tree format. */
   def printSchema(): Unit
 
+  /** Prints the logical and physical plans to the console for debugging purposes. */
+  def explain(extended: Boolean): Unit
+
+  /** Prints only the physical plan to the console for debugging purposes. */
+  def explain(): Unit = explain(false)
+
   /**
    * Returns true if the `collect` and `take` methods can be run locally
    * (without any Spark executors).
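A sketch of the user-facing API added here, assuming a `SQLContext` named `sqlContext` and an illustrative registered table:

```scala
val df = sqlContext.sql("SELECT * FROM people WHERE age > 21")

df.explain()      // physical plan only, equivalent to explain(false)
df.explain(true)  // extended output covering the logical plans as well
```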
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 0134b038f3c5aaf48f9ba0753dd2b2fddf049026..9638ce0865db0fdd34f14ebab342510eb84ee249 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -30,12 +30,11 @@ import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.sql.catalyst.{SqlParser, ScalaReflection}
-import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, ResolvedStar, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedStar, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.sideBySide
-import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
+import org.apache.spark.sql.execution.{ExplainCommand, LogicalRDD, EvaluatePython}
 import org.apache.spark.sql.json.JsonRDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{NumericType, StructType}
@@ -115,6 +114,14 @@ private[sql] class DataFrameImpl protected[sql](
 
   override def printSchema(): Unit = println(schema.treeString)
 
+  override def explain(extended: Boolean): Unit = {
+    ExplainCommand(
+      logicalPlan,
+      extended = extended).queryExecution.executedPlan.executeCollect().foreach {
+      r => println(r.getString(0))  // each row carries a single StringType "plan" column
+    }
+  }
+
   override def isLocal: Boolean = {
     logicalPlan.isInstanceOf[LocalRelation]
   }
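The implementation delegates to `ExplainCommand` rather than formatting plans itself, so `DataFrame.explain` and SQL `EXPLAIN` share one code path. A rough sketch of how the collected rows are consumed, with hypothetical stand-ins for `executeCollect()` output (the plan strings below are placeholders, and `Row.apply` is assumed):

```scala
import org.apache.spark.sql.catalyst.expressions.Row

// One Row per plan, each holding a single StringType "plan" column.
val rows: Seq[Row] = Seq(Row("<plan line 1>"), Row("<plan line 2>"))
rows.foreach(r => println(r.getString(0)))
```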
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 335757087deef4fadabb793cafc17bbab9a7a403..2b1726ad4e89f2b0a0162b4c48b638c9bb4682b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.execution
 import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import scala.collection.mutable.ArrayBuffer
@@ -116,7 +117,9 @@ case class SetCommand(
 @DeveloperApi
 case class ExplainCommand(
     logicalPlan: LogicalPlan,
-    override val output: Seq[Attribute], extended: Boolean = false) extends RunnableCommand {
+    override val output: Seq[Attribute] =
+      Seq(AttributeReference("plan", StringType, nullable = false)()),
+    extended: Boolean = false) extends RunnableCommand {
 
   // Run through the optimizer to generate the physical plan.
   override def run(sqlContext: SQLContext) = try {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 8618301ba84d62af8b3ccac3c2531bc7d9ed02aa..f3c9e63652a8e17dff4e9e897c350f9c936f5e71 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -466,23 +466,21 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
     // Just fake explain for any of the native commands.
     case Token("TOK_EXPLAIN", explainArgs)
       if noExplainCommands.contains(explainArgs.head.getText) =>
-      ExplainCommand(NoRelation, Seq(AttributeReference("plan", StringType, nullable = false)()))
+      ExplainCommand(NoRelation)
     case Token("TOK_EXPLAIN", explainArgs)
       if "TOK_CREATETABLE" == explainArgs.head.getText =>
       val Some(crtTbl) :: _ :: extended :: Nil =
         getClauses(Seq("TOK_CREATETABLE", "FORMATTED", "EXTENDED"), explainArgs)
       ExplainCommand(
         nodeToPlan(crtTbl),
-        Seq(AttributeReference("plan", StringType,nullable = false)()),
-        extended != None)
+        extended = extended.isDefined)
     case Token("TOK_EXPLAIN", explainArgs) =>
       // Ignore FORMATTED if present.
       val Some(query) :: _ :: extended :: Nil =
         getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs)
       ExplainCommand(
         nodeToPlan(query),
-        Seq(AttributeReference("plan", StringType, nullable = false)()),
-        extended != None)
+        extended = extended.isDefined)
 
     case Token("TOK_DESCTABLE", describeArgs) =>
       // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
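The Hive dialect keeps the same SQL surface; only the construction of `ExplainCommand` is simplified. A sketch against a `HiveContext` named `hiveContext`, assuming the standard `src` test table exists:

```scala
hiveContext.sql("EXPLAIN SELECT key FROM src").collect().foreach(println)
// FORMATTED is parsed but ignored; EXTENDED flips the `extended` flag:
hiveContext.sql("EXPLAIN EXTENDED SELECT key FROM src").collect().foreach(println)
```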