From aff53021cf828cd7c139d8ec230d45593078b73a Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Tue, 7 Feb 2017 00:36:57 +0800
Subject: [PATCH] [SPARK-19080][SQL] simplify data source analysis

## What changes were proposed in this pull request?

The current way of resolving `InsertIntoTable` and `CreateTable` is convoluted: sometimes we replace them with concrete implementation commands during analysis, sometimes during planning phase.

And the error checking logic is also a mess: we may put it in extended analyzer rules, or extended checking rules, or `CheckAnalysis`.

This PR simplifies the data source analysis:

1.  `InsertIntoTable` and `CreateTable` are always unresolved and need to be replaced by concrete implementation commands during analysis.
2. The error checking logic is mainly in 2 rules: `PreprocessTableCreation` and `PreprocessTableInsertion`.

## How was this patch tested?

existing test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16269 from cloud-fan/ddl.
---
 .../sql/catalyst/analysis/CheckAnalysis.scala |  26 +---
 .../UnsupportedOperationChecker.scala         |   3 -
 .../plans/logical/basicLogicalOperators.scala |  16 +--
 .../apache/spark/sql/DataFrameWriter.scala    |   2 +-
 .../spark/sql/execution/SparkPlanner.scala    |   1 -
 .../spark/sql/execution/SparkStrategies.scala |  28 ----
 .../spark/sql/execution/command/tables.scala  |   6 +-
 .../datasources/DataSourceStrategy.scala      |  18 ++-
 .../spark/sql/execution/datasources/ddl.scala |  18 ++-
 .../sql/execution/datasources/rules.scala     | 130 +++++++-----------
 .../spark/sql/internal/SessionState.scala     |   5 +-
 .../sql/execution/command/DDLSuite.scala      |   4 +-
 .../spark/sql/sources/InsertSuite.scala       |   2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  24 ++--
 .../spark/sql/hive/HiveSessionCatalog.scala   |   2 +
 .../spark/sql/hive/HiveSessionState.scala     |   9 +-
 .../spark/sql/hive/HiveStrategies.scala       |  10 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala  |   2 +-
 .../sql/hive/InsertIntoHiveTableSuite.scala   |   3 +-
 .../apache/spark/sql/hive/parquetSuites.scala |  16 +--
 20 files changed, 126 insertions(+), 199 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index f13a1f6d5d..2a3d3a173c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -376,28 +376,6 @@ trait CheckAnalysis extends PredicateHelper {
                  |Conflicting attributes: ${conflictingAttributes.mkString(",")}
                """.stripMargin)
 
-          case InsertIntoTable(t, _, _, _, _)
-            if !t.isInstanceOf[LeafNode] ||
-              t.isInstanceOf[Range] ||
-              t == OneRowRelation ||
-              t.isInstanceOf[LocalRelation] =>
-            failAnalysis(s"Inserting into an RDD-based table is not allowed.")
-
-          case i @ InsertIntoTable(table, partitions, query, _, _) =>
-            val numStaticPartitions = partitions.values.count(_.isDefined)
-            if (table.output.size != (query.output.size + numStaticPartitions)) {
-              failAnalysis(
-                s"$table requires that the data to be inserted have the same number of " +
-                  s"columns as the target table: target table has ${table.output.size} " +
-                  s"column(s) but the inserted data has " +
-                  s"${query.output.size + numStaticPartitions} column(s), including " +
-                  s"$numStaticPartitions partition column(s) having constant value(s).")
-            }
-
-          case o if !o.resolved =>
-            failAnalysis(
-              s"unresolved operator ${operator.simpleString}")
-
           case o if o.expressions.exists(!_.deterministic) &&
             !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
             !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] =>
@@ -413,6 +391,10 @@ trait CheckAnalysis extends PredicateHelper {
         }
     }
     extendedCheckRules.foreach(_(plan))
+    plan.foreachUp {
+      case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
+      case _ =>
+    }
 
     plan.foreach(_.setAnalyzed())
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index f4d016cb96..e4fd737b35 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -111,9 +111,6 @@ object UnsupportedOperationChecker {
           throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
             "streaming DataFrames/Datasets")
 
-        case _: InsertIntoTable =>
-          throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets")
-
         case Join(left, right, joinType, _) =>
 
           joinType match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 432097d621..8d7a6bc4b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -363,7 +363,8 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
 }
 
 /**
- * Insert some data into a table.
+ * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
+ * concrete implementations during analysis.
  *
  * @param table the logical plan representing the table. In the future this should be a
  *              [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables
@@ -374,25 +375,24 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
  *                  Map('a' -> Some('1'), 'b' -> Some('2')),
  *                  and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
  *                  would have Map('a' -> Some('1'), 'b' -> None).
- * @param child the logical plan representing data to write to.
+ * @param query the logical plan representing data to write to.
  * @param overwrite overwrite existing table or partitions.
  * @param ifNotExists If true, only write if the table or partition does not exist.
  */
 case class InsertIntoTable(
     table: LogicalPlan,
     partition: Map[String, Option[String]],
-    child: LogicalPlan,
+    query: LogicalPlan,
     overwrite: Boolean,
     ifNotExists: Boolean)
   extends LogicalPlan {
-
-  override def children: Seq[LogicalPlan] = child :: Nil
-  override def output: Seq[Attribute] = Seq.empty
-
   assert(overwrite || !ifNotExists)
   assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
 
-  override lazy val resolved: Boolean = childrenResolved && table.resolved
+  // We don't want `table` in children as sometimes we don't want to transform it.
+  override def children: Seq[LogicalPlan] = query :: Nil
+  override def output: Seq[Attribute] = Seq.empty
+  override lazy val resolved: Boolean = false
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ff1f0177e8..81657d9e47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -265,7 +265,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       InsertIntoTable(
         table = UnresolvedRelation(tableIdent),
         partition = Map.empty[String, Option[String]],
-        child = df.logicalPlan,
+        query = df.logicalPlan,
         overwrite = mode == SaveMode.Overwrite,
         ifNotExists = false)).toRdd
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index 73e2ffdf00..678241656c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -36,7 +36,6 @@ class SparkPlanner(
       extraStrategies ++ (
       FileSourceStrategy ::
       DataSourceStrategy ::
-      DDLStrategy ::
       SpecialLimits ::
       Aggregation ::
       JoinSelection ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index fafb919670..e3ec343479 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -405,32 +405,4 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case _ => Nil
     }
   }
-
-  object DDLStrategy extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
-        val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore)
-        ExecutedCommandExec(cmd) :: Nil
-
-      case CreateTable(tableDesc, mode, None) =>
-        val cmd =
-          CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
-        ExecutedCommandExec(cmd) :: Nil
-
-      // CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule
-      // `CreateTables`
-
-      case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isDatasourceTable(tableDesc) =>
-        val cmd =
-          CreateDataSourceTableAsSelectCommand(
-            tableDesc,
-            mode,
-            query)
-        ExecutedCommandExec(cmd) :: Nil
-
-      case c: CreateTempViewUsing => ExecutedCommandExec(c) :: Nil
-
-      case _ => Nil
-    }
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index bb903a2662..bc4b5b6258 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -111,10 +111,12 @@ case class CreateTableLikeCommand(
  *   [AS select_statement];
  * }}}
  */
-case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
+case class CreateTableCommand(
+    table: CatalogTable,
+    ignoreIfExists: Boolean) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.sessionState.catalog.createTable(table, ifNotExists)
+    sparkSession.sessionState.catalog.createTable(table, ignoreIfExists)
     Seq.empty[Row]
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 19db293132..d8a5158287 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -45,7 +45,8 @@ import org.apache.spark.unsafe.types.UTF8String
  * Replaces generic operations with specific variants that are designed to work with Spark
  * SQL Data Sources.
  *
- * Note that, this rule must be run after [[PreprocessTableInsertion]].
+ * Note that, this rule must be run after `PreprocessTableCreation` and
+ * `PreprocessTableInsertion`.
  */
 case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
@@ -130,6 +131,17 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case CreateTable(tableDesc, mode, None) if DDLUtils.isDatasourceTable(tableDesc) =>
+      CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
+
+    case CreateTable(tableDesc, mode, Some(query))
+        if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
+      CreateDataSourceTableAsSelectCommand(tableDesc, mode, query)
+
+    case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _),
+        parts, query, overwrite, false) if parts.isEmpty =>
+      InsertIntoDataSourceCommand(l, query, overwrite)
+
     case InsertIntoTable(
         l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false) =>
       // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
@@ -273,10 +285,6 @@ object DataSourceStrategy extends Strategy with Logging {
         Map.empty,
         None) :: Nil
 
-    case InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
-      part, query, overwrite, false) if part.isEmpty =>
-      ExecutedCommandExec(InsertIntoDataSourceCommand(l, query, overwrite)) :: Nil
-
     case _ => Nil
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index d10fa2c9ff..110d503f91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -20,15 +20,23 @@ package org.apache.spark.sql.execution.datasources
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
 import org.apache.spark.sql.types._
 
+/**
+ * Create a table and optionally insert some data into it. Note that this plan is unresolved and
+ * has to be replaced by the concrete implementations during analysis.
+ *
+ * @param tableDesc the metadata of the table to be created.
+ * @param mode the data writing mode
+ * @param query an optional logical plan representing data to write into the created table.
+ */
 case class CreateTable(
     tableDesc: CatalogTable,
     mode: SaveMode,
-    query: Option[LogicalPlan]) extends Command {
+    query: Option[LogicalPlan]) extends LogicalPlan {
   assert(tableDesc.provider.isDefined, "The table to be created must have a provider.")
 
   if (query.isEmpty) {
@@ -37,7 +45,9 @@ case class CreateTable(
       "create table without data insertion can only use ErrorIfExists or Ignore as SaveMode.")
   }
 
-  override def innerChildren: Seq[QueryPlan[_]] = query.toSeq
+  override def children: Seq[LogicalPlan] = query.toSeq
+  override def output: Seq[Attribute] = Seq.empty
+  override lazy val resolved: Boolean = false
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 623d47b4c9..e053a0e9e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -21,12 +21,11 @@ import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering}
-import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
+import org.apache.spark.sql.sources.InsertableRelation
 import org.apache.spark.sql.types.{AtomicType, StructType}
 
 /**
@@ -65,10 +64,10 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
 }
 
 /**
- * Analyze [[CreateTable]] and do some normalization and checking.
- * For CREATE TABLE AS SELECT, the SELECT query is also analyzed.
+ * Preprocess [[CreateTable]], to do some normalization and checking.
  */
-case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+  private val catalog = sparkSession.sessionState.catalog
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // When we CREATE TABLE without specifying the table schema, we should fail the query if
@@ -91,16 +90,10 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
     // bucket spec, etc. match the existing table, and adjust the columns order of the given query
     // if necessary.
     case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
-        if sparkSession.sessionState.catalog.tableExists(tableDesc.identifier) =>
+        if query.resolved && catalog.tableExists(tableDesc.identifier) =>
       // This is guaranteed by the parser and `DataFrameWriter`
       assert(tableDesc.provider.isDefined)
 
-      // Analyze the query in CTAS and then we can do the normalization and checking.
-      val qe = sparkSession.sessionState.executePlan(query)
-      qe.assertAnalyzed()
-      val analyzedQuery = qe.analyzed
-
-      val catalog = sparkSession.sessionState.catalog
       val db = tableDesc.identifier.database.getOrElse(catalog.getCurrentDatabase)
       val tableIdentWithDB = tableDesc.identifier.copy(database = Some(db))
       val tableName = tableIdentWithDB.unquotedString
@@ -121,7 +114,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
           s"`${specifiedProvider.getSimpleName}`.")
       }
 
-      if (analyzedQuery.schema.length != existingTable.schema.length) {
+      if (query.schema.length != existingTable.schema.length) {
         throw new AnalysisException(
           s"The column number of the existing table $tableName" +
             s"(${existingTable.schema.catalogString}) doesn't match the data schema" +
@@ -135,8 +128,8 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
       // adjust the column order of the given dataframe according to it, or throw exception
       // if the column names do not match.
       val adjustedColumns = tableCols.map { col =>
-        analyzedQuery.resolve(Seq(col), resolver).getOrElse {
-          val inputColumns = analyzedQuery.schema.map(_.name).mkString(", ")
+        query.resolve(Seq(col), resolver).getOrElse {
+          val inputColumns = query.schema.map(_.name).mkString(", ")
           throw new AnalysisException(
             s"cannot resolve '$col' given input columns: [$inputColumns]")
         }
@@ -172,10 +165,10 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
           """.stripMargin)
       }
 
-      val newQuery = if (adjustedColumns != analyzedQuery.output) {
-        Project(adjustedColumns, analyzedQuery)
+      val newQuery = if (adjustedColumns != query.output) {
+        Project(adjustedColumns, query)
       } else {
-        analyzedQuery
+        query
       }
 
       c.copy(
@@ -191,15 +184,12 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
     //   * partition columns' type must be AtomicType.
     //   * sort columns' type must be orderable.
     //   * reorder table schema or output of query plan, to put partition columns at the end.
-    case c @ CreateTable(tableDesc, _, query) =>
+    case c @ CreateTable(tableDesc, _, query) if query.forall(_.resolved) =>
       if (query.isDefined) {
         assert(tableDesc.schema.isEmpty,
           "Schema may not be specified in a Create Table As Select (CTAS) statement")
 
-        val qe = sparkSession.sessionState.executePlan(query.get)
-        qe.assertAnalyzed()
-        val analyzedQuery = qe.analyzed
-
+        val analyzedQuery = query.get
         val normalizedTable = normalizeCatalogTable(analyzedQuery.schema, tableDesc)
 
         val output = analyzedQuery.output
@@ -319,16 +309,15 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
     val normalizedPartSpec = PartitioningUtils.normalizePartitionSpec(
       insert.partition, partColNames, tblName, conf.resolver)
 
-    val expectedColumns = {
-      val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
-      insert.table.output.filterNot(a => staticPartCols.contains(a.name))
-    }
+    val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
+    val expectedColumns = insert.table.output.filterNot(a => staticPartCols.contains(a.name))
 
-    if (expectedColumns.length != insert.child.schema.length) {
+    if (expectedColumns.length != insert.query.schema.length) {
       throw new AnalysisException(
-        s"Cannot insert into table $tblName because the number of columns are different: " +
-          s"need ${expectedColumns.length} columns, " +
-          s"but query has ${insert.child.schema.length} columns.")
+        s"$tblName requires that the data to be inserted have the same number of columns as the " +
+          s"target table: target table has ${insert.table.output.size} column(s) but the " +
+          s"inserted data has ${insert.query.output.length + staticPartCols.size} column(s), " +
+          s"including ${staticPartCols.size} partition column(s) having constant value(s).")
     }
 
     if (normalizedPartSpec.nonEmpty) {
@@ -353,7 +342,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
   private def castAndRenameChildOutput(
       insert: InsertIntoTable,
       expectedOutput: Seq[Attribute]): InsertIntoTable = {
-    val newChildOutput = expectedOutput.zip(insert.child.output).map {
+    val newChildOutput = expectedOutput.zip(insert.query.output).map {
       case (expected, actual) =>
         if (expected.dataType.sameType(actual.dataType) &&
             expected.name == actual.name &&
@@ -368,15 +357,15 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
         }
     }
 
-    if (newChildOutput == insert.child.output) {
+    if (newChildOutput == insert.query.output) {
       insert
     } else {
-      insert.copy(child = Project(newChildOutput, insert.child))
+      insert.copy(query = Project(newChildOutput, insert.query))
     }
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case i @ InsertIntoTable(table, _, child, _, _) if table.resolved && child.resolved =>
+    case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
       table match {
         case relation: CatalogRelation =>
           val metadata = relation.catalogTable
@@ -387,7 +376,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
         case LogicalRelation(_: InsertableRelation, _, catalogTable) =>
           val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
           preprocess(i, tblName, Nil)
-        case other => i
+        case _ => i
       }
   }
 }
@@ -398,10 +387,8 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
 object HiveOnlyCheck extends (LogicalPlan => Unit) {
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
-      case CreateTable(tableDesc, _, Some(_)) if DDLUtils.isHiveTable(tableDesc) =>
-        throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT")
       case CreateTable(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) =>
-        throw new AnalysisException("Hive support is required to CREATE Hive TABLE")
+        throw new AnalysisException("Hive support is required to CREATE Hive TABLE (AS SELECT)")
       case _ => // OK
     }
   }
@@ -410,63 +397,40 @@ object HiveOnlyCheck extends (LogicalPlan => Unit) {
 /**
  * A rule to do various checks before inserting into or writing to a data source table.
  */
-case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
-  extends (LogicalPlan => Unit) {
+object PreWriteCheck extends (LogicalPlan => Unit) {
 
   def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
 
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
-      case logical.InsertIntoTable(
-          l @ LogicalRelation(t: InsertableRelation, _, _), partition, query, _, _) =>
-        // Right now, we do not support insert into a data source table with partition specs.
-        if (partition.nonEmpty) {
-          failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
-        } else {
-          // Get all input data source relations of the query.
-          val srcRelations = query.collect {
-            case LogicalRelation(src: BaseRelation, _, _) => src
-          }
-          if (srcRelations.contains(t)) {
-            failAnalysis(
-              "Cannot insert overwrite into table that is also being read from.")
-          } else {
-            // OK
-          }
+      case InsertIntoTable(l @ LogicalRelation(relation, _, _), partition, query, _, _) =>
+        // Get all input data source relations of the query.
+        val srcRelations = query.collect {
+          case LogicalRelation(src, _, _) => src
         }
-
-      case logical.InsertIntoTable(
-        LogicalRelation(r: HadoopFsRelation, _, _), part, query, _, _) =>
-        // We need to make sure the partition columns specified by users do match partition
-        // columns of the relation.
-        val existingPartitionColumns = r.partitionSchema.fieldNames.toSet
-        val specifiedPartitionColumns = part.keySet
-        if (existingPartitionColumns != specifiedPartitionColumns) {
-          failAnalysis("Specified partition columns " +
-            s"(${specifiedPartitionColumns.mkString(", ")}) " +
-            "do not match the partition columns of the table. Please use " +
-            s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.")
+        if (srcRelations.contains(relation)) {
+          failAnalysis("Cannot insert into table that is also being read from.")
         } else {
           // OK
         }
 
-        PartitioningUtils.validatePartitionColumn(
-          r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis)
+        relation match {
+          case _: HadoopFsRelation => // OK
 
-        // Get all input data source relations of the query.
-        val srcRelations = query.collect {
-          case LogicalRelation(src: BaseRelation, _, _) => src
-        }
-        if (srcRelations.contains(r)) {
-          failAnalysis(
-            "Cannot insert overwrite into table that is also being read from.")
-        } else {
-          // OK
+          // Right now, we do not support insert into a non-file-based data source table with
+          // partition specs.
+          case _: InsertableRelation if partition.nonEmpty =>
+            failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
+
+          case _ => failAnalysis(s"$relation does not allow insertion.")
         }
 
-      case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
-        // The relation in l is not an InsertableRelation.
-        failAnalysis(s"$l does not allow insertion.")
+      case InsertIntoTable(t, _, _, _, _)
+        if !t.isInstanceOf[LeafNode] ||
+          t.isInstanceOf[Range] ||
+          t == OneRowRelation ||
+          t.isInstanceOf[LocalRelation] =>
+        failAnalysis(s"Inserting into an RDD-based table is not allowed.")
 
       case _ => // OK
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index a5ebe4780f..6908560511 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -118,12 +118,11 @@ private[sql] class SessionState(sparkSession: SparkSession) {
         new ResolveSQLOnFile(sparkSession) :: Nil
 
       override val postHocResolutionRules =
-        AnalyzeCreateTable(sparkSession) ::
+        PreprocessTableCreation(sparkSession) ::
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) :: Nil
 
-      override val extendedCheckRules =
-        Seq(PreWriteCheck(conf, catalog), HiveOnlyCheck)
+      override val extendedCheckRules = Seq(PreWriteCheck, HiveOnlyCheck)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index f6d1ee2287..bcb707c8fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1555,13 +1555,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       var e = intercept[AnalysisException] {
         sql("CREATE TABLE t SELECT 1 as a, 1 as b")
       }.getMessage
-      assert(e.contains("Hive support is required to use CREATE Hive TABLE AS SELECT"))
+      assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
 
       spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
       e = intercept[AnalysisException] {
         sql("CREATE TABLE t SELECT a, b from t1")
       }.getMessage
-      assert(e.contains("Hive support is required to use CREATE Hive TABLE AS SELECT"))
+      assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 13284ba649..5b215ca07f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -113,7 +113,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
         |INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt
       """.stripMargin)
     }.getMessage
-    assert(message.contains("the number of columns are different")
+    assert(message.contains("target table has 2 column(s) but the inserted data has 1 column(s)")
     )
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index faa76b73fd..677da0dbdc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -247,16 +247,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
 
     override def apply(plan: LogicalPlan): LogicalPlan = {
-      if (!plan.resolved || plan.analyzed) {
-        return plan
-      }
-
       plan transformUp {
         // Write path
-        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
-          // Inserting into partitioned table is not supported in Parquet data source (yet).
-          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
-          InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists)
+        case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists)
+            // Inserting into partitioned table is not supported in Parquet data source (yet).
+            if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+          InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)
 
         // Read path
         case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
@@ -285,16 +281,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
 
     override def apply(plan: LogicalPlan): LogicalPlan = {
-      if (!plan.resolved || plan.analyzed) {
-        return plan
-      }
-
       plan transformUp {
         // Write path
-        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
-          // Inserting into partitioned table is not supported in Orc data source (yet).
-          if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
-          InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists)
+        case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists)
+            // Inserting into partitioned table is not supported in Orc data source (yet).
+            if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+          InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists)
 
         // Read path
         case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 44ef5cce2e..c9be1b9d10 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -68,6 +68,8 @@ private[sql] class HiveSessionCatalog(
   // and HiveCatalog. We should still do it at some point...
   private val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
 
+  // These 2 rules must be run before all other DDL post-hoc resolution rules, i.e.
+  // `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
   val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
   val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 413712e0c6..273cf85df3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -60,20 +60,20 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
   override lazy val analyzer: Analyzer = {
     new Analyzer(catalog, conf) {
       override val extendedResolutionRules =
-        catalog.ParquetConversions ::
-        catalog.OrcConversions ::
         new ResolveHiveSerdeTable(sparkSession) ::
         new FindDataSourceTable(sparkSession) ::
         new FindHiveSerdeTable(sparkSession) ::
         new ResolveSQLOnFile(sparkSession) :: Nil
 
       override val postHocResolutionRules =
-        AnalyzeCreateTable(sparkSession) ::
+        catalog.ParquetConversions ::
+        catalog.OrcConversions ::
+        PreprocessTableCreation(sparkSession) ::
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::
         HiveAnalysis :: Nil
 
-      override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
+      override val extendedCheckRules = Seq(PreWriteCheck)
     }
   }
 
@@ -89,7 +89,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
         experimentalMethods.extraStrategies ++ Seq(
           FileSourceStrategy,
           DataSourceStrategy,
-          DDLStrategy,
           SpecialLimits,
           InMemoryScans,
           HiveTableScans,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 0f293c21fa..f45532cc38 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion}
+import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
+import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.HiveSerDe
 
@@ -109,13 +109,17 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
 /**
  * Replaces generic operations with specific variants that are designed to work with Hive.
  *
- * Note that, this rule must be run after `PreprocessTableInsertion`.
+ * Note that, this rule must be run after `PreprocessTableCreation` and
+ * `PreprocessTableInsertion`.
  */
 object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) =>
       InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
 
+    case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
+      CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
+
     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
       CreateHiveTableAsSelectCommand(tableDesc, query, mode)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 6f43b83607..0bd08877a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -52,7 +52,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle
 
   private def analyzeCreateTable(sql: String): CatalogTable = {
     TestHive.sessionState.analyzer.execute(parser.parsePlan(sql)).collect {
-      case CreateTable(tableDesc, mode, _) => tableDesc
+      case CreateTableCommand(tableDesc, _) => tableDesc
     }.head
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index e3ddaf7254..71ce5a7c4a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -376,7 +376,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
       val e = intercept[AnalysisException] {
         sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3")
       }
-      assert(e.message.contains("the number of columns are different"))
+      assert(e.message.contains(
+        "target table has 4 column(s) but the inserted data has 5 column(s)"))
   }
 
   testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index e9239ea56f..1a1b2571b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -307,13 +307,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         """.stripMargin)
 
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
-      df.queryExecution.sparkPlan match {
-        case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+      df.queryExecution.analyzed match {
+        case cmd: InsertIntoHadoopFsRelationCommand =>
           assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
         case o => fail("test_insert_parquet should be converted to a " +
-          s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
-          s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan. " +
-          s"However, found a ${o.toString} ")
+          s"${classOf[HadoopFsRelation ].getCanonicalName}. However, found a ${o.toString}")
       }
 
       checkAnswer(
@@ -338,13 +336,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         """.stripMargin)
 
       val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
-      df.queryExecution.sparkPlan match {
-        case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
+      df.queryExecution.analyzed match {
+        case cmd: InsertIntoHadoopFsRelationCommand =>
           assert(cmd.catalogTable.map(_.identifier.table) === Some("test_insert_parquet"))
         case o => fail("test_insert_parquet should be converted to a " +
-          s"${classOf[HadoopFsRelation ].getCanonicalName} and " +
-          s"${classOf[InsertIntoDataSourceCommand].getCanonicalName} should have been SparkPlan." +
-          s"However, found a ${o.toString} ")
+          s"${classOf[HadoopFsRelation ].getCanonicalName}. However, found a ${o.toString}")
       }
 
       checkAnswer(
-- 
GitLab