From a150e6c1b03b64a35855b8074b2fe077a6081a34 Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Mon, 26 Oct 2015 21:14:26 -0700
Subject: [PATCH] [SPARK-10562] [SQL] support mixed case partitionBy column
 names for tables stored in metastore

https://issues.apache.org/jira/browse/SPARK-10562

Author: Wenchen Fan <wenchen@databricks.com>

Closes #9226 from cloud-fan/par.
---
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 61 +++++++++++--------
 .../sql/hive/MetastoreDataSourcesSuite.scala  |  9 ++-
 .../sql/hive/execution/SQLQuerySuite.scala    | 11 ++++
 3 files changed, 54 insertions(+), 27 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index fdb576bedb..f4d45714fa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -143,6 +143,21 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       }
     }
 
+    def partColsFromParts: Option[Seq[String]] = {
+      table.properties.get("spark.sql.sources.schema.numPartCols").map { numPartCols =>
+        (0 until numPartCols.toInt).map { index =>
+          val partCol = table.properties.get(s"spark.sql.sources.schema.partCol.$index").orNull
+          if (partCol == null) {
+            throw new AnalysisException(
+              "Could not read partition columns from the metastore because it is corrupted " +
+                s"(missing part $index of it, $numPartCols parts are expected).")
+          }
+
+          partCol
+        }
+      }
+    }
+
     // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
     // After SPARK-6024, we removed this flag.
     // Although we are not using spark.sql.sources.schema any more, we still need to support it.
@@ -155,7 +170,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       // We only need names here since the userSpecifiedSchema we loaded from the metastore
      // contains partition columns. We can always get the data types of partitioning columns
      // from userSpecifiedSchema.
-      val partitionColumns = table.partitionColumns.map(_.name)
+      val partitionColumns = partColsFromParts.getOrElse(Nil)
 
       // It does not appear that the ql client for the metastore has a way to enumerate all the
       // SerDe properties directly...
@@ -218,25 +233,21 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       }
     }
 
-    val metastorePartitionColumns = userSpecifiedSchema.map { schema =>
-      val fields = partitionColumns.map(col => schema(col))
-      fields.map { field =>
-        HiveColumn(
-          name = field.name,
-          hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType),
-          comment = "")
-      }.toSeq
-    }.getOrElse {
-      if (partitionColumns.length > 0) {
-        // The table does not have a specified schema, which means that the schema will be inferred
-        // when we load the table. So, we are not expecting partition columns and we will discover
-        // partitions when we load the table. However, if there are specified partition columns,
-        // we simply ignore them and provide a warning message.
-        logWarning(
-          s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
" + - s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.") + if (userSpecifiedSchema.isDefined && partitionColumns.length > 0) { + tableProperties.put("spark.sql.sources.schema.numPartCols", partitionColumns.length.toString) + partitionColumns.zipWithIndex.foreach { case (partCol, index) => + tableProperties.put(s"spark.sql.sources.schema.partCol.$index", partCol) } - Seq.empty[HiveColumn] + } + + if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) { + // The table does not have a specified schema, which means that the schema will be inferred + // when we load the table. So, we are not expecting partition columns and we will discover + // partitions when we load the table. However, if there are specified partition columns, + // we simply ignore them and provide a warning message. + logWarning( + s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " + + s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.") } val tableType = if (isExternal) { @@ -255,8 +266,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive HiveTable( specifiedDatabase = Option(dbName), name = tblName, - schema = Seq.empty, - partitionColumns = metastorePartitionColumns, + schema = Nil, + partitionColumns = Nil, tableType = tableType, properties = tableProperties.toMap, serdeProperties = options) @@ -272,14 +283,14 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive } } - val partitionColumns = schemaToHiveColumn(relation.partitionColumns) - val dataColumns = schemaToHiveColumn(relation.schema).filterNot(partitionColumns.contains) + assert(partitionColumns.isEmpty) + assert(relation.partitionColumns.isEmpty) HiveTable( specifiedDatabase = Option(dbName), name = tblName, - schema = dataColumns, - partitionColumns = partitionColumns, + schema = schemaToHiveColumn(relation.schema), + partitionColumns = Nil, tableType = tableType, properties = tableProperties.toMap, serdeProperties = options, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index d292887688..f74eb1500b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -753,10 +753,15 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv invalidateTable(tableName) val metastoreTable = catalog.client.getTable("default", tableName) val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil) + + val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt + assert(numPartCols == 2) + val actualPartitionColumns = StructType( - metastoreTable.partitionColumns.map(c => - StructField(c.name, HiveMetastoreTypes.toDataType(c.hiveType)))) + (0 until numPartCols).map { index => + df.schema(metastoreTable.properties(s"spark.sql.sources.schema.partCol.$index")) + }) // Make sure partition columns are correctly stored in metastore. 
       assert(
         expectedPartitionColumns.sameType(actualPartitionColumns),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 396150be76..fd380641dc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1410,4 +1410,15 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-10562: partition by column with mixed case name") {
+    withTable("tbl10562") {
+      val df = Seq(2012 -> "a").toDF("Year", "val")
+      df.write.partitionBy("Year").saveAsTable("tbl10562")
+      checkAnswer(sql("SELECT Year FROM tbl10562"), Row(2012))
+      checkAnswer(sql("SELECT yEAr FROM tbl10562"), Row(2012))
+      checkAnswer(sql("SELECT val FROM tbl10562 WHERE Year > 2015"), Nil)
+      checkAnswer(sql("SELECT val FROM tbl10562 WHERE Year == 2012"), Row("a"))
+    }
+  }
 }
-- 
GitLab
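
Why the patch takes this shape: the Hive metastore does not preserve the case of column names recorded as real partition columns, so a mixed-case name like Year could not be recovered exactly once a table had been persisted. The patch instead serializes the user-supplied names into the table properties spark.sql.sources.schema.numPartCols and spark.sql.sources.schema.partCol.$index, whose values round-trip verbatim, and reads them back through the new partColsFromParts. The standalone Scala sketch below illustrates that round trip with a plain Map standing in for the metastore's property map; the object and method names are invented for illustration and are not part of the Spark API.

// A minimal, self-contained sketch of the property round trip, assuming a plain
// Map[String, String] in place of the Hive table's property map. The names
// PartColPropertiesSketch, write, and read are hypothetical, not Spark API.
object PartColPropertiesSketch {
  private val NumPartCols = "spark.sql.sources.schema.numPartCols"
  private def partColKey(index: Int): String = s"spark.sql.sources.schema.partCol.$index"

  // Write path (table creation): record each partition column name under an
  // indexed property key, preserving its original mixed case.
  def write(partitionColumns: Seq[String]): Map[String, String] = {
    val indexed = partitionColumns.zipWithIndex.map { case (col, i) => partColKey(i) -> col }
    (indexed :+ (NumPartCols -> partitionColumns.length.toString)).toMap
  }

  // Read path (table lookup): reassemble the ordered name list, failing loudly
  // when an indexed entry is missing, as partColsFromParts does above.
  def read(properties: Map[String, String]): Option[Seq[String]] =
    properties.get(NumPartCols).map { num =>
      (0 until num.toInt).map { i =>
        properties.getOrElse(partColKey(i),
          sys.error(s"Corrupted metastore entry: missing part $i, $num parts are expected"))
      }
    }

  def main(args: Array[String]): Unit = {
    // Mixed case survives because property values, unlike Hive column names,
    // are stored verbatim by the metastore.
    assert(read(write(Seq("Year", "Month"))) == Some(Seq("Year", "Month")))
    // No numPartCols property means the table predates this scheme: fall back to None.
    assert(read(Map.empty) == None)
  }
}

Storing one name per indexed key mirrors the existing spark.sql.sources.schema.numParts/part.N convention for the schema itself, and avoids picking a delimiter that could collide with characters in a column name.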