From 28ab17922a227e8d93654d3478c0d493bfb599d5 Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Fri, 26 Aug 2016 08:52:10 -0700
Subject: [PATCH] [SPARK-17260][MINOR] move CreateTables to HiveStrategies

## What changes were proposed in this pull request?

`CreateTables` rule turns a general `CreateTable` plan to `CreateHiveTableAsSelectCommand` for hive serde table. However, this rule is logically a planner strategy, we should move it to `HiveStrategies`, to be consistent with other DDL commands.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14825 from cloud-fan/ctas.
---
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 35 -------------------
 .../spark/sql/hive/HiveSessionCatalog.scala   |  1 -
 .../spark/sql/hive/HiveSessionState.scala     |  1 -
 .../spark/sql/hive/HiveStrategies.scala       | 27 ++++++++++++++
 4 files changed, 27 insertions(+), 37 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 701b73a4aa..ff82c7f7af 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -376,41 +376,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       }
     }
   }
-
-  /**
-   * Creates any tables required for query execution.
-   * For example, because of a CREATE TABLE X AS statement.
-   */
-  object CreateTables extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-      // Wait until children are resolved.
-      case p: LogicalPlan if !p.childrenResolved => p
-
-      case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
-        val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
-          // add default serde
-          tableDesc.withNewStorage(
-            serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-        } else {
-          tableDesc
-        }
-
-        val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableDesc)
-
-        // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
-        // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
-        // tables yet.
-        if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
-          throw new AnalysisException("" +
-            "CTAS for hive serde tables does not support append or overwrite semantics.")
-        }
-
-        execution.CreateHiveTableAsSelectCommand(
-          newTableDesc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
-          query,
-          mode == SaveMode.Ignore)
-    }
-  }
 }
 
 /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index ca8c7347f2..86d3b6de0d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -87,7 +87,6 @@ private[sql] class HiveSessionCatalog(
 
   val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
   val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
-  val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
 
   override def refreshTable(name: TableIdentifier): Unit = {
     super.refreshTable(name)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index a7cc7cc142..f3c4135da6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -61,7 +61,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       override val extendedResolutionRules =
         catalog.ParquetConversions ::
         catalog.OrcConversions ::
-        catalog.CreateTables ::
         PreprocessDDL(conf) ::
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 17956ded17..fb11c849ed 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
 
 private[hive] trait HiveStrategies {
@@ -45,6 +47,31 @@ private[hive] trait HiveStrategies {
       case logical.InsertIntoTable(
           table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite, ifNotExists) :: Nil
+
+      case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
+        val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
+          // add default serde
+          tableDesc.withNewStorage(
+            serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+        } else {
+          tableDesc
+        }
+
+        // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
+        // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
+        // tables yet.
+        if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
+          throw new AnalysisException("" +
+            "CTAS for hive serde tables does not support append or overwrite semantics.")
+        }
+
+        val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+        val cmd = CreateHiveTableAsSelectCommand(
+          newTableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
+          query,
+          mode == SaveMode.Ignore)
+        ExecutedCommandExec(cmd) :: Nil
+
       case _ => Nil
     }
   }
-- 
GitLab