diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 7c3bec897956a96a28af722b851c68ff652aee66..34748a04859adcf19d38948ecd6f1bc6b1be0172 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -138,8 +138,9 @@ case class BucketSpec(
  *                 Can be None if this table is a View, should be "hive" for hive serde tables.
  * @param unsupportedFeatures is a list of string descriptions of features that are used by the
  *                            underlying table but not supported by Spark SQL yet.
- * @param partitionProviderIsHive whether this table's partition metadata is stored in the Hive
- *                                metastore.
+ * @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
+ *                                  catalog. If false, it is inferred automatically based on file
+ *                                  structure.
  */
 case class CatalogTable(
     identifier: TableIdentifier,
@@ -158,7 +159,7 @@ case class CatalogTable(
     viewText: Option[String] = None,
     comment: Option[String] = None,
     unsupportedFeatures: Seq[String] = Seq.empty,
-    partitionProviderIsHive: Boolean = false) {
+    tracksPartitionsInCatalog: Boolean = false) {
 
   /** schema of this table's partition columns */
   def partitionSchema: StructType = StructType(schema.filter {
@@ -217,7 +218,7 @@ case class CatalogTable(
       if (properties.nonEmpty) s"Properties: $tableProperties" else "",
       if (stats.isDefined) s"Statistics: ${stats.get.simpleString}" else "",
       s"$storage",
-      if (partitionProviderIsHive) "Partition Provider: Hive" else "")
+      if (tracksPartitionsInCatalog) "Partition Provider: Catalog" else "")
 
     output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")")
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 3eff12f9eed14e92761c5ccf097f1eaac3d7de6d..af1eaa1f237465b077ed50cac22c9589e13f2461 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -489,7 +489,7 @@ class TreeNodeSuite extends SparkFunSuite {
         "owner" -> "",
         "createTime" -> 0,
         "lastAccessTime" -> -1,
-        "partitionProviderIsHive" -> false,
+        "tracksPartitionsInCatalog" -> false,
         "properties" -> JNull,
         "unsupportedFeatures" -> List.empty[String]))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index d4b28274cc4539325299d892a6d20b47aeca13d7..7e16e43f2bb0ef6a30a76ab591ccafc06186e65f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -92,7 +92,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       // If metastore partition management for file source tables is enabled, we start off with
       // partition provider hive, but no partitions in the metastore. The user has to call
       // `msck repair table` to populate the table partitions.
-      partitionProviderIsHive = partitionColumnNames.nonEmpty &&
+      tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
         sparkSession.sessionState.conf.manageFilesourcePartitions)
     // We will return Nil or throw exception at the beginning if the table already exists, so when
     // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 52af915b0be654a9871c33a8c33ac6e98d1a98be..b4d3ca1f3707426f69018114fa80e3ab755a3882 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -523,7 +523,7 @@ case class AlterTableRecoverPartitionsCommand(
     // Updates the table to indicate that its partition metadata is stored in the Hive metastore.
     // This is always the case for Hive format tables, but is not true for Datasource tables created
     // before Spark 2.1 unless they are converted via `msck repair table`.
-    spark.sessionState.catalog.alterTable(table.copy(partitionProviderIsHive = true))
+    spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
     catalog.refreshTable(tableName)
     logInfo(s"Recovered all partitions ($total).")
     Seq.empty[Row]
@@ -702,7 +702,7 @@ object DDLUtils {
         s"$action is not allowed on $tableName since filesource partition management is " +
           "disabled (spark.sql.hive.manageFilesourcePartitions = false).")
     }
-    if (!table.partitionProviderIsHive && isDatasourceTable(table)) {
+    if (!table.tracksPartitionsInCatalog && isDatasourceTable(table)) {
       throw new AnalysisException(
         s"$action is not allowed on $tableName since its partition metadata is not stored in " +
           "the Hive metastore. To import this information into the metastore, run " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index f32c956f5999e9813472f3c2719bf19e7e472520..00c646b9185b36e2f615d8bb090d136fb5a6efd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -467,7 +467,7 @@ case class DescribeTableCommand(
 
     if (table.tableType == CatalogTableType.VIEW) describeViewInfo(table, buffer)
 
-    if (DDLUtils.isDatasourceTable(table) && table.partitionProviderIsHive) {
+    if (DDLUtils.isDatasourceTable(table) && table.tracksPartitionsInCatalog) {
       append(buffer, "Partition Provider:", "Hive", "")
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 0b50448a7af1834384577fc609adced15e11bebb..52666119351b1399abdd83337912e7fdc2adacb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -308,7 +308,7 @@ case class DataSource(
         }
 
         val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
-            catalogTable.isDefined && catalogTable.get.partitionProviderIsHive) {
+            catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
           new CatalogFileIndex(
             sparkSession,
             catalogTable.get,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e87998fe4ad8d1523c5046c987a0feaf9e852f48..a548e88cb683a25cad1331a55c0494e781519e43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -182,9 +182,10 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
           "Cannot overwrite a path that is also being read from.")
       }
 
-      val overwritingSinglePartition = (overwrite.specificPartition.isDefined &&
+      val overwritingSinglePartition =
+        overwrite.specificPartition.isDefined &&
         t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
-        l.catalogTable.get.partitionProviderIsHive)
+        l.catalogTable.get.tracksPartitionsInCatalog
 
       val effectiveOutputPath = if (overwritingSinglePartition) {
         val partition = t.sparkSession.sessionState.catalog.getPartition(
@@ -203,7 +204,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
         if (l.catalogTable.isDefined && updatedPartitions.nonEmpty &&
             l.catalogTable.get.partitionColumnNames.nonEmpty &&
-            l.catalogTable.get.partitionProviderIsHive) {
+            l.catalogTable.get.tracksPartitionsInCatalog) {
           val metastoreUpdater = AlterTableAddPartitionCommand(
             l.catalogTable.get.identifier,
             updatedPartitions.map(p => (p, None)),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 927c0c5b95a17693399d4d3e0810045cf20251a1..9c75e2ae747618c3d22112f96aeee47e00010819 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -31,11 +31,7 @@ import org.apache.spark.sql.execution.command.RunnableCommand
 
 /**
  * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
- * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelationCommand]]
- * issues a single write job, and owns a UUID that identifies this job. Each concrete
- * implementation of [[HadoopFsRelation]] should use this UUID together with task id to generate
- * unique file path for each task output file. This UUID is passed to executor side via a
- * property named `spark.sql.sources.writeJobUUID`.
+ * Writing to dynamic partitions is also supported.
  */
 case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index d4d001497deb27e6a51ab96803b916d9b3b4c2e4..52b09c54464e72df69ca82930ab27b09e6925d91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -96,7 +96,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       provider = Some("hive"),
       partitionColumnNames = Seq("a", "b"),
       createTime = 0L,
-      partitionProviderIsHive = true)
+      tracksPartitionsInCatalog = true)
   }
 
   private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ebba203ac593cc0426341d0d3e7955741385a037..64ba52672b1c8ab22556c1ccc08a029b76a08a51 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -323,8 +323,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
       val properties = new scala.collection.mutable.HashMap[String, String]
       properties.put(DATASOURCE_PROVIDER, provider)
-      if (table.partitionProviderIsHive) {
-        properties.put(TABLE_PARTITION_PROVIDER, "hive")
+      if (table.tracksPartitionsInCatalog) {
+        properties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
       }
 
       // Serialized JSON schema string may be too long to be stored into a single metastore table
@@ -489,10 +489,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       updateLocationInStorageProps(oldTableDef, newLocation).copy(locationUri = newLocation)
     }
 
-    val partitionProviderProp = if (tableDefinition.partitionProviderIsHive) {
-      TABLE_PARTITION_PROVIDER -> "hive"
+    val partitionProviderProp = if (tableDefinition.tracksPartitionsInCatalog) {
+      TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_CATALOG
     } else {
-      TABLE_PARTITION_PROVIDER -> "builtin"
+      TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_FILESYSTEM
     }
 
     // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition,
@@ -537,7 +537,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table
     } else {
       getProviderFromTableProperties(table).map { provider =>
-        assert(provider != "hive", "Hive serde table should not save provider in table properties.")
+        assert(provider != "hive",
+          "Hive serde table should not save provider in table properties.")
         // Internally we store the table location in storage properties with key "path" for data
         // source tables. Here we set the table location to `locationUri` field and filter out the
         // path option in storage properties, to avoid exposing this concept externally.
@@ -545,6 +546,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           val tableLocation = getLocationFromStorageProps(table)
           updateLocationInStorageProps(table, None).copy(locationUri = tableLocation)
         }
+        val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
         table.copy(
           storage = storageWithLocation,
@@ -552,9 +554,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           provider = Some(provider),
           partitionColumnNames = getPartitionColumnsFromTableProperties(table),
           bucketSpec = getBucketSpecFromTableProperties(table),
-          partitionProviderIsHive = table.properties.get(TABLE_PARTITION_PROVIDER) == Some("hive"))
+          tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG)
+        )
       } getOrElse {
-        table.copy(provider = Some("hive"), partitionProviderIsHive = true)
+        table.copy(provider = Some("hive"), tracksPartitionsInCatalog = true)
       }
     }
 
@@ -851,6 +854,8 @@ object HiveExternalCatalog {
   val STATISTICS_COL_STATS_PREFIX = STATISTICS_PREFIX + "colStats."
 
   val TABLE_PARTITION_PROVIDER = SPARK_SQL_PREFIX + "partitionProvider"
+  val TABLE_PARTITION_PROVIDER_CATALOG = "catalog"
+  val TABLE_PARTITION_PROVIDER_FILESYSTEM = "filesystem"
 
   def getProviderFromTableProperties(metadata: CatalogTable): Option[String] = {
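
Note on the behavioral core of this patch: the new `tracksPartitionsInCatalog` flag on `CatalogTable` round-trips through the `TABLE_PARTITION_PROVIDER` table property in `HiveExternalCatalog`, written as "catalog" or "filesystem" and restored to a boolean on read. Below is a minimal standalone Scala sketch of that round trip, not Spark code: the object name is invented, and the literal property key stands in for SPARK_SQL_PREFIX + "partitionProvider".

// Standalone sketch of the flag <-> table-property round trip; only the
// "catalog"/"filesystem" values come from the patch above, the rest is illustrative.
object PartitionProviderRoundTripSketch {
  val TABLE_PARTITION_PROVIDER = "partitionProvider" // stand-in key, not the real Spark property name
  val TABLE_PARTITION_PROVIDER_CATALOG = "catalog"
  val TABLE_PARTITION_PROVIDER_FILESYSTEM = "filesystem"

  // Mirrors the alterTable path: always record an explicit partition-provider value.
  def toProperty(tracksPartitionsInCatalog: Boolean): (String, String) =
    if (tracksPartitionsInCatalog) {
      TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_CATALOG
    } else {
      TABLE_PARTITION_PROVIDER -> TABLE_PARTITION_PROVIDER_FILESYSTEM
    }

  // Mirrors restoreTableMetadata: only an explicit "catalog" value turns the flag on, so a
  // table created before Spark 2.1 (no property at all) restores with the flag set to false.
  def fromProperties(props: Map[String, String]): Boolean =
    props.get(TABLE_PARTITION_PROVIDER).contains(TABLE_PARTITION_PROVIDER_CATALOG)

  def main(args: Array[String]): Unit = {
    assert(fromProperties(Map(toProperty(tracksPartitionsInCatalog = true))))
    assert(!fromProperties(Map(toProperty(tracksPartitionsInCatalog = false))))
    assert(!fromProperties(Map.empty)) // legacy table without the property
  }
}

For such legacy tables the flag stays false until `MSCK REPAIR TABLE` runs, at which point AlterTableRecoverPartitionsCommand (see the ddl.scala hunk above) flips it to true.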