diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ee12bfa7251fda7ceef32258f8137e5a228e1ecd..2966eefd07c772eb9d872945ff9bb22e622dc60b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -76,8 +76,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create an [[AnalyzeTable]] command. This currently only implements the NOSCAN option (other
-   * options are passed on to Hive) e.g.:
+   * Create an [[AnalyzeTableCommand]] command. This currently only implements the NOSCAN
+   * option (other options are passed on to Hive) e.g.:
    * {{{
    *   ANALYZE TABLE table COMPUTE STATISTICS NOSCAN;
    * }}}
@@ -86,11 +86,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     if (ctx.partitionSpec == null &&
       ctx.identifier != null &&
       ctx.identifier.getText.toLowerCase == "noscan") {
-      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
+      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
     } else {
       // Always just run the no scan analyze. We should fix this and implement full analyze
       // command in the future.
-      AnalyzeTable(visitTableIdentifier(ctx.tableIdentifier).toString)
+      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier).toString)
     }
   }
 
@@ -332,7 +332,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[LoadData]] command.
+   * Create a [[LoadDataCommand]] command.
    *
    * For example:
    * {{{
@@ -341,7 +341,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitLoadData(ctx: LoadDataContext): LogicalPlan = withOrigin(ctx) {
-    LoadData(
+    LoadDataCommand(
       table = visitTableIdentifier(ctx.tableIdentifier),
       path = string(ctx.path),
       isLocal = ctx.LOCAL != null,
@@ -351,7 +351,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[TruncateTable]] command.
+   * Create a [[TruncateTableCommand]] command.
    *
    * For example:
    * {{{
@@ -363,7 +363,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     if (ctx.identifierList != null) {
       throw operationNotAllowed("TRUNCATE TABLE ... COLUMNS", ctx)
     }
-    TruncateTable(
+    TruncateTableCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
     )
@@ -422,7 +422,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[CreateDatabase]] command.
+   * Create a [[CreateDatabaseCommand]] command.
    *
    * For example:
    * {{{
@@ -431,7 +431,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitCreateDatabase(ctx: CreateDatabaseContext): LogicalPlan = withOrigin(ctx) {
-    CreateDatabase(
+    CreateDatabaseCommand(
       ctx.identifier.getText,
       ctx.EXISTS != null,
       Option(ctx.locationSpec).map(visitLocationSpec),
@@ -440,7 +440,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create an [[AlterDatabaseProperties]] command.
+   * Create an [[AlterDatabasePropertiesCommand]] command.
    *
    * For example:
    * {{{
@@ -449,13 +449,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitSetDatabaseProperties(
       ctx: SetDatabasePropertiesContext): LogicalPlan = withOrigin(ctx) {
-    AlterDatabaseProperties(
+    AlterDatabasePropertiesCommand(
       ctx.identifier.getText,
       visitPropertyKeyValues(ctx.tablePropertyList))
   }
 
   /**
-   * Create a [[DropDatabase]] command.
+   * Create a [[DropDatabaseCommand]] command.
    *
    * For example:
    * {{{
@@ -463,11 +463,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitDropDatabase(ctx: DropDatabaseContext): LogicalPlan = withOrigin(ctx) {
-    DropDatabase(ctx.identifier.getText, ctx.EXISTS != null, ctx.CASCADE != null)
+    DropDatabaseCommand(ctx.identifier.getText, ctx.EXISTS != null, ctx.CASCADE != null)
   }
 
   /**
-   * Create a [[DescribeDatabase]] command.
+   * Create a [[DescribeDatabaseCommand]] command.
    *
    * For example:
    * {{{
@@ -475,11 +475,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitDescribeDatabase(ctx: DescribeDatabaseContext): LogicalPlan = withOrigin(ctx) {
-    DescribeDatabase(ctx.identifier.getText, ctx.EXTENDED != null)
+    DescribeDatabaseCommand(ctx.identifier.getText, ctx.EXTENDED != null)
   }
 
   /**
-   * Create a [[CreateFunction]] command.
+   * Create a [[CreateFunctionCommand]] command.
    *
    * For example:
    * {{{
@@ -500,7 +500,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
     // Extract database, name & alias.
     val functionIdentifier = visitFunctionName(ctx.qualifiedName)
-    CreateFunction(
+    CreateFunctionCommand(
       functionIdentifier.database,
       functionIdentifier.funcName,
       string(ctx.className),
@@ -509,7 +509,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[DropFunction]] command.
+   * Create a [[DropFunctionCommand]] command.
    *
    * For example:
    * {{{
@@ -518,7 +518,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = withOrigin(ctx) {
     val functionIdentifier = visitFunctionName(ctx.qualifiedName)
-    DropFunction(
+    DropFunctionCommand(
       functionIdentifier.database,
       functionIdentifier.funcName,
       ctx.EXISTS != null,
@@ -526,20 +526,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[DropTable]] command.
+   * Create a [[DropTableCommand]] command.
    */
   override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) {
     if (ctx.PURGE != null) {
       throw operationNotAllowed("DROP TABLE ... PURGE", ctx)
     }
-    DropTable(
+    DropTableCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.EXISTS != null,
       ctx.VIEW != null)
   }
 
   /**
-   * Create a [[AlterTableRename]] command.
+   * Create a [[AlterTableRenameCommand]] command.
    *
    * For example:
    * {{{
@@ -548,14 +548,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitRenameTable(ctx: RenameTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableRename(
+    AlterTableRenameCommand(
       visitTableIdentifier(ctx.from),
       visitTableIdentifier(ctx.to),
       ctx.VIEW != null)
   }
 
   /**
-   * Create an [[AlterTableSetProperties]] command.
+   * Create an [[AlterTableSetPropertiesCommand]] command.
    *
    * For example:
    * {{{
@@ -565,14 +565,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitSetTableProperties(
       ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableSetProperties(
+    AlterTableSetPropertiesCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       visitPropertyKeyValues(ctx.tablePropertyList),
       ctx.VIEW != null)
   }
 
   /**
-   * Create an [[AlterTableUnsetProperties]] command.
+   * Create an [[AlterTableUnsetPropertiesCommand]] command.
    *
    * For example:
    * {{{
@@ -582,7 +582,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitUnsetTableProperties(
       ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableUnsetProperties(
+    AlterTableUnsetPropertiesCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       visitPropertyKeys(ctx.tablePropertyList),
       ctx.EXISTS != null,
@@ -590,7 +590,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create an [[AlterTableSerDeProperties]] command.
+   * Create an [[AlterTableSerDePropertiesCommand]] command.
    *
    * For example:
    * {{{
@@ -599,7 +599,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitSetTableSerDe(ctx: SetTableSerDeContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableSerDeProperties(
+    AlterTableSerDePropertiesCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       Option(ctx.STRING).map(string),
       Option(ctx.tablePropertyList).map(visitPropertyKeyValues),
@@ -608,7 +608,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create an [[AlterTableAddPartition]] command.
+   * Create an [[AlterTableAddPartitionCommand]] command.
    *
    * For example:
    * {{{
@@ -636,14 +636,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       // Alter View: the location clauses are not allowed.
       ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec(_) -> None)
     }
-    AlterTableAddPartition(
+    AlterTableAddPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       specsAndLocs,
       ctx.EXISTS != null)
   }
 
   /**
-   * Create an [[AlterTableRenamePartition]] command
+   * Create an [[AlterTableRenamePartitionCommand]] command
    *
    * For example:
    * {{{
@@ -652,14 +652,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitRenameTablePartition(
       ctx: RenameTablePartitionContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableRenamePartition(
+    AlterTableRenamePartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       visitNonOptionalPartitionSpec(ctx.from),
       visitNonOptionalPartitionSpec(ctx.to))
   }
 
   /**
-   * Create an [[AlterTableDropPartition]] command
+   * Create an [[AlterTableDropPartitionCommand]] command
    *
    * For example:
    * {{{
@@ -678,14 +678,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     if (ctx.PURGE != null) {
       throw operationNotAllowed("ALTER TABLE ... DROP PARTITION ... PURGE", ctx)
     }
-    AlterTableDropPartition(
+    AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
       ctx.EXISTS != null)
   }
 
   /**
-   * Create an [[AlterTableSetLocation]] command
+   * Create an [[AlterTableSetLocationCommand]] command
    *
    * For example:
    * {{{
@@ -693,7 +693,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitSetTableLocation(ctx: SetTableLocationContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableSetLocation(
+    AlterTableSetLocationCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
       visitLocationSpec(ctx.locationSpec))
@@ -759,18 +759,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create an [[AddJar]] or [[AddFile]] command depending on the requested resource.
+   * Create an [[AddJarCommand]] or [[AddFileCommand]] command depending on the requested resource.
    */
   override def visitAddResource(ctx: AddResourceContext): LogicalPlan = withOrigin(ctx) {
     ctx.identifier.getText.toLowerCase match {
-      case "file" => AddFile(remainder(ctx.identifier).trim)
-      case "jar" => AddJar(remainder(ctx.identifier).trim)
+      case "file" => AddFileCommand(remainder(ctx.identifier).trim)
+      case "jar" => AddJarCommand(remainder(ctx.identifier).trim)
       case other => throw operationNotAllowed(s"ADD with resource type '$other'", ctx)
     }
   }
 
   /**
-   * Create a table, returning either a [[CreateTable]] or a [[CreateTableAsSelectLogicalPlan]].
+   * Create a table, returning either a [[CreateTableCommand]] or a
+   * [[CreateTableAsSelectLogicalPlan]].
    *
    * This is not used to create datasource tables, which is handled through
    * "CREATE TABLE ... USING ...".
@@ -868,12 +869,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
     selectQuery match {
       case Some(q) => CreateTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
-      case None => CreateTable(tableDesc, ifNotExists)
+      case None => CreateTableCommand(tableDesc, ifNotExists)
     }
   }
 
   /**
-   * Create a [[CreateTableLike]] command.
+   * Create a [[CreateTableLikeCommand]] command.
    *
    * For example:
    * {{{
@@ -884,7 +885,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
     val targetTable = visitTableIdentifier(ctx.target)
     val sourceTable = visitTableIdentifier(ctx.source)
-    CreateTableLike(targetTable, sourceTable, ctx.EXISTS != null)
+    CreateTableLikeCommand(targetTable, sourceTable, ctx.EXISTS != null)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5cfb6d5363a3b8f2e2e10659508c9c70bfe72a2f..3343039ae1cf3ddcb48eaad8438597797c0c07e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -398,7 +398,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         sys.error("Cannot create temporary partitioned table.")
 
       case c: CreateTableUsingAsSelect if c.temporary =>
-        val cmd = CreateTempTableUsingAsSelect(
+        val cmd = CreateTempTableUsingAsSelectCommand(
           c.tableIdent, c.provider, Array.empty[String], c.mode, c.options, c.child)
         ExecutedCommandExec(cmd) :: Nil
 
@@ -415,10 +415,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         ExecutedCommandExec(cmd) :: Nil
 
       case logical.ShowFunctions(db, pattern) =>
-        ExecutedCommandExec(ShowFunctions(db, pattern)) :: Nil
+        ExecutedCommandExec(ShowFunctionsCommand(db, pattern)) :: Nil
 
       case logical.DescribeFunction(function, extended) =>
-        ExecutedCommandExec(DescribeFunction(function, extended)) :: Nil
+        ExecutedCommandExec(DescribeFunctionCommand(function, extended)) :: Nil
 
       case _ => Nil
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
similarity index 95%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index de2db44b0e1b253585d6537e9ff71fb1d4b42ab7..a469d4da8613b500be1a47a4d15ae5c843340890 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
  * Right now, it only supports Hive tables and it only updates the size of a Hive table
  * in the Hive metastore.
  */
-case class AnalyzeTable(tableName: String) extends RunnableCommand {
+case class AnalyzeTableCommand(tableName: String) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val sessionState = sparkSession.sessionState
@@ -95,7 +95,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
           sessionState.catalog.alterTable(
             catalogTable.copy(
               properties = relation.catalogTable.properties +
-                (AnalyzeTable.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
+                (AnalyzeTableCommand.TOTAL_SIZE_FIELD -> newTotalSize.toString)))
         }
 
       case otherRelation =>
@@ -106,6 +106,6 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
   }
 }
 
-object AnalyzeTable {
+object AnalyzeTableCommand {
   val TOTAL_SIZE_FIELD = "totalSize"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 70e5108d938b9ca5bc9f22bea587f9ac3c624713..6ca66a22df12d81c3e4c2a3aa1963cea818bef9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.types._
 /**
  * A command used to create a data source table.
  *
- * Note: This is different from [[CreateTable]]. Please check the syntax for difference.
+ * Note: This is different from [[CreateTableCommand]]. Please check the syntax for difference.
  * This is not intended for temporary tables.
  *
  * The syntax of using this command in SQL is:
@@ -253,6 +253,7 @@ case class CreateDataSourceTableAsSelectCommand(
   }
 }
 
+
 object CreateDataSourceTableUtils extends Logging {
   /**
    * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 49d7fe956f812e5d6708fd776e1a6be6bc10efe5..dd3f17d525abd926d1d4498bcc8672aa9f62328b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.types._
  *     [WITH DBPROPERTIES (property_name=property_value, ...)];
  * }}}
  */
-case class CreateDatabase(
+case class CreateDatabaseCommand(
     databaseName: String,
     ifNotExists: Boolean,
     path: Option[String],
@@ -85,7 +85,7 @@ case class CreateDatabase(
  *    DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE];
  * }}}
  */
-case class DropDatabase(
+case class DropDatabaseCommand(
     databaseName: String,
     ifExists: Boolean,
     cascade: Boolean)
@@ -108,7 +108,7 @@ case class DropDatabase(
  *    ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)
  * }}}
  */
-case class AlterDatabaseProperties(
+case class AlterDatabasePropertiesCommand(
     databaseName: String,
     props: Map[String, String])
   extends RunnableCommand {
@@ -134,7 +134,7 @@ case class AlterDatabaseProperties(
  *    DESCRIBE DATABASE [EXTENDED] db_name
  * }}}
  */
-case class DescribeDatabase(
+case class DescribeDatabaseCommand(
     databaseName: String,
     extended: Boolean)
   extends RunnableCommand {
@@ -175,7 +175,7 @@ case class DescribeDatabase(
  *   DROP VIEW [IF EXISTS] [db_name.]view_name;
  * }}}
  */
-case class DropTable(
+case class DropTableCommand(
     tableName: TableIdentifier,
     ifExists: Boolean,
     isView: Boolean) extends RunnableCommand {
@@ -220,7 +220,7 @@ case class DropTable(
  *   ALTER VIEW view1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
  * }}}
  */
-case class AlterTableSetProperties(
+case class AlterTableSetPropertiesCommand(
     tableName: TableIdentifier,
     properties: Map[String, String],
     isView: Boolean)
@@ -251,7 +251,7 @@ case class AlterTableSetProperties(
  *   ALTER VIEW view1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
  * }}}
  */
-case class AlterTableUnsetProperties(
+case class AlterTableUnsetPropertiesCommand(
     tableName: TableIdentifier,
     propKeys: Seq[String],
     ifExists: Boolean,
@@ -291,7 +291,7 @@ case class AlterTableUnsetProperties(
  *   ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties;
  * }}}
  */
-case class AlterTableSerDeProperties(
+case class AlterTableSerDePropertiesCommand(
     tableName: TableIdentifier,
     serdeClassName: Option[String],
     serdeProperties: Option[Map[String, String]],
@@ -330,7 +330,7 @@ case class AlterTableSerDeProperties(
  *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
  * }}}
  */
-case class AlterTableAddPartition(
+case class AlterTableAddPartitionCommand(
     tableName: TableIdentifier,
     partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
     ifNotExists: Boolean)
@@ -361,7 +361,7 @@ case class AlterTableAddPartition(
  *   ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2;
  * }}}
  */
-case class AlterTableRenamePartition(
+case class AlterTableRenamePartitionCommand(
     tableName: TableIdentifier,
     oldPartition: TablePartitionSpec,
     newPartition: TablePartitionSpec)
@@ -389,7 +389,7 @@ case class AlterTableRenamePartition(
  *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
  * }}}
  */
-case class AlterTableDropPartition(
+case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
     specs: Seq[TablePartitionSpec],
     ifExists: Boolean)
@@ -420,7 +420,7 @@ case class AlterTableDropPartition(
  *    ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION "loc";
  * }}}
  */
-case class AlterTableSetLocation(
+case class AlterTableSetLocationCommand(
     tableName: TableIdentifier,
     partitionSpec: Option[TablePartitionSpec],
     location: String)
@@ -459,7 +459,7 @@ case class AlterTableSetLocation(
 }
 
 
-private[sql] object DDLUtils {
+object DDLUtils {
 
   def isDatasourceTable(props: Map[String, String]): Boolean = {
     props.contains("spark.sql.sources.provider")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index 1ea9bc52999681a3c76602393291bbc990583897..d2d8e3ddeae26780a1b7804ab59664f868a31734 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
  *    AS className [USING JAR\FILE 'uri' [, JAR|FILE 'uri']]
  * }}}
  */
-case class CreateFunction(
+case class CreateFunctionCommand(
     databaseName: Option[String],
     functionName: String,
     className: String,
@@ -81,7 +81,7 @@ case class CreateFunction(
  *   DESCRIBE FUNCTION [EXTENDED] upper;
  * }}}
  */
-case class DescribeFunction(
+case class DescribeFunctionCommand(
     functionName: FunctionIdentifier,
     isExtended: Boolean) extends RunnableCommand {
 
@@ -142,7 +142,7 @@ case class DescribeFunction(
  * ifExists: returns an error if the function doesn't exist, unless this is true.
  * isTemp: indicates if it is a temporary function.
  */
-case class DropFunction(
+case class DropFunctionCommand(
     databaseName: Option[String],
     functionName: String,
     ifExists: Boolean,
@@ -180,10 +180,10 @@ case class DropFunction(
  * For the pattern, '*' matches any sequence of characters (including no characters) and
  * '|' is for alternation.
  * For example, "show functions like 'yea*|windo*'" will return "window" and "year".
- *
- * TODO currently we are simply ignore the db
  */
-case class ShowFunctions(db: Option[String], pattern: Option[String]) extends RunnableCommand {
+case class ShowFunctionsCommand(db: Option[String], pattern: Option[String])
+  extends RunnableCommand {
+
   override val output: Seq[Attribute] = {
     val schema = StructType(StructField("function", StringType, nullable = false) :: Nil)
     schema.toAttributes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
index 29bcb30592516014ca33de9ec6a17df95b356712..162d493c1f8ce30c8c59840003218ba949948b6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 /**
  * Adds a jar to the current session so it can be used (for UDFs or serdes).
  */
-case class AddJar(path: String) extends RunnableCommand {
+case class AddJarCommand(path: String) extends RunnableCommand {
   override val output: Seq[Attribute] = {
     val schema = StructType(
       StructField("result", IntegerType, nullable = false) :: Nil)
@@ -40,7 +40,7 @@ case class AddJar(path: String) extends RunnableCommand {
 /**
  * Adds a file to the current session so it can be used.
  */
-case class AddFile(path: String) extends RunnableCommand {
+case class AddFileCommand(path: String) extends RunnableCommand {
   override def run(sparkSession: SparkSession): Seq[Row] = {
     sparkSession.sparkContext.addFile(path)
     Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index d13492e55070b549ac8f2da74df5b21bf6203d06..13e63a1befb20b7066f2164b45ee5b3681ee45b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -39,9 +39,9 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 case class CreateTableAsSelectLogicalPlan(
-  tableDesc: CatalogTable,
-  child: LogicalPlan,
-  allowExisting: Boolean) extends UnaryNode with Command {
+    tableDesc: CatalogTable,
+    child: LogicalPlan,
+    allowExisting: Boolean) extends UnaryNode with Command {
 
   override def output: Seq[Attribute] = Seq.empty[Attribute]
 
@@ -63,7 +63,7 @@ case class CreateTableAsSelectLogicalPlan(
  *   LIKE [other_db_name.]existing_table_name
  * }}}
  */
-case class CreateTableLike(
+case class CreateTableLikeCommand(
     targetTable: TableIdentifier,
     sourceTable: TableIdentifier,
     ifNotExists: Boolean) extends RunnableCommand {
@@ -115,7 +115,7 @@ case class CreateTableLike(
  *   [AS select_statement];
  * }}}
  */
-case class CreateTable(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
+case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     sparkSession.sessionState.catalog.createTable(table, ifNotExists)
@@ -134,7 +134,7 @@ case class CreateTable(table: CatalogTable, ifNotExists: Boolean) extends Runnab
  *    ALTER VIEW view1 RENAME TO view2;
  * }}}
  */
-case class AlterTableRename(
+case class AlterTableRenameCommand(
     oldName: TableIdentifier,
     newName: TableIdentifier,
     isView: Boolean)
@@ -159,7 +159,7 @@ case class AlterTableRename(
  *  [PARTITION (partcol1=val1, partcol2=val2 ...)]
  * }}}
  */
-case class LoadData(
+case class LoadDataCommand(
     table: TableIdentifier,
     path: String,
     isLocal: Boolean,
@@ -281,7 +281,7 @@ case class LoadData(
  *  TRUNCATE TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
  * }}}
  */
-case class TruncateTable(
+case class TruncateTableCommand(
     tableName: TableIdentifier,
     partitionSpec: Option[TablePartitionSpec]) extends RunnableCommand {
 
@@ -444,16 +444,17 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
       append(buffer, "Sort Columns:", sortColumns.mkString("[", ", ", "]"), "")
     }
 
-    DDLUtils.getBucketSpecFromTableProperties(metadata).map { bucketSpec =>
-      appendBucketInfo(
-        bucketSpec.numBuckets,
-        bucketSpec.bucketColumnNames,
-        bucketSpec.sortColumnNames)
-    }.getOrElse {
-      appendBucketInfo(
-        metadata.numBuckets,
-        metadata.bucketColumnNames,
-        metadata.sortColumnNames)
+    DDLUtils.getBucketSpecFromTableProperties(metadata) match {
+      case Some(bucketSpec) =>
+        appendBucketInfo(
+          bucketSpec.numBuckets,
+          bucketSpec.bucketColumnNames,
+          bucketSpec.sortColumnNames)
+      case None =>
+        appendBucketInfo(
+          metadata.numBuckets,
+          metadata.bucketColumnNames,
+          metadata.sortColumnNames)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 2e17b763a5370ad72e2da4b84ba0868ec2c86d4e..e5dd4d81d67797e7ed21c7a98208d55fb71a5be4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -433,7 +433,7 @@ case class DataSource(
         // ordering of data.logicalPlan (partition columns are all moved after data column).  This
         // will be adjusted within InsertIntoHadoopFsRelation.
         val plan =
-          InsertIntoHadoopFsRelation(
+          InsertIntoHadoopFsRelationCommand(
             outputPath,
             partitionColumns.map(UnresolvedAttribute.quoted),
             bucketSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 0494fafb0e4248814262d1f695d40aad0f52de06..a3d87cd38bb8c4290da4c7606753df6fa96fc535 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -66,7 +66,7 @@ private[sql] object DataSourceAnalysis extends Rule[LogicalPlan] {
           "Cannot overwrite a path that is also being read from.")
       }
 
-      InsertIntoHadoopFsRelation(
+      InsertIntoHadoopFsRelationCommand(
         outputPath,
         t.partitionSchema.fields.map(_.name).map(UnresolvedAttribute(_)),
         t.bucketSpec,
@@ -153,7 +153,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
 
     case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
       part, query, overwrite, false) if part.isEmpty =>
-      ExecutedCommandExec(InsertIntoDataSource(l, query, overwrite)) :: Nil
+      ExecutedCommandExec(InsertIntoDataSourceCommand(l, query, overwrite)) :: Nil
 
     case _ => Nil
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
similarity index 97%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index 7b15e496414e38a28d3897158018f1c87d9c0587..c3e07f7d00557afe26a9ae070f4feae4f7b3fc03 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.sources.InsertableRelation
 /**
  * Inserts the results of `query` in to a relation that extends [[InsertableRelation]].
  */
-private[sql] case class InsertIntoDataSource(
+private[sql] case class InsertIntoDataSourceCommand(
     logicalRelation: LogicalRelation,
     query: LogicalPlan,
     overwrite: Boolean)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
similarity index 93%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 4921e4ca6bb7d183e3097dca4695e370569e87ab..1426dcf4697ff62e28572e74d1c26c831098f5c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -35,11 +35,11 @@ import org.apache.spark.sql.internal.SQLConf
 
 /**
  * A command for writing data to a [[HadoopFsRelation]].  Supports both overwriting and appending.
- * Writing to dynamic partitions is also supported.  Each [[InsertIntoHadoopFsRelation]] issues a
- * single write job, and owns a UUID that identifies this job.  Each concrete implementation of
- * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for
- * each task output file.  This UUID is passed to executor side via a property named
- * `spark.sql.sources.writeJobUUID`.
+ * Writing to dynamic partitions is also supported.  Each [[InsertIntoHadoopFsRelationCommand]]
+ * issues a single write job, and owns a UUID that identifies this job.  Each concrete
+ * implementation of [[HadoopFsRelation]] should use this UUID together with task id to generate
+ * unique file path for each task output file.  This UUID is passed to executor side via a
+ * property named `spark.sql.sources.writeJobUUID`.
  *
  * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
  * are used to write to normal tables and tables with dynamic partitions.
@@ -55,7 +55,7 @@ import org.apache.spark.sql.internal.SQLConf
  *   4. If all tasks are committed, commit the job, otherwise aborts the job;  If any exception is
  *      thrown during job commitment, also aborts the job.
  */
-private[sql] case class InsertIntoHadoopFsRelation(
+private[sql] case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 78b1db16826e23d5698fa144451718eda26e715b..edbccde214c6178167e8aa8315ab85df08d915f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -91,7 +91,7 @@ case class CreateTempTableUsing(
   }
 }
 
-case class CreateTempTableUsingAsSelect(
+case class CreateTempTableUsingAsSelectCommand(
     tableIdent: TableIdentifier,
     provider: String,
     partitionColumns: Array[String],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 8f7c6f5d0ca44917ad96d48739e9e40641b561c4..939b9195cae998993f5df40fe2fc2bb51c32bf6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.AnalyzeTable
+import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource}
 import org.apache.spark.sql.util.ExecutionListenerManager
 
@@ -189,6 +189,6 @@ private[sql] class SessionState(sparkSession: SparkSession) {
    * in the external catalog.
    */
   def analyze(tableName: String): Unit = {
-    AnalyzeTable(tableName).run(sparkSession)
+    AnalyzeTableCommand(tableName).run(sparkSession)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 0925a51310e695f3f88dbff663055a75ced12879..708b878c843a701154c45e8ae5dfcf99e4330b3a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -48,7 +48,7 @@ class DDLCommandSuite extends PlanTest {
        |WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
       """.stripMargin
     val parsed = parser.parsePlan(sql)
-    val expected = CreateDatabase(
+    val expected = CreateDatabaseCommand(
       "database_name",
       ifNotExists = true,
       Some("/home/user/db"),
@@ -82,19 +82,19 @@ class DDLCommandSuite extends PlanTest {
     val parsed6 = parser.parsePlan(sql6)
     val parsed7 = parser.parsePlan(sql7)
 
-    val expected1 = DropDatabase(
+    val expected1 = DropDatabaseCommand(
       "database_name",
       ifExists = true,
       cascade = false)
-    val expected2 = DropDatabase(
+    val expected2 = DropDatabaseCommand(
       "database_name",
       ifExists = true,
       cascade = true)
-    val expected3 = DropDatabase(
+    val expected3 = DropDatabaseCommand(
       "database_name",
       ifExists = false,
       cascade = false)
-    val expected4 = DropDatabase(
+    val expected4 = DropDatabaseCommand(
       "database_name",
       ifExists = false,
       cascade = true)
@@ -116,10 +116,10 @@ class DDLCommandSuite extends PlanTest {
     val parsed1 = parser.parsePlan(sql1)
     val parsed2 = parser.parsePlan(sql2)
 
-    val expected1 = AlterDatabaseProperties(
+    val expected1 = AlterDatabasePropertiesCommand(
       "database_name",
       Map("a" -> "a", "b" -> "b", "c" -> "c"))
-    val expected2 = AlterDatabaseProperties(
+    val expected2 = AlterDatabasePropertiesCommand(
       "database_name",
       Map("a" -> "a"))
 
@@ -141,10 +141,10 @@ class DDLCommandSuite extends PlanTest {
     val parsed1 = parser.parsePlan(sql1)
     val parsed2 = parser.parsePlan(sql2)
 
-    val expected1 = DescribeDatabase(
+    val expected1 = DescribeDatabaseCommand(
       "db_name",
       extended = true)
-    val expected2 = DescribeDatabase(
+    val expected2 = DescribeDatabaseCommand(
       "db_name",
       extended = false)
 
@@ -167,7 +167,7 @@ class DDLCommandSuite extends PlanTest {
       """.stripMargin
     val parsed1 = parser.parsePlan(sql1)
     val parsed2 = parser.parsePlan(sql2)
-    val expected1 = CreateFunction(
+    val expected1 = CreateFunctionCommand(
       None,
       "helloworld",
       "com.matthewrathbone.example.SimpleUDFExample",
@@ -175,7 +175,7 @@ class DDLCommandSuite extends PlanTest {
         FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar1"),
         FunctionResource(FunctionResourceType.fromString("jar"), "/path/to/jar2")),
       isTemp = true)
-    val expected2 = CreateFunction(
+    val expected2 = CreateFunctionCommand(
       Some("hello"),
       "world",
       "com.matthewrathbone.example.SimpleUDFExample",
@@ -198,22 +198,22 @@ class DDLCommandSuite extends PlanTest {
     val parsed3 = parser.parsePlan(sql3)
     val parsed4 = parser.parsePlan(sql4)
 
-    val expected1 = DropFunction(
+    val expected1 = DropFunctionCommand(
       None,
       "helloworld",
       ifExists = false,
       isTemp = true)
-    val expected2 = DropFunction(
+    val expected2 = DropFunctionCommand(
       None,
       "helloworld",
       ifExists = true,
       isTemp = true)
-    val expected3 = DropFunction(
+    val expected3 = DropFunctionCommand(
       Some("hello"),
       "world",
       ifExists = false,
       isTemp = false)
-    val expected4 = DropFunction(
+    val expected4 = DropFunctionCommand(
       Some("hello"),
       "world",
       ifExists = true,
@@ -231,11 +231,11 @@ class DDLCommandSuite extends PlanTest {
       containsThesePhrases = Seq("create external table", "location"))
     val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'"
     parser.parsePlan(query) match {
-      case ct: CreateTable =>
+      case ct: CreateTableCommand =>
         assert(ct.table.tableType == CatalogTableType.EXTERNAL)
         assert(ct.table.storage.locationUri == Some("/something/anything"))
       case other =>
-        fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," +
+        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
           s"got ${other.getClass.getName}: $query")
     }
   }
@@ -253,11 +253,11 @@ class DDLCommandSuite extends PlanTest {
   test("create table - location implies external") {
     val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
     parser.parsePlan(query) match {
-      case ct: CreateTable =>
+      case ct: CreateTableCommand =>
         assert(ct.table.tableType == CatalogTableType.EXTERNAL)
         assert(ct.table.storage.locationUri == Some("/something/anything"))
       case other =>
-        fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," +
+        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
             s"got ${other.getClass.getName}: $query")
     }
   }
@@ -282,7 +282,7 @@ class DDLCommandSuite extends PlanTest {
         assert(Seq("a") == ct.partitionColumns.toSeq)
         comparePlans(ct.copy(partitionColumns = null), expected)
       case other =>
-        fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," +
+        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
           s"got ${other.getClass.getName}: $query")
     }
   }
@@ -308,7 +308,7 @@ class DDLCommandSuite extends PlanTest {
         assert(ct.partitionColumns.isEmpty)
         comparePlans(ct.copy(partitionColumns = null), expected)
       case other =>
-        fail(s"Expected to parse ${classOf[CreateTable].getClass.getName} from query," +
+        fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
           s"got ${other.getClass.getName}: $query")
     }
   }
@@ -320,11 +320,11 @@ class DDLCommandSuite extends PlanTest {
     val sql_view = sql_table.replace("TABLE", "VIEW")
     val parsed_table = parser.parsePlan(sql_table)
     val parsed_view = parser.parsePlan(sql_view)
-    val expected_table = AlterTableRename(
+    val expected_table = AlterTableRenameCommand(
       TableIdentifier("table_name", None),
       TableIdentifier("new_table_name", None),
       isView = false)
-    val expected_view = AlterTableRename(
+    val expected_view = AlterTableRenameCommand(
       TableIdentifier("table_name", None),
       TableIdentifier("new_table_name", None),
       isView = true)
@@ -353,11 +353,11 @@ class DDLCommandSuite extends PlanTest {
     val parsed3_view = parser.parsePlan(sql3_view)
 
     val tableIdent = TableIdentifier("table_name", None)
-    val expected1_table = AlterTableSetProperties(
+    val expected1_table = AlterTableSetPropertiesCommand(
       tableIdent, Map("test" -> "test", "comment" -> "new_comment"), isView = false)
-    val expected2_table = AlterTableUnsetProperties(
+    val expected2_table = AlterTableUnsetPropertiesCommand(
       tableIdent, Seq("comment", "test"), ifExists = false, isView = false)
-    val expected3_table = AlterTableUnsetProperties(
+    val expected3_table = AlterTableUnsetPropertiesCommand(
       tableIdent, Seq("comment", "test"), ifExists = true, isView = false)
     val expected1_view = expected1_table.copy(isView = true)
     val expected2_view = expected2_table.copy(isView = true)
@@ -412,21 +412,21 @@ class DDLCommandSuite extends PlanTest {
     val parsed4 = parser.parsePlan(sql4)
     val parsed5 = parser.parsePlan(sql5)
     val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableSerDeProperties(
+    val expected1 = AlterTableSerDePropertiesCommand(
       tableIdent, Some("org.apache.class"), None, None)
-    val expected2 = AlterTableSerDeProperties(
+    val expected2 = AlterTableSerDePropertiesCommand(
       tableIdent,
       Some("org.apache.class"),
       Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
       None)
-    val expected3 = AlterTableSerDeProperties(
+    val expected3 = AlterTableSerDePropertiesCommand(
       tableIdent, None, Some(Map("columns" -> "foo,bar", "field.delim" -> ",")), None)
-    val expected4 = AlterTableSerDeProperties(
+    val expected4 = AlterTableSerDePropertiesCommand(
       tableIdent,
       Some("org.apache.class"),
       Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
       Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))
-    val expected5 = AlterTableSerDeProperties(
+    val expected5 = AlterTableSerDePropertiesCommand(
       tableIdent,
       None,
       Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
@@ -459,13 +459,13 @@ class DDLCommandSuite extends PlanTest {
     val parsed1 = parser.parsePlan(sql1)
     val parsed2 = parser.parsePlan(sql2)
 
-    val expected1 = AlterTableAddPartition(
+    val expected1 = AlterTableAddPartitionCommand(
       TableIdentifier("table_name", None),
       Seq(
         (Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
         (Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
       ifNotExists = true)
-    val expected2 = AlterTableAddPartition(
+    val expected2 = AlterTableAddPartitionCommand(
       TableIdentifier("table_name", None),
       Seq((Map("dt" -> "2008-08-08"), Some("loc"))),
       ifNotExists = false)
@@ -490,7 +490,7 @@ class DDLCommandSuite extends PlanTest {
        |RENAME TO PARTITION (dt='2008-09-09', country='uk')
       """.stripMargin
     val parsed = parser.parsePlan(sql)
-    val expected = AlterTableRenamePartition(
+    val expected = AlterTableRenamePartitionCommand(
       TableIdentifier("table_name", None),
       Map("dt" -> "2008-08-08", "country" -> "us"),
       Map("dt" -> "2008-09-09", "country" -> "uk"))
@@ -529,7 +529,7 @@ class DDLCommandSuite extends PlanTest {
     assertUnsupported(sql2_view)
 
     val tableIdent = TableIdentifier("table_name", None)
-    val expected1_table = AlterTableDropPartition(
+    val expected1_table = AlterTableDropPartitionCommand(
       tableIdent,
       Seq(
         Map("dt" -> "2008-08-08", "country" -> "us"),
@@ -565,11 +565,11 @@ class DDLCommandSuite extends PlanTest {
     val parsed1 = parser.parsePlan(sql1)
     val parsed2 = parser.parsePlan(sql2)
     val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableSetLocation(
+    val expected1 = AlterTableSetLocationCommand(
       tableIdent,
       None,
       "new location")
-    val expected2 = AlterTableSetLocation(
+    val expected2 = AlterTableSetLocationCommand(
       tableIdent,
       Some(Map("dt" -> "2008-08-08", "country" -> "us")),
       "new location")
@@ -676,13 +676,13 @@ class DDLCommandSuite extends PlanTest {
     assertUnsupported(s"DROP TABLE IF EXISTS $tableName2 PURGE")
 
     val expected1 =
-      DropTable(TableIdentifier("tab", Option("db")), ifExists = false, isView = false)
+      DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false)
     val expected2 =
-      DropTable(TableIdentifier("tab", Option("db")), ifExists = true, isView = false)
+      DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false)
     val expected3 =
-      DropTable(TableIdentifier("tab", None), ifExists = false, isView = false)
+      DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false)
     val expected4 =
-      DropTable(TableIdentifier("tab", None), ifExists = true, isView = false)
+      DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false)
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
@@ -700,13 +700,13 @@ class DDLCommandSuite extends PlanTest {
     val parsed4 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName2")
 
     val expected1 =
-      DropTable(TableIdentifier("view", Option("db")), ifExists = false, isView = true)
+      DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true)
     val expected2 =
-      DropTable(TableIdentifier("view", Option("db")), ifExists = true, isView = true)
+      DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true)
     val expected3 =
-      DropTable(TableIdentifier("view", None), ifExists = false, isView = true)
+      DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true)
     val expected4 =
-      DropTable(TableIdentifier("view", None), ifExists = true, isView = true)
+      DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true)
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 4c528fbbbeef7244ad31075d777aecbc7d28c26f..86ab15240262533496d58585e429f87f02ae6d56 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -485,7 +485,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
           val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
 
-          execution.CreateTableAsSelect(
+          execution.CreateTableAsSelectCommand(
             desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
             child,
             allowExisting)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
similarity index 99%
rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
index 9dfbafae872f6c05ab6171b97a38c65e34738609..3fc900961e645bcfd92bbf1a75f7c00d3b3697fc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.hive.MetastoreRelation
  *                      raise exception
  */
 private[hive]
-case class CreateTableAsSelect(
+case class CreateTableAsSelectCommand(
     tableDesc: CatalogTable,
     query: LogicalPlan,
     allowExisting: Boolean)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 2d8b1f325ac32b8496c0cfe923cd48c8eec7b110..30ad392969b4e936b2d0fb7ce8c3328aa5d3eb0a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
-import org.apache.spark.sql.execution.command.{CreateTable, CreateTableAsSelectLogicalPlan, CreateTableLike, CreateViewCommand, LoadData}
+import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.test.TestHive
 
 class HiveDDLCommandSuite extends PlanTest {
@@ -37,7 +37,7 @@ class HiveDDLCommandSuite extends PlanTest {
 
   private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
     parser.parsePlan(sql).collect {
-      case CreateTable(desc, allowExisting) => (desc, allowExisting)
+      case CreateTableCommand(desc, allowExisting) => (desc, allowExisting)
       case CreateTableAsSelectLogicalPlan(desc, _, allowExisting) => (desc, allowExisting)
       case CreateViewCommand(desc, _, allowExisting, _, _, _) => (desc, allowExisting)
     }.head
@@ -555,7 +555,7 @@ class HiveDDLCommandSuite extends PlanTest {
   test("create table like") {
     val v1 = "CREATE TABLE table1 LIKE table2"
     val (target, source, exists) = parser.parsePlan(v1).collect {
-      case CreateTableLike(t, s, allowExisting) => (t, s, allowExisting)
+      case CreateTableLikeCommand(t, s, allowExisting) => (t, s, allowExisting)
     }.head
     assert(exists == false)
     assert(target.database.isEmpty)
@@ -565,7 +565,7 @@ class HiveDDLCommandSuite extends PlanTest {
 
     val v2 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2"
     val (target2, source2, exists2) = parser.parsePlan(v2).collect {
-      case CreateTableLike(t, s, allowExisting) => (t, s, allowExisting)
+      case CreateTableLikeCommand(t, s, allowExisting) => (t, s, allowExisting)
     }.head
     assert(exists2)
     assert(target2.database.isEmpty)
@@ -577,7 +577,7 @@ class HiveDDLCommandSuite extends PlanTest {
   test("load data") {
     val v1 = "LOAD DATA INPATH 'path' INTO TABLE table1"
     val (table, path, isLocal, isOverwrite, partition) = parser.parsePlan(v1).collect {
-      case LoadData(t, path, l, o, partition) => (t, path, l, o, partition)
+      case LoadDataCommand(t, path, l, o, partition) => (t, path, l, o, partition)
     }.head
     assert(table.database.isEmpty)
     assert(table.table == "table1")
@@ -588,7 +588,7 @@ class HiveDDLCommandSuite extends PlanTest {
 
     val v2 = "LOAD DATA LOCAL INPATH 'path' OVERWRITE INTO TABLE table1 PARTITION(c='1', d='2')"
     val (table2, path2, isLocal2, isOverwrite2, partition2) = parser.parsePlan(v2).collect {
-      case LoadData(t, path, l, o, partition) => (t, path, l, o, partition)
+      case LoadDataCommand(t, path, l, o, partition) => (t, path, l, o, partition)
     }.head
     assert(table2.database.isEmpty)
     assert(table2.table == "table1")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 7011cd81221c5666cdc5bc50ebd906f4a9bdfd5a..1a7b6c0112279a2cf401014e06f07cc4ef73dddf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -21,7 +21,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.AnalyzeTable
+import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -33,7 +33,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
     def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
       val parsed = hiveContext.parseSql(analyzeCommand)
       val operators = parsed.collect {
-        case a: AnalyzeTable => a
+        case a: AnalyzeTableCommand => a
         case o => o
       }
 
@@ -49,23 +49,23 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
 
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 COMPUTE STATISTICS",
-      classOf[AnalyzeTable])
+      classOf[AnalyzeTableCommand])
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS",
-      classOf[AnalyzeTable])
+      classOf[AnalyzeTableCommand])
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan",
-      classOf[AnalyzeTable])
+      classOf[AnalyzeTableCommand])
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS",
-      classOf[AnalyzeTable])
+      classOf[AnalyzeTableCommand])
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS noscan",
-      classOf[AnalyzeTable])
+      classOf[AnalyzeTableCommand])
 
     assertAnalyzeCommand(
       "ANALYZE TABLE Table1 COMPUTE STATISTICS nOscAn",
-      classOf[AnalyzeTable])
+      classOf[AnalyzeTableCommand])
   }
 
   ignore("analyze MetastoreRelations") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 7fe158c218461d98921a987f2c95f1ae1d680ccc..3e5140fe578ded1ec976933ef749d9aa71c6a269 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExecutedCommandExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -308,10 +308,10 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
       df.queryExecution.sparkPlan match {
-        case ExecutedCommandExec(_: InsertIntoHadoopFsRelation) => // OK
+        case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
         case o => fail("test_insert_parquet should be converted to a " +
           s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
-          s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan. " +
+          s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan. " +
           s"However, found a ${o.toString} ")
       }
 
@@ -338,10 +338,10 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
       df.queryExecution.sparkPlan match {
-        case ExecutedCommandExec(_: InsertIntoHadoopFsRelation) => // OK
+        case ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand) => // OK
         case o => fail("test_insert_parquet should be converted to a " +
           s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
-          s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan." +
+          s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan." +
           s"However, found a ${o.toString} ")
       }