From a5f02b00291e0a22429a3dca81f12cf6d38fea0b Mon Sep 17 00:00:00 2001 From: Wenchen Fan <wenchen@databricks.com> Date: Fri, 2 Dec 2016 12:54:12 +0800 Subject: [PATCH] [SPARK-18647][SQL] do not put provider in table properties for Hive serde table ## What changes were proposed in this pull request? In Spark 2.1, we make Hive serde tables case-preserving by putting the table metadata in table properties, like what we did for data source table. However, we should not put table provider, as it will break forward compatibility. e.g. if we create a Hive serde table with Spark 2.1, using `sql("create table test stored as parquet as select 1")`, we will fail to read it with Spark 2.0, as Spark 2.0 mistakenly treat it as data source table because there is a `provider` entry in table properties. Logically Hive serde table's provider is always hive, we don't need to store it in table properties, this PR removes it. ## How was this patch tested? manually test the forward compatibility issue. Author: Wenchen Fan <wenchen@databricks.com> Closes #16080 from cloud-fan/hive. --- .../spark/sql/hive/HiveExternalCatalog.scala | 80 ++++++++++--------- .../sql/hive/HiveExternalCatalogSuite.scala | 18 +++++ .../sql/hive/HiveMetastoreCatalogSuite.scala | 2 - 3 files changed, 59 insertions(+), 41 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 1a9943bc31..065883234a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -21,6 +21,7 @@ import java.io.IOException import java.net.URI import java.util +import scala.collection.mutable import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration @@ -219,9 +220,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // table location for tables in default database, while we expect to use the location of // default database. storage = tableDefinition.storage.copy(locationUri = tableLocation), - // Here we follow data source tables and put table metadata like provider, schema, etc. in - // table properties, so that we can work around the Hive metastore issue about not case - // preserving and make Hive serde table support mixed-case column names. + // Here we follow data source tables and put table metadata like table schema, partition + // columns etc. in table properties, so that we can work around the Hive metastore issue + // about not case preserving and make Hive serde table support mixed-case column names. properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition)) client.createTable(tableWithDataSourceProps, ignoreIfExists) } else { @@ -233,10 +234,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = { + // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`. + val provider = table.provider.get + // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type // support, no column nullability, etc., we should do some extra works before saving table // metadata into Hive metastore: - // 1. Put table metadata like provider, schema, etc. in table properties. + // 1. Put table metadata like table schema, partition columns, etc. in table properties. // 2. Check if this table is hive compatible. // 2.1 If it's not hive compatible, set location URI, schema, partition columns and bucket // spec to empty and save table metadata to Hive. @@ -244,6 +248,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 val tableProperties = tableMetaToTableProps(table) + // put table provider and partition provider in table properties. + tableProperties.put(DATASOURCE_PROVIDER, provider) + if (table.tracksPartitionsInCatalog) { + tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG) + } + // Ideally we should also put `locationUri` in table properties like provider, schema, etc. // However, in older version of Spark we already store table location in storage properties // with key "path". Here we keep this behaviour for backward compatibility. @@ -290,7 +300,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } val qualifiedTableName = table.identifier.quotedString - val maybeSerde = HiveSerDe.sourceToSerDe(table.provider.get) + val maybeSerde = HiveSerDe.sourceToSerDe(provider) val skipHiveMetadata = table.storage.properties .getOrElse("skipHiveMetadata", "false").toBoolean @@ -315,7 +325,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat (Some(newHiveCompatibleMetastoreTable(serde)), message) case _ => - val provider = table.provider.get val message = s"Couldn't find corresponding Hive SerDe for data source provider $provider. " + s"Persisting data source table $qualifiedTableName into Hive metastore in " + @@ -349,21 +358,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat /** * Data source tables may be non Hive compatible and we need to store table metadata in table * properties to workaround some Hive metastore limitations. - * This method puts table provider, partition provider, schema, partition column names, bucket - * specification into a map, which can be used as table properties later. + * This method puts table schema, partition column names, bucket specification into a map, which + * can be used as table properties later. */ - private def tableMetaToTableProps(table: CatalogTable): scala.collection.Map[String, String] = { - // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`. - val provider = table.provider.get + private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = { val partitionColumns = table.partitionColumnNames val bucketSpec = table.bucketSpec - val properties = new scala.collection.mutable.HashMap[String, String] - properties.put(DATASOURCE_PROVIDER, provider) - if (table.tracksPartitionsInCatalog) { - properties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG) - } - + val properties = new mutable.HashMap[String, String] // Serialized JSON schema string may be too long to be stored into a single metastore table // property. In this case, we split the JSON string and store each part as a separate table // property. @@ -617,14 +619,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat if (table.tableType != VIEW) { table.properties.get(DATASOURCE_PROVIDER) match { - // No provider in table properties, which means this table is created by Spark prior to 2.1, - // or is created at Hive side. + // No provider in table properties, which means this is a Hive serde table. case None => - table = table.copy( - provider = Some(DDLUtils.HIVE_PROVIDER), tracksPartitionsInCatalog = true) - - // This is a Hive serde table created by Spark 2.1 or higher versions. - case Some(DDLUtils.HIVE_PROVIDER) => table = restoreHiveSerdeTable(table) // This is a regular data source table. @@ -637,7 +633,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val statsProps = table.properties.filterKeys(_.startsWith(STATISTICS_PREFIX)) if (statsProps.nonEmpty) { - val colStats = new scala.collection.mutable.HashMap[String, ColumnStat] + val colStats = new mutable.HashMap[String, ColumnStat] // For each column, recover its column stats. Note that this is currently a O(n^2) operation, // but given the number of columns it usually not enormous, this is probably OK as a start. @@ -674,21 +670,27 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat provider = Some(DDLUtils.HIVE_PROVIDER), tracksPartitionsInCatalog = true) - val schemaFromTableProps = getSchemaFromTableProperties(table) - if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) { - hiveTable.copy( - schema = schemaFromTableProps, - partitionColumnNames = getPartitionColumnsFromTableProperties(table), - bucketSpec = getBucketSpecFromTableProperties(table)) + // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its + // schema from table properties. + if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) { + val schemaFromTableProps = getSchemaFromTableProperties(table) + if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) { + hiveTable.copy( + schema = schemaFromTableProps, + partitionColumnNames = getPartitionColumnsFromTableProperties(table), + bucketSpec = getBucketSpecFromTableProperties(table)) + } else { + // Hive metastore may change the table schema, e.g. schema inference. If the table + // schema we read back is different(ignore case and nullability) from the one in table + // properties which was written when creating table, we should respect the table schema + // from hive. + logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " + + "different from the schema when this table was created by Spark SQL" + + s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " + + "from Hive metastore which is not case preserving.") + hiveTable + } } else { - // Hive metastore may change the table schema, e.g. schema inference. If the table - // schema we read back is different(ignore case and nullability) from the one in table - // properties which was written when creating table, we should respect the table schema - // from hive. - logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " + - "different from the schema when this table was created by Spark SQL" + - s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema from " + - "Hive metastore which is not case preserving.") hiveTable } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala index efa0beb850..6fee45824e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala @@ -20,8 +20,11 @@ package org.apache.spark.sql.hive import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkConf +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.types.StructType /** * Test suite for the [[HiveExternalCatalog]]. @@ -52,4 +55,19 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite { assert(selectedPartitions.length == 1) assert(selectedPartitions.head.spec == part1.spec) } + + test("SPARK-18647: do not put provider in table properties for Hive serde table") { + val catalog = newBasicCatalog() + val hiveTable = CatalogTable( + identifier = TableIdentifier("hive_tbl", Some("db1")), + tableType = CatalogTableType.MANAGED, + storage = storageFormat, + schema = new StructType().add("col1", "int").add("col2", "string"), + provider = Some("hive")) + catalog.createTable(hiveTable, ignoreIfExists = false) + + val rawTable = externalCatalog.client.getTable("db1", "hive_tbl") + assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER)) + assert(externalCatalog.getTable("db1", "hive_tbl").provider == Some(DDLUtils.HIVE_PROVIDER)) + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 7abc4d9623..0a280b4952 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.hive -import java.io.File - import org.apache.spark.sql.{QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTableType -- GitLab