Commit 27a1a5c9 authored by Eric Liang, committed by Reynold Xin

[SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location


## What changes were proposed in this pull request?

We failed to properly propagate table metadata for existing tables in the saveAsTable command. This caused a downstream component to treat the table as MANAGED and write its data to the wrong location.
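
For illustration (not part of the original commit message), here is a minimal sketch of the failure scenario; the table name and path are hypothetical, and the behavior note paraphrases the bug described above:

```scala
// Step 1: create an EXTERNAL, partitioned datasource table by giving saveAsTable
// an explicit path (hypothetical name and location).
spark.range(5).selectExpr("id", "id % 2 as part")
  .write.option("path", "/tmp/events_data").partitionBy("part").saveAsTable("events")

// Step 2: append without repeating the path. Before this patch the append could be
// planned as if "events" were MANAGED, so new files landed under the warehouse
// directory instead of /tmp/events_data; with the patch the existing table's
// storage and table type are looked up from the catalog and reused.
spark.range(5, 10).selectExpr("id", "id % 2 as part")
  .write.partitionBy("part").mode("append").saveAsTable("events")

spark.table("events").count()  // expected: 10, all readable from the table's location
```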

## How was this patch tested?

Unit test that fails before the patch.

Author: Eric Liang <ekl@databricks.com>

Closes #15983 from ericl/spark-18544.

(cherry picked from commit e2318ede)
Signed-off-by: Reynold Xin <rxin@databricks.com>
parent 1759cf69
@@ -373,8 +373,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
       case _ =>
-        val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-        val tableType = if (storage.locationUri.isDefined) {
+        val existingTable = if (tableExists) {
+          Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
+        } else {
+          None
+        }
+        val storage = if (tableExists) {
+          existingTable.get.storage
+        } else {
+          DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+        }
+        val tableType = if (tableExists) {
+          existingTable.get.tableType
+        } else if (storage.locationUri.isDefined) {
           CatalogTableType.EXTERNAL
         } else {
           CatalogTableType.MANAGED
@@ -391,12 +402,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         )
         df.sparkSession.sessionState.executePlan(
           CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
-        if (tableDesc.partitionColumnNames.nonEmpty &&
-            df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
-          // Need to recover partitions into the metastore so our saved data is visible.
-          df.sparkSession.sessionState.executePlan(
-            AlterTableRecoverPartitionsCommand(tableDesc.identifier)).toRdd
-        }
     }
   }
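
In short, when the target table already exists, saveAsTable now reuses the table's registered storage and table type from the session catalog instead of re-deriving them from the writer's options, so an append can no longer silently reclassify an EXTERNAL table as MANAGED. The explicit AlterTableRecoverPartitionsCommand step is dropped, presumably because the existing catalog table is now handed to the datasource write path (next hunk), making a separate recovery pass unnecessary. A hedged way to check the preserved metadata, continuing the hypothetical "events" example above:

```scala
// DESCRIBE FORMATTED layout differs across Spark versions, so just scan the rows
// for the table type and location instead of relying on exact row positions.
spark.sql("DESCRIBE FORMATTED events").collect()
  .map(_.mkString("\t"))
  .filter(row => row.contains("Type") || row.contains("Location"))
  .foreach(println)
// Expect the type to remain EXTERNAL and the location to be the path given at creation.
```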
@@ -212,7 +212,8 @@ case class CreateDataSourceTableAsSelectCommand(
       className = provider,
       partitionColumns = table.partitionColumnNames,
       bucketSpec = table.bucketSpec,
-      options = table.storage.properties ++ pathOption)
+      options = table.storage.properties ++ pathOption,
+      catalogTable = Some(table))
 
     val result = try {
       dataSource.write(mode, df)
@@ -188,6 +188,25 @@ class PartitionProviderCompatibilitySuite
     }
   }
 
+  for (enabled <- Seq(true, false)) {
+    test(s"SPARK-18544 append with saveAsTable - partition management $enabled") {
+      withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+        withTable("test") {
+          withTempDir { dir =>
+            setupPartitionedDatasourceTable("test", dir)
+            if (enabled) {
+              spark.sql("msck repair table test")
+            }
+            assert(spark.sql("select * from test").count() == 5)
+            spark.range(10).selectExpr("id as fieldOne", "id as partCol")
+              .write.partitionBy("partCol").mode("append").saveAsTable("test")
+            assert(spark.sql("select * from test").count() == 15)
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Runs a test against a multi-level partitioned table, then validates that the custom locations
    * were respected by the output writer.
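
As an aside (not part of the commit), the test exercises both settings of HIVE_MANAGE_FILESOURCE_PARTITIONS. With partition management enabled, partitions that exist only on disk are not visible to queries until they are registered in the metastore, which is why the test runs `msck repair table test` right after creating the table; the append must then register the partitions it writes for the final count of 15 to hold. The repair step can be spelled in either of two equivalent SQL forms (hypothetical table name):

```scala
// Register partition directories found on disk with the metastore.
spark.sql("MSCK REPAIR TABLE test")
// Equivalent Spark SQL spelling of the same command.
spark.sql("ALTER TABLE test RECOVER PARTITIONS")
```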