Skip to content
Snippets Groups Projects
Commit 28ab1792 authored by Wenchen Fan's avatar Wenchen Fan Committed by Yin Huai
Browse files

[SPARK-17260][MINOR] move CreateTables to HiveStrategies

## What changes were proposed in this pull request?

The `CreateTables` rule turns a general `CreateTable` plan into a `CreateHiveTableAsSelectCommand` for Hive serde tables. However, this rule is logically a planner strategy, so we should move it to `HiveStrategies` to be consistent with other DDL commands.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14825 from cloud-fan/ctas.
parent 6063d596
No related branches found
No related tags found
No related merge requests found
......@@ -376,41 +376,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}
}
/**
 * Creates any tables required for query execution.
 * For example, because of a CREATE TABLE X AS statement.
 */
object CreateTables extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Defer rewriting this node until everything below it has been resolved.
    case unresolved: LogicalPlan if !unresolved.childrenResolved => unresolved

    // Hive-serde CTAS: turn the generic CreateTable node into the Hive-specific command.
    case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
      // Fill in the default Hive serde when the user did not specify one.
      val withSerde =
        if (tableDesc.storage.serde.isDefined) {
          tableDesc
        } else {
          tableDesc.withNewStorage(
            serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
        }
      // Resolve the fully-qualified (database, table) name for the target table.
      val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableDesc)
      // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
      // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
      // tables yet.
      if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
        throw new AnalysisException("" +
          "CTAS for hive serde tables does not support append or overwrite semantics.")
      }
      execution.CreateHiveTableAsSelectCommand(
        withSerde.copy(identifier = TableIdentifier(tblName, Some(dbName))),
        query,
        mode == SaveMode.Ignore)
  }
}
}
/**
......
......@@ -87,7 +87,6 @@ private[sql] class HiveSessionCatalog(
val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
override def refreshTable(name: TableIdentifier): Unit = {
super.refreshTable(name)
......
......@@ -61,7 +61,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
override val extendedResolutionRules =
catalog.ParquetConversions ::
catalog.OrcConversions ::
catalog.CreateTables ::
PreprocessDDL(conf) ::
PreprocessTableInsertion(conf) ::
DataSourceAnalysis(conf) ::
......
......@@ -23,6 +23,8 @@ import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution._
private[hive] trait HiveStrategies {
......@@ -45,6 +47,31 @@ private[hive] trait HiveStrategies {
case logical.InsertIntoTable(
table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite, ifNotExists) :: Nil
case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
// add default serde
tableDesc.withNewStorage(
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
} else {
tableDesc
}
// Currently we will never hit this branch, as SQL string API can only use `Ignore` or
// `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
// tables yet.
if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
throw new AnalysisException("" +
"CTAS for hive serde tables does not support append or overwrite semantics.")
}
val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
val cmd = CreateHiveTableAsSelectCommand(
newTableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
query,
mode == SaveMode.Ignore)
ExecutedCommandExec(cmd) :: Nil
case _ => Nil
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.