From b3e86dc62476abb03b330f86a788aa19a6565317 Mon Sep 17 00:00:00 2001
From: scwf <wangfei1@huawei.com>
Date: Sat, 10 Jan 2015 14:08:04 -0800
Subject: [PATCH] [SPARK-4861][SQL] Refactory command in spark sql

Follow up for #3712.
This PR finally remove ```CommandStrategy``` and make all commands follow ```RunnableCommand``` so they can go with ```case r: RunnableCommand => ExecutedCommand(r) :: Nil```.

One exception is the ```DescribeCommand``` of hive, which is a special case and need to distinguish hive table and temporary table, so still keep ```HiveCommandStrategy``` here.

Author: scwf <wangfei1@huawei.com>

Closes #3948 from scwf/followup-SPARK-4861 and squashes the following commits:

6b48e64 [scwf] minor style fix
2c62e9d [scwf] fix for hive module
5a7a819 [scwf] Refactory command in spark sql
---
 ...ser.scala => AbstractSparkSQLParser.scala} | 69 -------------
 .../sql/catalyst/plans/logical/commands.scala | 48 +--------
 .../org/apache/spark/sql/SQLContext.scala     |  3 +-
 .../org/apache/spark/sql/SparkSQLParser.scala | 97 +++++++++++++++++++
 .../spark/sql/execution/SparkStrategies.scala | 20 +---
 .../apache/spark/sql/execution/commands.scala |  2 +-
 .../spark/sql/hive/thriftserver/Shim12.scala  |  4 +-
 .../spark/sql/hive/thriftserver/Shim13.scala  |  4 +-
 .../apache/spark/sql/hive/HiveContext.scala   |  4 +-
 .../org/apache/spark/sql/hive/HiveQl.scala    | 31 +++++-
 .../spark/sql/hive/HiveStrategies.scala       |  5 +-
 .../org/apache/spark/sql/hive/TestHive.scala  |  3 +-
 .../hive/execution/HiveComparisonTest.scala   |  2 +
 13 files changed, 141 insertions(+), 151 deletions(-)
 rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/{SparkSQLParser.scala => AbstractSparkSQLParser.scala} (62%)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
similarity index 62%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
index f1a1ca6616..93d74adbcc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
@@ -105,72 +105,3 @@ class SqlLexical(val keywords: Seq[String]) extends StdLexical {
     }
   }
 }
-
-/**
- * The top level Spark SQL parser. This parser recognizes syntaxes that are available for all SQL
- * dialects supported by Spark SQL, and delegates all the other syntaxes to the `fallback` parser.
- *
- * @param fallback A function that parses an input string to a logical plan
- */
-private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser {
-
-  // A parser for the key-value part of the "SET [key = [value ]]" syntax
-  private object SetCommandParser extends RegexParsers {
-    private val key: Parser[String] = "(?m)[^=]+".r
-
-    private val value: Parser[String] = "(?m).*$".r
-
-    private val pair: Parser[LogicalPlan] =
-      (key ~ ("=".r ~> value).?).? ^^ {
-        case None => SetCommand(None)
-        case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)))
-      }
-
-    def apply(input: String): LogicalPlan = parseAll(pair, input) match {
-      case Success(plan, _) => plan
-      case x => sys.error(x.toString)
-    }
-  }
-
-  protected val AS      = Keyword("AS")
-  protected val CACHE   = Keyword("CACHE")
-  protected val LAZY    = Keyword("LAZY")
-  protected val SET     = Keyword("SET")
-  protected val TABLE   = Keyword("TABLE")
-  protected val UNCACHE = Keyword("UNCACHE")
-
-  protected implicit def asParser(k: Keyword): Parser[String] =
-    lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
-
-  private val reservedWords: Seq[String] =
-    this
-      .getClass
-      .getMethods
-      .filter(_.getReturnType == classOf[Keyword])
-      .map(_.invoke(this).asInstanceOf[Keyword].str)
-
-  override val lexical = new SqlLexical(reservedWords)
-
-  override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | others
-
-  private lazy val cache: Parser[LogicalPlan] =
-    CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {
-      case isLazy ~ tableName ~ plan =>
-        CacheTableCommand(tableName, plan.map(fallback), isLazy.isDefined)
-    }
-
-  private lazy val uncache: Parser[LogicalPlan] =
-    UNCACHE ~ TABLE ~> ident ^^ {
-      case tableName => UncacheTableCommand(tableName)
-    }
-
-  private lazy val set: Parser[LogicalPlan] =
-    SET ~> restInput ^^ {
-      case input => SetCommandParser(input)
-    }
-
-  private lazy val others: Parser[LogicalPlan] =
-    wholeInput ^^ {
-      case input => fallback(input)
-    }
-}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index 5a1863953e..45905f8ef9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.types.StringType
+import org.apache.spark.sql.catalyst.expressions.Attribute
 
 /**
  * A logical node that represents a non-query command to be executed by the system.  For example,
@@ -28,48 +27,3 @@ abstract class Command extends LeafNode {
   self: Product =>
   def output: Seq[Attribute] = Seq.empty
 }
-
-/**
- *
- * Commands of the form "SET [key [= value] ]".
- */
-case class SetCommand(kv: Option[(String, Option[String])]) extends Command {
-  override def output = Seq(
-    AttributeReference("", StringType, nullable = false)())
-}
-
-/**
- * Returned by a parser when the users only wants to see what query plan would be executed, without
- * actually performing the execution.
- */
-case class ExplainCommand(plan: LogicalPlan, extended: Boolean = false) extends Command {
-  override def output =
-    Seq(AttributeReference("plan", StringType, nullable = false)())
-}
-
-/**
- * Returned for the "CACHE TABLE tableName [AS SELECT ...]" command.
- */
-case class CacheTableCommand(tableName: String, plan: Option[LogicalPlan], isLazy: Boolean)
-  extends Command
-
-/**
- * Returned for the "UNCACHE TABLE tableName" command.
- */
-case class UncacheTableCommand(tableName: String) extends Command
-
-/**
- * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
- * @param table The table to be described.
- * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
- *                   It is effective only when the table is a Hive table.
- */
-case class DescribeCommand(
-    table: LogicalPlan,
-    isExtended: Boolean) extends Command {
-  override def output = Seq(
-    // Column names are based on Hive.
-    AttributeReference("col_name", StringType, nullable = false)(),
-    AttributeReference("data_type", StringType, nullable = false)(),
-    AttributeReference("comment", StringType, nullable = false)())
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 9962937277..6c575dd727 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -76,7 +76,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @transient
   protected[sql] val sqlParser = {
     val fallback = new catalyst.SqlParser
-    new catalyst.SparkSQLParser(fallback(_))
+    new SparkSQLParser(fallback(_))
   }
 
   protected[sql] def parseSql(sql: String): LogicalPlan = {
@@ -329,7 +329,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
     def strategies: Seq[Strategy] =
       extraStrategies ++ (
-      CommandStrategy ::
       DataSourceStrategy ::
       TakeOrdered ::
       HashAggregation ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
new file mode 100644
index 0000000000..65358b7d4e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.{SqlLexical, AbstractSparkSQLParser}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{UncacheTableCommand, CacheTableCommand, SetCommand}
+
+import scala.util.parsing.combinator.RegexParsers
+
+/**
+ * The top level Spark SQL parser. This parser recognizes syntaxes that are available for all SQL
+ * dialects supported by Spark SQL, and delegates all the other syntaxes to the `fallback` parser.
+ *
+ * @param fallback A function that parses an input string to a logical plan
+ */
+private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser {
+
+  // A parser for the key-value part of the "SET [key = [value ]]" syntax
+  private object SetCommandParser extends RegexParsers {
+    private val key: Parser[String] = "(?m)[^=]+".r
+
+    private val value: Parser[String] = "(?m).*$".r
+
+    private val output: Seq[Attribute] = Seq(AttributeReference("", StringType, nullable = false)())
+
+    private val pair: Parser[LogicalPlan] =
+      (key ~ ("=".r ~> value).?).? ^^ {
+        case None => SetCommand(None, output)
+        case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)), output)
+      }
+
+    def apply(input: String): LogicalPlan = parseAll(pair, input) match {
+      case Success(plan, _) => plan
+      case x => sys.error(x.toString)
+    }
+  }
+
+  protected val AS      = Keyword("AS")
+  protected val CACHE   = Keyword("CACHE")
+  protected val LAZY    = Keyword("LAZY")
+  protected val SET     = Keyword("SET")
+  protected val TABLE   = Keyword("TABLE")
+  protected val UNCACHE = Keyword("UNCACHE")
+
+  protected implicit def asParser(k: Keyword): Parser[String] =
+    lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
+
+  private val reservedWords: Seq[String] =
+    this
+      .getClass
+      .getMethods
+      .filter(_.getReturnType == classOf[Keyword])
+      .map(_.invoke(this).asInstanceOf[Keyword].str)
+
+  override val lexical = new SqlLexical(reservedWords)
+
+  override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | others
+
+  private lazy val cache: Parser[LogicalPlan] =
+    CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {
+      case isLazy ~ tableName ~ plan =>
+        CacheTableCommand(tableName, plan.map(fallback), isLazy.isDefined)
+    }
+
+  private lazy val uncache: Parser[LogicalPlan] =
+    UNCACHE ~ TABLE ~> ident ^^ {
+      case tableName => UncacheTableCommand(tableName)
+    }
+
+  private lazy val set: Parser[LogicalPlan] =
+    SET ~> restInput ^^ {
+      case input => SetCommandParser(input)
+    }
+
+  private lazy val others: Parser[LogicalPlan] =
+    wholeInput ^^ {
+      case input => fallback(input)
+    }
+
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ce878c137e..99b6611d3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -259,6 +259,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     def numPartitions = self.numPartitions
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case r: RunnableCommand => ExecutedCommand(r) :: Nil
+
       case logical.Distinct(child) =>
         execution.Distinct(partial = false,
           execution.Distinct(partial = true, planLater(child))) :: Nil
@@ -308,22 +310,4 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case _ => Nil
     }
   }
-
-  case object CommandStrategy extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case r: RunnableCommand => ExecutedCommand(r) :: Nil
-      case logical.SetCommand(kv) =>
-        Seq(ExecutedCommand(execution.SetCommand(kv, plan.output)))
-      case logical.ExplainCommand(logicalPlan, extended) =>
-        Seq(ExecutedCommand(
-          execution.ExplainCommand(logicalPlan, plan.output, extended)))
-      case logical.CacheTableCommand(tableName, optPlan, isLazy) =>
-        Seq(ExecutedCommand(
-          execution.CacheTableCommand(tableName, optPlan, isLazy)))
-      case logical.UncacheTableCommand(tableName) =>
-        Seq(ExecutedCommand(
-          execution.UncacheTableCommand(tableName)))
-      case _ => Nil
-    }
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index b8fa4b0199..0d765c4c92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -113,7 +113,7 @@ case class SetCommand(
 @DeveloperApi
 case class ExplainCommand(
     logicalPlan: LogicalPlan,
-    override val output: Seq[Attribute], extended: Boolean) extends RunnableCommand {
+    override val output: Seq[Attribute], extended: Boolean = false) extends RunnableCommand {
 
   // Run through the optimizer to generate the physical plan.
   override def run(sqlContext: SQLContext) = try {
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index 5550183621..80733ea1db 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -33,8 +33,8 @@ import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.plans.logical.SetCommand
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.SetCommand
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
 import org.apache.spark.sql.{SQLConf, SchemaRDD, Row => SparkRow}
@@ -190,7 +190,7 @@ private[hive] class SparkExecuteStatementOperation(
       result = hiveContext.sql(statement)
       logDebug(result.queryExecution.toString())
       result.queryExecution.logical match {
-        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
+        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value))), _) =>
           sessionToActivePool(parentSession.getSessionHandle) = value
           logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
         case _ =>
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 798a690a20..19d8514007 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -31,7 +31,7 @@ import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.plans.logical.SetCommand
+import org.apache.spark.sql.execution.SetCommand
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
@@ -161,7 +161,7 @@ private[hive] class SparkExecuteStatementOperation(
       result = hiveContext.sql(statement)
       logDebug(result.queryExecution.toString())
       result.queryExecution.logical match {
-        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
+        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value))), _) =>
           sessionToActivePool(parentSession.getSessionHandle) = value
           logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
         case _ =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 1648fa826b..02eac43b21 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -38,8 +38,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators, OverrideCatalog, OverrideFunctionRegistry}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types.DecimalType
-import org.apache.spark.sql.catalyst.types.decimal.Decimal
-import org.apache.spark.sql.execution.{SparkPlan, ExecutedCommand, ExtractPythonUdfs, QueryExecutionException}
+import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException}
 import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
 import org.apache.spark.sql.sources.DataSourceStrategy
 
@@ -340,7 +339,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
     override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
       DataSourceStrategy,
-      CommandStrategy,
       HiveCommandStrategy(self),
       TakeOrdered,
       ParquetOperations,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index c2ab3579c1..28de03c389 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -24,8 +24,8 @@ import org.apache.hadoop.hive.ql.lib.Node
 import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse._
 import org.apache.hadoop.hive.ql.plan.PlanUtils
+import org.apache.spark.sql.SparkSQLParser
 
-import org.apache.spark.sql.catalyst.SparkSQLParser
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.types.decimal.Decimal
+import org.apache.spark.sql.execution.ExplainCommand
 import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable}
 
 /* Implicit conversions */
@@ -45,6 +46,22 @@ import scala.collection.JavaConversions._
  */
 private[hive] case object NativePlaceholder extends Command
 
+/**
+ * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
+ * @param table The table to be described.
+ * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
+ *                   It is effective only when the table is a Hive table.
+ */
+case class DescribeCommand(
+    table: LogicalPlan,
+    isExtended: Boolean) extends Command {
+  override def output = Seq(
+    // Column names are based on Hive.
+    AttributeReference("col_name", StringType, nullable = false)(),
+    AttributeReference("data_type", StringType, nullable = false)(),
+    AttributeReference("comment", StringType, nullable = false)())
+}
+
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
 private[hive] object HiveQl {
   protected val nativeCommands = Seq(
@@ -457,17 +474,23 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
     // Just fake explain for any of the native commands.
     case Token("TOK_EXPLAIN", explainArgs)
       if noExplainCommands.contains(explainArgs.head.getText) =>
-      ExplainCommand(NoRelation)
+      ExplainCommand(NoRelation, Seq(AttributeReference("plan", StringType, nullable = false)()))
     case Token("TOK_EXPLAIN", explainArgs)
       if "TOK_CREATETABLE" == explainArgs.head.getText =>
       val Some(crtTbl) :: _ :: extended :: Nil =
         getClauses(Seq("TOK_CREATETABLE", "FORMATTED", "EXTENDED"), explainArgs)
-      ExplainCommand(nodeToPlan(crtTbl), extended != None)
+      ExplainCommand(
+        nodeToPlan(crtTbl),
+        Seq(AttributeReference("plan", StringType,nullable = false)()),
+        extended != None)
     case Token("TOK_EXPLAIN", explainArgs) =>
       // Ignore FORMATTED if present.
       val Some(query) :: _ :: extended :: Nil =
         getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs)
-      ExplainCommand(nodeToPlan(query), extended != None)
+      ExplainCommand(
+        nodeToPlan(query),
+        Seq(AttributeReference("plan", StringType, nullable = false)()),
+        extended != None)
 
     case Token("TOK_DESCTABLE", describeArgs) =>
       // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d3f6381b69..c439b9ebfe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.types.StringType
+import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive
 import org.apache.spark.sql.hive.execution._
@@ -209,14 +210,14 @@ private[hive] trait HiveStrategies {
 
   case class HiveCommandStrategy(context: HiveContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case describe: logical.DescribeCommand =>
+      case describe: DescribeCommand =>
         val resolvedTable = context.executePlan(describe.table).analyzed
         resolvedTable match {
           case t: MetastoreRelation =>
             ExecutedCommand(
               DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil
           case o: LogicalPlan =>
-            ExecutedCommand(DescribeCommand(planLater(o), describe.output)) :: Nil
+            ExecutedCommand(RunnableDescribeCommand(planLater(o), describe.output)) :: Nil
         }
 
       case _ => Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 8f2311cf83..1358a0eccb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -34,8 +34,9 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.CacheTableCommand
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 4104df8f8e..f8a957d55d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -22,6 +22,8 @@ import java.io._
 import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.execution.{SetCommand, ExplainCommand}
+import org.apache.spark.sql.hive.DescribeCommand
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util._
-- 
GitLab