From e8adc552df80af413e1d31b020489612d13a8770 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Fri, 20 May 2016 09:36:14 -0700
Subject: [PATCH] [SPARK-15435][SQL] Append Command to all commands

## What changes were proposed in this pull request?
We started this convention to append Command suffix to all SQL commands. However, not all commands follow that convention. This patch adds Command suffix to all RunnableCommands.

## How was this patch tested?
Updated test cases to reflect the renames.

Author: Reynold Xin <rxin@databricks.com>

Closes #13215 from rxin/SPARK-15435.
---
 .../spark/sql/execution/SparkSqlParser.scala  | 91 ++++++++++---------
 .../spark/sql/execution/SparkStrategies.scala |  6 +-
 ...eTable.scala => AnalyzeTableCommand.scala} |  6 +-
 .../command/createDataSourceTables.scala      |  3 +-
 .../spark/sql/execution/command/ddl.scala     | 26 +++---
 .../sql/execution/command/functions.scala     | 12 +--
 .../sql/execution/command/resources.scala     |  4 +-
 .../spark/sql/execution/command/tables.scala  | 37 ++++----
 .../execution/datasources/DataSource.scala    |  2 +-
 .../datasources/DataSourceStrategy.scala      |  4 +-
 ...cala => InsertIntoDataSourceCommand.scala} |  2 +-
 ...> InsertIntoHadoopFsRelationCommand.scala} | 12 +--
 .../spark/sql/execution/datasources/ddl.scala |  2 +-
 .../spark/sql/internal/SessionState.scala     |  4 +-
 .../execution/command/DDLCommandSuite.scala   | 90 +++++++++---------
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  2 +-
 ...scala => CreateTableAsSelectCommand.scala} |  2 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala  | 12 +--
 .../spark/sql/hive/StatisticsSuite.scala      | 16 ++--
 .../apache/spark/sql/hive/parquetSuites.scala | 10 +-
 20 files changed, 173 insertions(+), 170 deletions(-)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/command/{AnalyzeTable.scala => AnalyzeTableCommand.scala} (95%)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/{InsertIntoDataSource.scala => InsertIntoDataSourceCommand.scala} (97%)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/{InsertIntoHadoopFsRelation.scala => InsertIntoHadoopFsRelationCommand.scala} (93%)
 rename sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/{CreateTableAsSelect.scala => CreateTableAsSelectCommand.scala} (99%)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ee12bfa725..2966eefd07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -76,8 +76,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
-   * options are passed on to Hive) e.g.:
+   * Create an [[AnalyzeTableCommand]] command. This currently only implements the NOSCAN
+   * option (other options are passed on to Hive) e.g.:
    * {{{
    *   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
    * }}}
@@ -86,11 +86,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     if (ctx.partitionSpec == null &&
       ctx.identifier != null &&
       ctx.identifier.getText.toLowerCase == "noscan") {
-      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
+      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
     } else {
       // Always just run the no scan analyze. We should fix this and implement full analyze
       // command in the future.
-      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
+      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
     }
   }
 
@@ -332,7 +332,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[LoadData]] command.
+   * Create a [[LoadDataCommand]] command.
    *
    * For example:
    * {{{
@@ -341,7 +341,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitLoadData(ctx: LoadDataContext): LogicalPlan = withOrigin(ctx) {
-    LoadData(
+    LoadDataCommand(
       table = visitTableIdentifier(ctx.tableIdentifier),
       path = string(ctx.path),
       isLocal = ctx.LOCAL != null,
@@ -351,7 +351,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[TruncateTable]] command.
+   * Create a [[TruncateTableCommand]] command.
    *
    * For example:
    * {{{
@@ -363,7 +363,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     if (ctx.identifierList != null) {
       throw operationNotAllowed("TRUNCATE TABLE ... COLUMNS", ctx)
     }
-    TruncateTable(
+    TruncateTableCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
     )
@@ -422,7 +422,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[CreateDatabase]] command.
+   * Create a [[CreateDatabaseCommand]] command.
    *
    * For example:
    * {{{
@@ -431,7 +431,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitCreateDatabase(ctx: CreateDatabaseContext): LogicalPlan = withOrigin(ctx) {
-    CreateDatabase(
+    CreateDatabaseCommand(
       ctx.identifier.getText,
       ctx.EXISTS != null,
       Option(ctx.locationSpec).map(visitLocationSpec),
@@ -440,7 +440,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create an [[AlterDatabaseProperties]] command.
+   * Create an [[AlterDatabasePropertiesCommand]] command.
    *
    * For example:
    * {{{
@@ -449,13 +449,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitSetDatabaseProperties(
       ctx: SetDatabasePropertiesContext): LogicalPlan = withOrigin(ctx) {
-    AlterDatabaseProperties(
+    AlterDatabasePropertiesCommand(
       ctx.identifier.getText,
       visitPropertyKeyValues(ctx.tablePropertyList))
   }
 
   /**
-   * Create a [[DropDatabase]] command.
+   * Create a [[DropDatabaseCommand]] command.
    *
    * For example:
    * {{{
@@ -463,11 +463,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitDropDatabase(ctx: DropDatabaseContext): LogicalPlan = withOrigin(ctx) {
-    DropDatabase(ctx.identifier.getText, ctx.EXISTS != null, ctx.CASCADE != null)
+    DropDatabaseCommand(ctx.identifier.getText, ctx.EXISTS != null, ctx.CASCADE != null)
   }
 
   /**
-   * Create a [[DescribeDatabase]] command.
+   * Create a [[DescribeDatabaseCommand]] command.
    *
    * For example:
    * {{{
@@ -475,11 +475,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitDescribeDatabase(ctx: DescribeDatabaseContext): LogicalPlan = withOrigin(ctx) {
-    DescribeDatabase(ctx.identifier.getText, ctx.EXTENDED != null)
+    DescribeDatabaseCommand(ctx.identifier.getText, ctx.EXTENDED != null)
   }
 
   /**
-   * Create a [[CreateFunction]] command.
+   * Create a [[CreateFunctionCommand]] command.
    *
    * For example:
    * {{{
@@ -500,7 +500,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
     // Extract database, name & alias.
     val functionIdentifier = visitFunctionName(ctx.qualifiedName)
-    CreateFunction(
+    CreateFunctionCommand(
       functionIdentifier.database,
       functionIdentifier.funcName,
       string(ctx.className),
@@ -509,7 +509,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[DropFunction]] command.
+   * Create a [[DropFunctionCommand]] command.
    *
    * For example:
    * {{{
@@ -518,7 +518,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = withOrigin(ctx) {
     val functionIdentifier = visitFunctionName(ctx.qualifiedName)
-    DropFunction(
+    DropFunctionCommand(
       functionIdentifier.database,
       functionIdentifier.funcName,
       ctx.EXISTS != null,
@@ -526,20 +526,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[DropTable]] command.
+   * Create a [[DropTableCommand]] command.
    */
   override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) {
     if (ctx.PURGE != null) {
       throw operationNotAllowed("DROP TABLE ... PURGE", ctx)
     }
-    DropTable(
+    DropTableCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.EXISTS != null,
       ctx.VIEW != null)
   }
 
   /**
-   * Create a [[AlterTableRename]] command.
+   * Create a [[AlterTableRenameCommand]] command.
    *
    * For example:
    * {{{
@@ -548,14 +548,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitRenameTable(ctx: RenameTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableRename(
+    AlterTableRenameCommand(
       visitTableIdentifier(ctx.from),
       visitTableIdentifier(ctx.to),
       ctx.VIEW != null)
   }
 
   /**
-   * Create an [[AlterTableSetProperties]] command.
+   * Create an [[AlterTableSetPropertiesCommand]] command.
    *
    * For example:
    * {{{
@@ -565,14 +565,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitSetTableProperties(
       ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableSetProperties(
+    AlterTableSetPropertiesCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       visitPropertyKeyValues(ctx.tablePropertyList),
       ctx.VIEW != null)
   }
 
   /**
-   * Create an [[AlterTableUnsetProperties]] command.
+   * Create an [[AlterTableUnsetPropertiesCommand]] command.
    *
    * For example:
    * {{{
@@ -582,7 +582,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitUnsetTableProperties(
       ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableUnsetProperties(
+    AlterTableUnsetPropertiesCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       visitPropertyKeys(ctx.tablePropertyList),
       ctx.EXISTS != null,
@@ -590,7 +590,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create an [[AlterTableSerDeProperties]] command.
+   * Create an [[AlterTableSerDePropertiesCommand]] command.
    *
    * For example:
    * {{{
@@ -599,7 +599,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitSetTableSerDe(ctx: SetTableSerDeContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableSerDeProperties(
+    AlterTableSerDePropertiesCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       Option(ctx.STRING).map(string),
       Option(ctx.tablePropertyList).map(visitPropertyKeyValues),
@@ -608,7 +608,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create an [[AlterTableAddPartition]] command.
+   * Create an [[AlterTableAddPartitionCommand]] command.
    *
    * For example:
    * {{{
@@ -636,14 +636,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       // Alter View: the location clauses are not allowed.
       ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec(_) -> None)
     }
-    AlterTableAddPartition(
+    AlterTableAddPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       specsAndLocs,
       ctx.EXISTS != null)
   }
 
   /**
-   * Create an [[AlterTableRenamePartition]] command
+   * Create an [[AlterTableRenamePartitionCommand]] command
    *
    * For example:
    * {{{
@@ -652,14 +652,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitRenameTablePartition(
       ctx: RenameTablePartitionContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableRenamePartition(
+    AlterTableRenamePartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       visitNonOptionalPartitionSpec(ctx.from),
       visitNonOptionalPartitionSpec(ctx.to))
   }
 
   /**
-   * Create an [[AlterTableDropPartition]] command
+   * Create an [[AlterTableDropPartitionCommand]] command
    *
    * For example:
    * {{{
@@ -678,14 +678,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     if (ctx.PURGE != null) {
       throw operationNotAllowed("ALTER TABLE ... DROP PARTITION ... PURGE", ctx)
     }
-    AlterTableDropPartition(
+    AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
       ctx.EXISTS != null)
   }
 
   /**
-   * Create an [[AlterTableSetLocation]] command
+   * Create an [[AlterTableSetLocationCommand]] command
    *
    * For example:
    * {{{
@@ -693,7 +693,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitSetTableLocation(ctx: SetTableLocationContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableSetLocation(
+    AlterTableSetLocationCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
       visitLocationSpec(ctx.locationSpec))
@@ -759,18 +759,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create an [[AddJar]] or [[AddFile]] command depending on the requested resource.
+   * Create an [[AddJarCommand]] or [[AddFileCommand]] command depending on the requested resource.
    */
   override def visitAddResource(ctx: AddResourceContext): LogicalPlan = withOrigin(ctx) {
     ctx.identifier.getText.toLowerCase match {
-      case "file" => AddFile(remainder(ctx.identifier).trim)
-      case "jar" => AddJar(remainder(ctx.identifier).trim)
+      case "file" => AddFileCommand(remainder(ctx.identifier).trim)
+      case "jar" => AddJarCommand(remainder(ctx.identifier).trim)
       case other => throw operationNotAllowed(s"ADD with resource type '$other'", ctx)
     }
   }
 
   /**
-   * Create a table, returning either a [[CreateTable]] or a [[CreateTableAsSelectLogicalPlan]].
+   * Create a table, returning either a [[CreateTableCommand]] or a
+   * [[CreateTableAsSelectLogicalPlan]].
    *
    * This is not used to create datasource tables, which is handled through
    * "CREATE TABLE ... USING ...".
@@ -868,12 +869,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
     selectQuery match {
       case Some(q) => CreateTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
-      case None => CreateTable(tableDesc, ifNotExists)
+      case None => CreateTableCommand(tableDesc, ifNotExists)
     }
   }
 
   /**
-   * Create a [[CreateTableLike]] command.
+   * Create a [[CreateTableLikeCommand]] command.
    *
    * For example:
    * {{{
@@ -884,7 +885,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
     val targetTable = visitTableIdentifier(ctx.target)
     val sourceTable = visitTableIdentifier(ctx.source)
-    CreateTableLike(targetTable, sourceTable, ctx.EXISTS != null)
+    CreateTableLikeCommand(targetTable, sourceTable, ctx.EXISTS != null)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5cfb6d5363..3343039ae1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -398,7 +398,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         sys.error("Cannot create temporary partitioned table.")
 
       case c: CreateTableUsingAsSelect if c.temporary =>
-        val cmd = CreateTempTableUsingAsSelect(
+        val cmd = CreateTempTableUsingAsSelectCommand(
           c.tableIdent, c.provider, Array.empty[String], c.mode, c.options, c.child)
         ExecutedCommandExec(cmd) :: Nil
 
@@ -415,10 +415,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         ExecutedCommandExec(cmd) :: Nil
 
       case logical.ShowFunctions(db, pattern) =>
-        ExecutedCommandExec(ShowFunctions(db, pattern)) :: Nil
+        ExecutedCommandExec(ShowFunctionsCommand(db, pattern)) :: Nil
 
       case logical.DescribeFunction(function, extended) =>
-        ExecutedCommandExec(DescribeFunction(function, extended)) :: Nil
+        ExecutedCommandExec(DescribeFunctionCommand(function, extended)) :: Nil
 
       case _ => Nil
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
similarity index 95%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index de2db44b0e..a469d4da86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
  * Right now, it only supports Hive tables and it only updates the size of a Hive table
  * in the Hive metastore.
  */
-case class AnalyzeTable(tableName: String) extends RunnableCommand {
+case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val sessionState = sparkSession.sessionState
@@ -95,7 +95,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
           sessionState.catalog.alterTable(
             catalogTable.copy(
               properties = relation.catalogTable.properties +
-                (AnalyzeTable.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
+                (AnalyzeTableCommand.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
         }
 
       case otherRelation =>
@@ -106,6 +106,6 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
   }
 }
 
-object AnalyzeTable {
+object AnalyzeTableCommand {
   val TOTAL_SIZE_FIELD = "totalSize"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 70e5108d93..6ca66a22df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.types._
 /**
  * A command used to create a data source table.
  *
- * Note: This is different from [[CreateTable]]. Please check the syntax for difference.
+ * Note: This is different from [[CreateTableCommand]]. Please check the syntax for difference.
  * This is not intended for temporary tables.
  *
  * The syntax of using this command in SQL is:
@@ -253,6 +253,7 @@ case class CreateDataSourceTableAsSelectCommand(
   }
 }
 
+
 object CreateDataSourceTableUtils extends Logging {
   /**
    * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 49d7fe956f..dd3f17d525 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.types._
  *     [WITH DBPROPERTIES (property_name=property_value, ...)];
  * }}}
  */
-case class CreateDatabase(
+case class CreateDatabaseCommand(
     databaseName: String,
     ifNotExists: Boolean,
     path: Option[String],
@@ -85,7 +85,7 @@ case class CreateDatabase(
  *    DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
  * }}}
  */
-case class DropDatabase(
+case class DropDatabaseCommand(
     databaseName: String,
     ifExists: Boolean,
     cascade: Boolean)
@@ -108,7 +108,7 @@ case class DropDatabase(
  *    ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
  * }}}
  */
-case class AlterDatabaseProperties(
+case class AlterDatabasePropertiesCommand(
     databaseName: String,
     props: Map[String, String])
   extends RunnableCommand {
@@ -134,7 +134,7 @@ case class AlterDatabaseProperties(
  *    DESCRIBE DATABASE [EXTENDED] db_name
  * }}}
  */
-case class DescribeDatabase(
+case class DescribeDatabaseCommand(
     databaseName: String,
     extended: Boolean)
   extends RunnableCommand {
@@ -175,7 +175,7 @@ case class DescribeDatabase(
  *   DROP VIEW [IF EXISTS] [db_name.]view_name;
  * }}}
  */
-case class DropTable(
+case class DropTableCommand(
     tableName: TableIdentifier,
     ifExists: Boolean,
     isView: Boolean) extends RunnableCommand {
@@ -220,7 +220,7 @@ case class DropTable(
  *   ALTER VIEW view1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
  * }}}
  */
-case class AlterTableSetProperties(
+case class AlterTableSetPropertiesCommand(
     tableName: TableIdentifier,
     properties: Map[String, String],
     isView: Boolean)
@@ -251,7 +251,7 @@ case class AlterTableSetProperties(
  *   ALTER VIEW view1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
  * }}}
  */
-case class AlterTableUnsetProperties(
+case class AlterTableUnsetPropertiesCommand(
     tableName: TableIdentifier,
     propKeys: Seq[String],
     ifExists: Boolean,
@@ -291,7 +291,7 @@ case class AlterTableUnsetProperties(
  *   ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties;
  * }}}
  */
-case class AlterTableSerDeProperties(
+case class AlterTableSerDePropertiesCommand(
     tableName: TableIdentifier,
     serdeClassName: Option[String],
     serdeProperties: Option[Map[String, String]],
@@ -330,7 +330,7 @@ case class AlterTableSerDeProperties(
  *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
  * }}}
  */
-case class AlterTableAddPartition(
+case class AlterTableAddPartitionCommand(
     tableName: TableIdentifier,
     partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
     ifNotExists: Boolean)
@@ -361,7 +361,7 @@ case class AlterTableAddPartition(
  *   ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2;
  * }}}
  */
-case class AlterTableRenamePartition(
+case class AlterTableRenamePartitionCommand(
     tableName: TableIdentifier,
     oldPartition: TablePartitionSpec,
     newPartition: TablePartitionSpec)
@@ -389,7 +389,7 @@ case class AlterTableRenamePartition(
  *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
  * }}}
  */
-case class AlterTableDropPartition(
+case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
     specs: Seq[TablePartitionSpec],
     ifExists: Boolean)
@@ -420,7 +420,7 @@ case class AlterTableDropPartition(
  *    ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION "loc";
  * }}}
  */
-case class AlterTableSetLocation(
+case class AlterTableSetLocationCommand(
     tableName: TableIdentifier,
     partitionSpec: Option[TablePartitionSpec],
     location: String)
@@ -459,7 +459,7 @@ case class AlterTableSetLocation(
 }
 
 
-private[sql] object DDLUtils {
+object DDLUtils {
 
   def isDatasourceTable(props: Map[String, String]): Boolean = {
     props.contains("spark.sql.sources.provider")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index 1ea9bc5299..d2d8e3ddea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
  *    AS className [USING JAR\FILE 'uri' [, JAR|FILE 'uri']]
  * }}}
  */
-case class CreateFunction(
+case class CreateFunctionCommand(
     databaseName: Option[String],
     functionName: String,
     className: String,
@@ -81,7 +81,7 @@ case class CreateFunction(
  *   DESCRIBE FUNCTION [EXTENDED] upper;
  * }}}
  */
-case class DescribeFunction(
+case class DescribeFunctionCommand(
     functionName: FunctionIdentifier,
     isExtended: Boolean) extends RunnableCommand {
 
@@ -142,7 +142,7 @@ case class DescribeFunction(
  * ifExists: returns an error if the function doesn't exist, unless this is true.
  * isTemp: indicates if it is a temporary function.
  */
-case class DropFunction(
+case class DropFunctionCommand(
     databaseName: Option[String],
     functionName: String,
     ifExists: Boolean,
@@ -180,10 +180,10 @@ case class DropFunction(
  * For the pattern, '*' matches any sequence of characters (including no characters) and
  * '|' is for alternation.
  * For example, "show functions like 'yea*|windo*'" will return "window" and "year".
- *
- * TODO currently we are simply ignore the db
  */
-case class ShowFunctions(db: Option[String], pattern: Option[String]) extends RunnableCommand {
+case class ShowFunctionsCommand(db: Option[String], pattern: Option[String])
+  extends RunnableCommand {
+
   override val output: Seq[Attribute] = {
     val schema = StructType(StructField("function", StringType, nullable = false) :: Nil)
     schema.toAttributes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
index 29bcb30592..162d493c1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 /**
  * Adds a jar to the current session so it can be used (for UDFs or serdes).
  */
-case class AddJar(path: String) extends RunnableCommand {
+case class AddJarCommand(path: String) extends RunnableCommand {
   override val output: Seq[Attribute] = {
     val schema = StructType(
       StructField("result", IntegerType, nullable = false) :: Nil)
@@ -40,7 +40,7 @@ case class AddJar(path: String) extends RunnableCommand {
 /**
  * Adds a file to the current session so it can be used.
  */
-case class AddFile(path: String) extends RunnableCommand {
+case class AddFileCommand(path: String) extends RunnableCommand {
   override def run(sparkSession: SparkSession): Seq[Row] = {
     sparkSession.sparkContext.addFile(path)
     Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index d13492e550..13e63a1bef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -39,9 +39,9 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 case class CreateTableAsSelectLogicalPlan(
-  tableDesc: CatalogTable,
-  child: LogicalPlan,
-  allowExisting: Boolean) extends UnaryNode with Command {
+    tableDesc: CatalogTable,
+    child: LogicalPlan,
+    allowExisting: Boolean) extends UnaryNode with Command {
 
   override def output: Seq[Attribute] = Seq.empty[Attribute]
 
@@ -63,7 +63,7 @@ case class CreateTableAsSelectLogicalPlan(
  *   LIKE [other_db_name.]existing_table_name
  * }}}
  */
-case class CreateTableLike(
+case class CreateTableLikeCommand(
     targetTable: TableIdentifier,
     sourceTable: TableIdentifier,
     ifNotExists: Boolean) extends RunnableCommand {
@@ -115,7 +115,7 @@ case class CreateTableLike(
  *   [AS select_statement];
  * }}}
  */
-case class CreateTable(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
+case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     sparkSession.sessionState.catalog.createTable(table, ifNotExists)
@@ -134,7 +134,7 @@ case class CreateTable(table: CatalogTable, ifNotExists: Boolean) extends Runnab
  *    ALTER VIEW view1 RENAME TO view2;
  * }}}
  */
-case class AlterTableRename(
+case class AlterTableRenameCommand(
     oldName: TableIdentifier,
     newName: TableIdentifier,
     isView: Boolean)
@@ -159,7 +159,7 @@ case class AlterTableRename(
  *  [PARTITION (partcol1=val1, partcol2=val2 ...)]
  * }}}
  */
-case class LoadData(
+case class LoadDataCommand(
     table: TableIdentifier,
     path: String,
     isLocal: Boolean,
@@ -281,7 +281,7 @@ case class LoadData(
  *  TRUNCATE TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
  * }}}
  */
-case class TruncateTable(
+case class TruncateTableCommand(
     tableName: TableIdentifier,
     partitionSpec: Option[TablePartitionSpec]) extends RunnableCommand {
 
@@ -444,16 +444,17 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
       append(buffer, "Sort Columns:", sortColumns.mkString("[", ", ", "]"), "")
     }
 
-    DDLUtils.getBucketSpecFromTableProperties(metadata).map { bucketSpec =>
-      appendBucketInfo(
-        bucketSpec.numBuckets,
-        bucketSpec.bucketColumnNames,
-        bucketSpec.sortColumnNames)
-    }.getOrElse {
-      appendBucketInfo(
-        metadata.numBuckets,
-        metadata.bucketColumnNames,
-        metadata.sortColumnNames)
+    DDLUtils.getBucketSpecFromTableProperties(metadata) match {
+      case Some(bucketSpec) =>
+        appendBucketInfo(
+          bucketSpec.numBuckets,
+          bucketSpec.bucketColumnNames,
+          bucketSpec.sortColumnNames)
+      case None =>
+        appendBucketInfo(
+          metadata.numBuckets,
+          metadata.bucketColumnNames,
+          metadata.sortColumnNames)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 2e17b763a5..e5dd4d81d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -433,7 +433,7 @@ case class DataSource(
         // ordering of data.logicalPlan (partition columns are all moved after data column).  This
         // will be adjusted within InsertIntoHadoopFsRelation.
         val plan =
-          InsertIntoHadoopFsRelation(
+          InsertIntoHadoopFsRelationCommand(
             outputPath,
             partitionColumns.map(UnresolvedAttribute.quoted),
             bucketSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 0494fafb0e..a3d87cd38b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -66,7 +66,7 @@ private[sql] object DataSourceAnalysis extends Rule[LogicalPlan] {
           "Cannot overwrite a path that is also being read from.")
       }
 
-      InsertIntoHadoopFsRelation(
+      InsertIntoHadoopFsRelationCommand(
         outputPath,
         t.partitionSchema.fields.map(_.name).map(UnresolvedAttribute(_)),
         t.bucketSpec,
@@ -153,7 +153,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
 
     case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
       part, query, overwrite, false) if part.isEmpty =>
-      ExecutedCommandExec(InsertIntoDataSource(l, query, overwrite)) :: Nil
+      ExecutedCommandExec(InsertIntoDataSourceCommand(l, query, overwrite)) :: Nil
 
     case _ => Nil
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
similarity index 97%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index 7b15e49641..c3e07f7d00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.sources.InsertableRelation
 /**
  * Inserts the results of `query` in to a relation that extends [[InsertableRelation]].
  */
-private[sql] case class InsertIntoDataSource(
+private[sql] case class InsertIntoDataSourceCommand(
     logicalRelation: LogicalRelation,
     query: LogicalPlan,
     overwrite: Boolean)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
similarity index 93%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 4921e4ca6b..1426dcf469 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -35,11 +35,11 @@ import org.apache.spark.sql.internal.SQLConf
 
 /**
  * A command for writing data to a [[HadoopFsRelation]].  Supports both overwriting and appending.
- * Writing to dynamic partitions is also supported.  Each [[InsertIntoHadoopFsRelation]] issues a
- * single write job, and owns a UUID that identifies this job.  Each concrete implementation of
- * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for
- * each task output file.  This UUID is passed to executor side via a property named
- * `spark.sql.sources.writeJobUUID`.
+ * Writing to dynamic partitions is also supported.  Each [[InsertIntoHadoopFsRelationCommand]]
+ * issues a single write job, and owns a UUID that identifies this job.  Each concrete
+ * implementation of [[HadoopFsRelation]] should use this UUID together with task id to generate
+ * unique file path for each task output file.  This UUID is passed to executor side via a
+ * property named `spark.sql.sources.writeJobUUID`.
  *
  * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
  * are used to write to normal tables and tables with dynamic partitions.
@@ -55,7 +55,7 @@ import org.apache.spark.sql.internal.SQLConf
  *   4. If all tasks are committed, commit the job, otherwise aborts the job;  If any exception is
  *      thrown during job commitment, also aborts the job.
  */
-private[sql] case class InsertIntoHadoopFsRelation(
+private[sql] case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 78b1db1682..edbccde214 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -91,7 +91,7 @@ case class CreateTempTableUsing(
   }
 }
 
-case class CreateTempTableUsingAsSelect(
+case class CreateTempTableUsingAsSelectCommand(
     tableIdent: TableIdentifier,
     provider: String,
     partitionColumns: Array[String],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 8f7c6f5d0c..939b9195ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.AnalyzeTable
+import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource}
 import org.apache.spark.sql.util.ExecutionListenerManager
 
@@ -189,6 +189,6 @@ private[sql] class SessionState(sparkSession: SparkSession) {
    * in the external catalog.
    */
   def analyze(tableName: String): Unit = {
-    AnalyzeTable(tableName).run(sparkSession)
+    AnalyzeTableCommand(tableName).run(sparkSession)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 0925a51310..708b878c84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -48,7 +48,7 @@ class DDLCommandSuite extends PlanTest {
        |WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
       """.stripMargin
     val parsed = parser.parsePlan(sql)
-    val expected = CreateDatabase(
+    val expected = CreateDatabaseCommand(
       "database_name",
       ifNotExists = true,
       Some("/home/user/db"),
@@ -82,19 +82,19 @@ class DDLCommandSuite extends PlanTest {
     val parsed6 = parser.parsePlan(sql6)
     val parsed7 = parser.parsePlan(sql7)
 
-    val expected1 = DropDatabase(
+    val expected1 = DropDatabaseCommand(
       "database_name",
       ifExists = true,
       cascade = false)
-    val expected2 = DropDatabase(
+    val expected2 = DropDatabaseCommand(
       "database_name",
       ifExists = true,
       cascade = true)
-    val expected3 = DropDatabase(
+    val expected3 = DropDatabaseCommand(
       "database_name",
       ifExists = false,
       cascade = false)
-    val expected4 = DropDatabase(
+    val expected4 = DropDatabaseCommand(
       "database_name",
       ifExists = false,
       cascade = true)
@@ -116,10 +116,10 @@ class DDLCommandSuite extends PlanTest {
     val parsed1 = parser.parsePlan(sql1)
     val parsed2 = parser.parsePlan(sql2)
 
-    val expected1 = AlterDatabaseProperties(
+    val expected1 = AlterDatabasePropertiesCommand(
       "database_name",
       Map("a" -> "a", "b" -> "b", "c" -> "c"))
-    val expected2 = AlterDatabaseProperties(
+    val expected2 = AlterDatabasePropertiesCommand(
       "database_name",
       Map("a" -> "a"))
 
@@ -141,10 +141,10 @@ class DDLCommandSuite extends PlanTest {
     val parsed1 = parser.parsePlan(sql1)
     val parsed2 = parser.parsePlan(sql2)
 
-    val expected1 = DescribeDatabase(
+    val expected1 = DescribeDatabaseCommand(
       "db_name",
       extended = true)
-    val expected2 = DescribeDatabase(
+    val expected2 = DescribeDatabaseCommand(
       "db_name",
       extended = false)
 
@@ -167,7 +167,7 @@ class DDLCommandSuite extends PlanTest {
       """.stripMargin
     val parsed1 = parser.parsePlan(sql1)
     val parsed2 = parser.parsePlan(sql2)
-    val expected1 = CreateFunction(
+    val expected1 = CreateFunctionCommand(
       None,
       "helloworld",
       "com.matthewrathbone.example.SimpleUDFExample",
@@ -175,7 +175,7 @@ class DDLCommandSuite extends PlanTest {
         FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"),
         FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")),
       isTemp = true)
-    val expected2 = CreateFunction(
+    val expected2 = CreateFunctionCommand(
       Some("hello"),
       "world",
       "com.matthewrathbone.example.SimpleUDFExample",
@@ -198,22 +198,22 @@ class DDLCommandSuite extends PlanTest {
     val parsed3 = parser.parsePlan(sql3)
     val parsed4 = parser.parsePlan(sql4)
 
-    val expected1 = DropFunction(
+    val expected1 = DropFunctionCommand(
       None,
       "helloworld",
       ifExists = false,
       isTemp = true)
-    val expected2 = DropFunction(
+    val expected2 = DropFunctionCommand(
       None,
       "helloworld",
       ifExists = true,
       isTemp = true)
-    val expected3 = DropFunction(
+    val expected3 = DropFunctionCommand(
       Some("hello"),
       "world",
       ifExists = false,
       isTemp = false)
-    val expected4 = DropFunction(
+    val expected4 = DropFunctionCommand(
       Some("hello"),
       "world",
       ifExists = true,
@@ -231,11 +231,11 @@ class DDLCommandSuite extends PlanTest {
       containsThesePhrases = Seq("create external table", "location"))
     val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'"
     parser.parsePlan(query) match {
-      case ct: CreateTable =>
+      case ct: CreateTableCommand =>
         assert(ct.table.tableType == CatalogTableType.EXTERNAL)
         assert(ct.table.storage.locationUri == Some("/something/anything"))
       case other =>
-        fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," +
+        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
           s"got ${other.getClass.getName}: $query")
     }
   }
@@ -253,11 +253,11 @@ class DDLCommandSuite extends PlanTest {
   test("create table - location implies external") {
     val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
     parser.parsePlan(query) match {
-      case ct: CreateTable =>
+      case ct: CreateTableCommand =>
         assert(ct.table.tableType == CatalogTableType.EXTERNAL)
         assert(ct.table.storage.locationUri == Some("/something/anything"))
       case other =>
-        fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," +
+        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
             s"got ${other.getClass.getName}: $query")
     }
   }
@@ -282,7 +282,7 @@ class DDLCommandSuite extends PlanTest {
         assert(Seq("a") == ct.partitionColumns.toSeq)
         comparePlans(ct.copy(partitionColumns = null), expected)
       case other =>
-        fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," +
+        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
           s"got ${other.getClass.getName}: $query")
     }
   }
@@ -308,7 +308,7 @@ class DDLCommandSuite extends PlanTest {
         assert(ct.partitionColumns.isEmpty)
         comparePlans(ct.copy(partitionColumns = null), expected)
       case other =>
-        fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," +
+        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
           s"got ${other.getClass.getName}: $query")
     }
   }
@@ -320,11 +320,11 @@ class DDLCommandSuite extends PlanTest {
     val sql_view = sql_table.replace("TABLE", "VIEW")
     val parsed_table = parser.parsePlan(sql_table)
     val parsed_view = parser.parsePlan(sql_view)
-    val expected_table = AlterTableRename(
+    val expected_table = AlterTableRenameCommand(
       TableIdentifier("table_name", None),
       TableIdentifier("new_table_name", None),
       isView = false)
-    val expected_view = AlterTableRename(
+    val expected_view = AlterTableRenameCommand(
       TableIdentifier("table_name", None),
       TableIdentifier("new_table_name", None),
       isView = true)
@@ -353,11 +353,11 @@ class DDLCommandSuite extends PlanTest {
     val parsed3_view = parser.parsePlan(sql3_view)
 
     val tableIdent = TableIdentifier("table_name", None)
-    val expected1_table = AlterTableSetProperties(
+    val expected1_table = AlterTableSetPropertiesCommand(
       tableIdent, Map("test" -> "test", "comment" -> "new_comment"), isView = false)
-    val expected2_table = AlterTableUnsetProperties(
+    val expected2_table = AlterTableUnsetPropertiesCommand(
       tableIdent, Seq("comment", "test"), ifExists = false, isView = false)
-    val expected3_table = AlterTableUnsetProperties(
+    val expected3_table = AlterTableUnsetPropertiesCommand(
       tableIdent, Seq("comment", "test"), ifExists = true, isView = false)
     val expected1_view = expected1_table.copy(isView = true)
     val expected2_view = expected2_table.copy(isView = true)
@@ -412,21 +412,21 @@ class DDLCommandSuite extends PlanTest {
     val parsed4 = parser.parsePlan(sql4)
     val parsed5 = parser.parsePlan(sql5)
     val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableSerDeProperties(
+    val expected1 = AlterTableSerDePropertiesCommand(
       tableIdent, Some("org.apache.class"), None, None)
-    val expected2 = AlterTableSerDeProperties(
+    val expected2 = AlterTableSerDePropertiesCommand(
       tableIdent,
       Some("org.apache.class"),
       Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
       None)
-    val expected3 = AlterTableSerDeProperties(
+    val expected3 = AlterTableSerDePropertiesCommand(
       tableIdent, None, Some(Map("columns" -> "foo,bar", "field.delim" -> ",")), None)
-    val expected4 = AlterTableSerDeProperties(
+    val expected4 = AlterTableSerDePropertiesCommand(
       tableIdent,
       Some("org.apache.class"),
       Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
       Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))
-    val expected5 = AlterTableSerDeProperties(
+    val expected5 = AlterTableSerDePropertiesCommand(
       tableIdent,
       None,
       Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
@@ -459,13 +459,13 @@ class DDLCommandSuite extends PlanTest {
     val parsed1 = parser.parsePlan(sql1)
     val parsed2 = parser.parsePlan(sql2)
 
-    val expected1 = AlterTableAddPartition(
+    val expected1 = AlterTableAddPartitionCommand(
       TableIdentifier("table_name", None),
       Seq(
         (Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
         (Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
       ifNotExists = true)
-    val expected2 = AlterTableAddPartition(
+    val expected2 = AlterTableAddPartitionCommand(
       TableIdentifier("table_name", None),
       Seq((Map("dt" -> "2008-08-08"), Some("loc"))),
       ifNotExists = false)
@@ -490,7 +490,7 @@ class DDLCommandSuite extends PlanTest {
        |RENAME TO PARTITION (dt='2008-09-09', country='uk')
       """.stripMargin
     val parsed = parser.parsePlan(sql)
-    val expected = AlterTableRenamePartition(
+    val expected = AlterTableRenamePartitionCommand(
       TableIdentifier("table_name", None),
       Map("dt" -> "2008-08-08", "country" -> "us"),
       Map("dt" -> "2008-09-09", "country" -> "uk"))
@@ -529,7 +529,7 @@ class DDLCommandSuite extends PlanTest {
     assertUnsupported(sql2_view)
 
     val tableIdent = TableIdentifier("table_name", None)
-    val expected1_table = AlterTableDropPartition(
+    val expected1_table = AlterTableDropPartitionCommand(
       tableIdent,
       Seq(
         Map("dt" -> "2008-08-08", "country" -> "us"),
@@ -565,11 +565,11 @@ class DDLCommandSuite extends PlanTest {
     val parsed1 = parser.parsePlan(sql1)
     val parsed2 = parser.parsePlan(sql2)
     val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableSetLocation(
+    val expected1 = AlterTableSetLocationCommand(
       tableIdent,
       None,
       "new location")
-    val expected2 = AlterTableSetLocation(
+    val expected2 = AlterTableSetLocationCommand(
       tableIdent,
       Some(Map("dt" -> "2008-08-08", "country" -> "us")),
       "new location")
@@ -676,13 +676,13 @@ class DDLCommandSuite extends PlanTest {
     assertUnsupported(s"DROP TABLE IF EXISTS $tableName2 PURGE")
 
     val expected1 =
-      DropTable(TableIdentifier("tab", Option("db")), ifExists = false, isView = false)
+      DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false)
     val expected2 =
-      DropTable(TableIdentifier("tab", Option("db")), ifExists = true, isView = false)
+      DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false)
     val expected3 =
-      DropTable(TableIdentifier("tab", None), ifExists = false, isView = false)
+      DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false)
     val expected4 =
-      DropTable(TableIdentifier("tab", None), ifExists = true, isView = false)
+      DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false)
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
@@ -700,13 +700,13 @@ class DDLCommandSuite extends PlanTest {
     val parsed4 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName2")
 
     val expected1 =
-      DropTable(TableIdentifier("view", Option("db")), ifExists = false, isView = true)
+      DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true)
     val expected2 =
-      DropTable(TableIdentifier("view", Option("db")), ifExists = true, isView = true)
+      DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true)
     val expected3 =
-      DropTable(TableIdentifier("view", None), ifExists = false, isView = true)
+      DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true)
     val expected4 =
-      DropTable(TableIdentifier("view", None), ifExists = true, isView = true)
+      DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true)
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 4c528fbbbe..86ab152402 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -485,7 +485,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
           val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
-          execution.CreateTableAsSelect(
+          execution.CreateTableAsSelectCommand(
             desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
             child,
             allowExisting)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
similarity index 99%
rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
index 9dfbafae87..3fc900961e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.hive.MetastoreRelation
  *                      raise exception
  */
 private[hive]
-case class CreateTableAsSelect(
+case class CreateTableAsSelectCommand(
     tableDesc: CatalogTable,
     query: LogicalPlan,
     allowExisting: Boolean)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 2d8b1f325a..30ad392969 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewCommand, LoadData}
+import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.test.TestHive
 
 class HiveDDLCommandSuite extends PlanTest {
@@ -37,7 +37,7 @@ class HiveDDLCommandSuite extends PlanTest {
 
   private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
     parser.parsePlan(sql).collect {
-      case CreateTable(desc, allowExisting) => (desc, allowExisting)
+      case CreateTableCommand(desc, allowExisting) => (desc, allowExisting)
       case CreateTableAsSelectLogicalPlan(desc, _, allowExisting) => (desc, allowExisting)
       case CreateViewCommand(desc, _, allowExisting, _, _, _) => (desc, allowExisting)
     }.head
@@ -555,7 +555,7 @@ class HiveDDLCommandSuite extends PlanTest {
   test("create table like") {
     val v1 = "CREATE TABLE table1 LIKE table2"
     val (target, source, exists) = parser.parsePlan(v1).collect {
-      case CreateTableLike(t, s, allowExisting) => (t, s, allowExisting)
+      case CreateTableLikeCommand(t, s, allowExisting) => (t, s, allowExisting)
     }.head
     assert(exists == false)
     assert(target.database.isEmpty)
@@ -565,7 +565,7 @@ class HiveDDLCommandSuite extends PlanTest {
 
     val v2 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2"
     val (target2, source2, exists2) = parser.parsePlan(v2).collect {
-      case CreateTableLike(t, s, allowExisting) => (t, s, allowExisting)
+      case CreateTableLikeCommand(t, s, allowExisting) => (t, s, allowExisting)
     }.head
     assert(exists2)
     assert(target2.database.isEmpty)
@@ -577,7 +577,7 @@ class HiveDDLCommandSuite extends PlanTest {
   test("load data") {
     val v1 = "LOAD DATA INPATH 'path' INTO TABLE table1"
     val (table, path, isLocal, isOverwrite, partition) = parser.parsePlan(v1).collect {
-      case LoadData(t, path, l, o, partition) => (t, path, l, o, partition)
+      case LoadDataCommand(t, path, l, o, partition) => (t, path, l, o, partition)
     }.head
     assert(table.database.isEmpty)
     assert(table.table == "table1")
@@ -588,7 +588,7 @@ class HiveDDLCommandSuite extends PlanTest {
 
     val v2 = "LOAD DATA LOCAL INPATH 'path' OVERWRITE INTO TABLE table1 PARTITION(c='1', d='2')"
     val (table2, path2, isLocal2, isOverwrite2, partition2) = parser.parsePlan(v2).collect {
-      case LoadData(t, path, l, o, partition) => (t, path, l, o, partition)
+      case LoadDataCommand(t, path, l, o, partition) => (t, path, l, o, partition)
     }.head
     assert(table2.database.isEmpty)
     assert(table2.table == "table1")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 7011cd8122..1a7b6c0112 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -21,7 +21,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.AnalyzeTable
+import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -33,7 +33,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
     def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
       val parsed = hiveContext.parseSql(analyzeCommand)
       val operators = parsed.collect {
-        case a: AnalyzeTable => a
+        case a: AnalyzeTableCommand => a
         case o => o
       }
 
@@ -49,23 +49,23 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
 
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 COMPUTE STATISTICS",
-      classOf[AnalyzeTable])
+      classOf[AnalyzeTableCommand])
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS",
-      classOf[AnalyzeTable])
+      classOf[AnalyzeTableCommand])
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan",
-      classOf[AnalyzeTable])
+      classOf[AnalyzeTableCommand])
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS",
-      classOf[AnalyzeTable])
+      classOf[AnalyzeTableCommand])
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS noscan",
-      classOf[AnalyzeTable])
+      classOf[AnalyzeTableCommand])
 
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 COMPUTE STATISTICS nOscAn",
-      classOf[AnalyzeTable])
+      classOf[AnalyzeTableCommand])
   }
 
   ignore("analyze MetastoreRelations") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 7fe158c218..3e5140fe57 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExecutedCommandExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -308,10 +308,10 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
       df.queryExecution.sparkPlan match {
-        case ExecutedCommandExec(_: InsertIntoHadoopFsRelation) => // OK
+        case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
         case o => fail("test_insert_parquet should be converted to a " +
           s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
-          s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan. " +
+          s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan. " +
           s"However, found a ${o.toString} ")
       }
 
@@ -338,10 +338,10 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
       df.queryExecution.sparkPlan match {
-        case ExecutedCommandExec(_: InsertIntoHadoopFsRelation) => // OK
+        case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
         case o => fail("test_insert_parquet should be converted to a " +
           s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
-          s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan." +
+          s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan." +
           s"However, found a ${o.toString} ")
       }
 
-- 
GitLab