diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 701b73a4aa39b6e1275da21d4f9bb8606e4868c9..ff82c7f7af6f88a7a446443b205ad3a11b6250e2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -376,41 +376,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } } } - - /** - * Creates any tables required for query execution. - * For example, because of a CREATE TABLE X AS statement. - */ - object CreateTables extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - // Wait until children are resolved. - case p: LogicalPlan if !p.childrenResolved => p - - case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" => - val newTableDesc = if (tableDesc.storage.serde.isEmpty) { - // add default serde - tableDesc.withNewStorage( - serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - } else { - tableDesc - } - - val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableDesc) - - // Currently we will never hit this branch, as SQL string API can only use `Ignore` or - // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde - // tables yet. - if (mode == SaveMode.Append || mode == SaveMode.Overwrite) { - throw new AnalysisException("" + - "CTAS for hive serde tables does not support append or overwrite semantics.") - } - - execution.CreateHiveTableAsSelectCommand( - newTableDesc.copy(identifier = TableIdentifier(tblName, Some(dbName))), - query, - mode == SaveMode.Ignore) - } - } } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index ca8c7347f23e91909681b278ab8212079666d258..86d3b6de0dbfd3455f6ab8e9e73c64feb2fa2646 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -87,7 +87,6 @@ private[sql] class HiveSessionCatalog( val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions - val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables override def refreshTable(name: TableIdentifier): Unit = { super.refreshTable(name) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index a7cc7cc142e4ed2f3ca8a998aa6e7d27a49116b2..f3c4135da6552eb7ea6f046f13695c9ce30b488e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -61,7 +61,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override val extendedResolutionRules = catalog.ParquetConversions :: catalog.OrcConversions :: - catalog.CreateTables :: PreprocessDDL(conf) :: PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) :: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 17956ded1796d6e3edd0005b2cd1544a491bbd32..fb11c849edd94a7708b553f57ab210f717e362ca 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -23,6 +23,8 @@ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.command.ExecutedCommandExec +import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.execution._ private[hive] trait HiveStrategies { @@ -45,6 +47,31 @@ private[hive] trait HiveStrategies { case logical.InsertIntoTable( table: MetastoreRelation, partition, child, overwrite, ifNotExists) => InsertIntoHiveTable(table, partition, planLater(child), overwrite, ifNotExists) :: Nil + + case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" => + val newTableDesc = if (tableDesc.storage.serde.isEmpty) { + // add default serde + tableDesc.withNewStorage( + serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + } else { + tableDesc + } + + // Currently we will never hit this branch, as SQL string API can only use `Ignore` or + // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde + // tables yet. + if (mode == SaveMode.Append || mode == SaveMode.Overwrite) { + throw new AnalysisException("" + + "CTAS for hive serde tables does not support append or overwrite semantics.") + } + + val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase) + val cmd = CreateHiveTableAsSelectCommand( + newTableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))), + query, + mode == SaveMode.Ignore) + ExecutedCommandExec(cmd) :: Nil + case _ => Nil } }