From fd4ba3f626f49d7d616a2a334d45b1c736e1db1c Mon Sep 17 00:00:00 2001
From: gatorsmile <gatorsmile@gmail.com>
Date: Fri, 26 Aug 2016 11:13:38 -0700
Subject: [PATCH] [SPARK-17192][SQL] Issue Exception when Users Specify the
 Partitioning Columns without a Given Schema

### What changes were proposed in this pull request?
Address the comments by yhuai in the original PR: https://github.com/apache/spark/pull/14207

First, issue an exception instead of logging a warning when users specify the partitioning columns without a given schema.
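
For example, a `CREATE TABLE` like the following (table name and path are illustrative) now fails analysis instead of silently dropping the partitioning columns:

```scala
// Hypothetical repro: no schema is given, but PARTITIONED BY is specified.
// Previously this only logged a warning and ignored the partition columns;
// with this change it throws an AnalysisException.
spark.sql(
  """
    |CREATE TABLE tab
    |USING parquet
    |OPTIONS (
    |  path '/tmp/some_parquet_dir'
    |)
    |PARTITIONED BY (a)
  """.stripMargin)
// org.apache.spark.sql.AnalysisException: It is not allowed to specify partition
// columns when the table schema is not defined. When the table schema is not
// provided, schema and partition columns will be inferred.
```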

Second, refactor the code a little.
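
One part of the refactoring, in `HiveExternalCatalog.getSchemaFromTableProperties`, rewrites nested `map { ... } getOrElse { ... }` chains on `Option` as explicit `isDefined`/`get` branches. A minimal sketch of the pattern, using a toy property map rather than the actual catalog code:

```scala
// Illustrative only: mirrors the Option-handling style before and after.
val props = Map("numParts" -> "2")

// Before: chained map { ... } getOrElse { ... }
val before = props.get("schema").map(json => s"parsed $json").getOrElse {
  props.get("numParts").map(n => s"$n schema parts").getOrElse {
    throw new IllegalStateException("corrupted schema")
  }
}

// After: explicit branching on isDefined, closer to plain if/else intent
val schema = props.get("schema")
val after = if (schema.isDefined) {
  s"parsed ${schema.get}"
} else {
  val numParts = props.get("numParts")
  if (numParts.isDefined) {
    s"${numParts.get} schema parts"
  } else {
    throw new IllegalStateException("corrupted schema")
  }
}

assert(before == after) // both evaluate to "2 schema parts"
```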

### How was this patch tested?
Fixed the test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14572 from gatorsmile/followup16552.
---
 .../sql/execution/datasources/rules.scala     | 25 ++++++-------------
 .../sql/execution/command/DDLSuite.scala      | 17 +++++++++----
 .../spark/sql/hive/HiveExternalCatalog.scala  | 16 ++++++------
 3 files changed, 29 insertions(+), 29 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 5eb2f0a9ff..f14c63c19f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -72,29 +72,20 @@ case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // When we CREATE TABLE without specifying the table schema, we should fail the query if
-    // bucketing information is specified, as we can't infer bucketing from data files currently,
-    // and we should ignore the partition columns if it's specified, as we will infer it later, at
-    // runtime.
+    // bucketing information is specified, as we can't infer bucketing from data files currently.
+    // Since the partition columns inferred at runtime could differ from what the user specified,
+    // we fail the query if the partitioning information is specified.
     case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
       if (tableDesc.bucketSpec.isDefined) {
         failAnalysis("Cannot specify bucketing information if the table schema is not specified " +
           "when creating and will be inferred at runtime")
       }
-
-      val partitionColumnNames = tableDesc.partitionColumnNames
-      if (partitionColumnNames.nonEmpty) {
-        // The table does not have a specified schema, which means that the schema will be inferred
-        // at runtime. So, we are not expecting partition columns and we will discover partitions
-        // at runtime. However, if there are specified partition columns, we simply ignore them and
-        // provide a warning message.
-        logWarning(
-          s"Specified partition columns (${partitionColumnNames.mkString(",")}) will be " +
-            s"ignored. The schema and partition columns of table ${tableDesc.identifier} will " +
-            "be inferred.")
-        c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
-      } else {
-        c
+      if (tableDesc.partitionColumnNames.nonEmpty) {
+        failAnalysis("It is not allowed to specify partition columns when the table schema is " +
+          "not defined. When the table schema is not provided, schema and partition columns " +
+          "will be inferred.")
       }
+      c
 
     // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
     // config, and do various checks:
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e6ae42258d..b343454b12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -265,7 +265,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         userSpecifiedPartitionCols.map(p => s"PARTITIONED BY ($p)").getOrElse("")
       val schemaClause = userSpecifiedSchema.map(s => s"($s)").getOrElse("")
       val uri = path.toURI
-      sql(
+      val sqlCreateTable =
         s"""
            |CREATE TABLE $tabName $schemaClause
            |USING parquet
@@ -273,11 +273,18 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
            |  path '$uri'
            |)
            |$partitionClause
-         """.stripMargin)
-      val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
+         """.stripMargin
+      if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionCols.nonEmpty) {
+        val e = intercept[AnalysisException](sql(sqlCreateTable)).getMessage
+        assert(e.contains(
+          "not allowed to specify partition columns when the table schema is not defined"))
+      } else {
+        sql(sqlCreateTable)
+        val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName))
 
-      assert(expectedSchema == tableMetadata.schema)
-      assert(expectedPartitionCols == tableMetadata.partitionColumnNames)
+        assert(expectedSchema == tableMetadata.schema)
+        assert(expectedPartitionCols == tableMetadata.partitionColumnNames)
+      }
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2586d11a6c..7f50e38d30 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -622,24 +622,26 @@ object HiveExternalCatalog {
   def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
     val errorMessage = "Could not read schema from the hive metastore because it is corrupted."
     val props = metadata.properties
-    props.get(DATASOURCE_SCHEMA).map { schema =>
+    val schema = props.get(DATASOURCE_SCHEMA)
+    if (schema.isDefined) {
       // Originally, we used `spark.sql.sources.schema` to store the schema of a data source table.
       // After SPARK-6024, we removed this flag.
       // Although we are not using `spark.sql.sources.schema` any more, we need to still support.
-      DataType.fromJson(schema).asInstanceOf[StructType]
-    } getOrElse {
-      props.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
-        val parts = (0 until numParts.toInt).map { index =>
+      DataType.fromJson(schema.get).asInstanceOf[StructType]
+    } else {
+      val numSchemaParts = props.get(DATASOURCE_SCHEMA_NUMPARTS)
+      if (numSchemaParts.isDefined) {
+        val parts = (0 until numSchemaParts.get.toInt).map { index =>
           val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
           if (part == null) {
             throw new AnalysisException(errorMessage +
-              s" (missing part $index of the schema, $numParts parts are expected).")
+              s" (missing part $index of the schema, ${numSchemaParts.get} parts are expected).")
           }
           part
         }
         // Stick all parts back to a single schema string.
         DataType.fromJson(parts.mkString).asInstanceOf[StructType]
-      } getOrElse {
+      } else {
         throw new AnalysisException(errorMessage)
       }
     }
-- 
GitLab