Commit ce3b98ba authored by Sean Zhong, committed by Yin Huai


[SPARK-16034][SQL] Checks the partition columns when calling dataFrame.write.mode("append").saveAsTable

## What changes were proposed in this pull request?

`DataFrameWriter` can be used to append data to an existing data source table. This becomes error-prone when the partition columns passed to `DataFrameWriter.partitionBy(columns)` don't match the actual partition columns of the underlying table. This pull request adds a check so that the requested partition columns always match the table's existing ones, as sketched below.
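For illustration, a minimal sketch of the user-facing behavior after this change, as run in `spark-shell` (where the `toDF` implicits are in scope); the table name `t` is made up, and the expectations mirror the new unit test below:

```scala
val df = Seq((1, 2, 3)).toDF("a", "b", "c")
df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("t")

// Appends must now repeat the table's partitioning exactly:
// same columns, same order, compared case-insensitively.
df.write.mode("append").partitionBy("a", "b").saveAsTable("t") // OK
df.write.mode("append").partitionBy("b", "a").saveAsTable("t") // AnalysisException: wrong order
df.write.mode("append").partitionBy("a").saveAsTable("t")      // AnalysisException: missing column
df.write.mode("append").saveAsTable("t")                       // AnalysisException: no partitioning given
```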

## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzhong@databricks.com>

Closes #13749 from clockfly/SPARK-16034.
Parent: 3d010c83
@@ -242,8 +242,13 @@ case class CreateDataSourceTableAsSelectCommand(
       bucketSpec = bucketSpec,
       options = optionsWithPath)
-    val result = dataSource.write(mode, df)
+    val result = try {
+      dataSource.write(mode, df)
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex)
+        throw ex
+    }
 
     if (createMetastoreTable) {
       // We will use the schema of resolved.relation as the schema of the table (instead of
       // the schema of df). It is important since the nullability may be changed by the relation
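This first hunk only adds diagnostics: when the append-time partition check (next hunk) throws an `AnalysisException`, the CTAS command logs which table and save mode failed before rethrowing. A minimal standalone sketch of the same log-and-rethrow pattern, assuming a generic SLF4J logger on the classpath rather than Spark's internal `Logging` trait:

```scala
import org.slf4j.LoggerFactory

object LogAndRethrowExample {
  private val log = LoggerFactory.getLogger(getClass)

  // Run `write`, attaching table/mode context to the log before
  // propagating the original error unchanged to the caller.
  def withWriteLogging[T](table: String, mode: String)(write: => T): T = {
    try {
      write
    } catch {
      case ex: Exception =>
        log.error(s"Failed to write to table $table in $mode mode", ex)
        throw ex
    }
  }
}
```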
@@ -435,26 +435,25 @@ case class DataSource(
     // If we are appending to a table that already exists, make sure the partitioning matches
     // up. If we fail to load the table for whatever reason, ignore the check.
     if (mode == SaveMode.Append) {
-      val existingPartitionColumnSet = try {
-        Some(
-          resolveRelation()
-            .asInstanceOf[HadoopFsRelation]
-            .location
-            .partitionSpec()
-            .partitionColumns
-            .fieldNames
-            .toSet)
-      } catch {
-        case e: Exception =>
-          None
-      }
-
-      existingPartitionColumnSet.foreach { ex =>
-        if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) {
-          throw new AnalysisException(
-            s"Requested partitioning does not equal existing partitioning: " +
-              s"$ex != ${partitionColumns.toSet}.")
-        }
-      }
+      val existingColumns = Try {
+        resolveRelation()
+          .asInstanceOf[HadoopFsRelation]
+          .location
+          .partitionSpec()
+          .partitionColumns
+          .fieldNames
+          .toSeq
+      }.getOrElse(Seq.empty[String])
+      val sameColumns =
+        existingColumns.map(_.toLowerCase) == partitionColumns.map(_.toLowerCase)
+      if (existingColumns.size > 0 && !sameColumns) {
+        throw new AnalysisException(
+          s"""Requested partitioning does not match existing partitioning.
+             |Existing partitioning columns:
+             |  ${existingColumns.mkString(", ")}
+             |Requested partitioning columns:
+             |  ${partitionColumns.mkString(", ")}
+             |""".stripMargin)
+      }
     }
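A subtlety in the rewritten check: the old code compared `Set`s of column names, whereas the new code compares `Seq`s, so column order now matters in addition to the (case-insensitive) names. A standalone sketch of the new comparison semantics, runnable as a Scala script; `matchesExisting` is an illustrative helper, not a Spark API:

```scala
// Illustration only, not Spark code.
val existingColumns = Seq("a", "b") // partitioning of the existing table

def matchesExisting(requested: Seq[String]): Boolean =
  existingColumns.map(_.toLowerCase) == requested.map(_.toLowerCase)

assert(matchesExisting(Seq("A", "B")))  // names are compared case-insensitively
assert(!matchesExisting(Seq("b", "a"))) // Seq equality is order-sensitive
assert(!matchesExisting(Seq("a")))      // all partition columns must be listed
```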
@@ -1317,4 +1317,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
   }
 
+  test("SPARK-16034 Partition columns should match when appending to existing data source tables") {
+    import testImplicits._
+    val df = Seq((1, 2, 3)).toDF("a", "b", "c")
+    withTable("partitionedTable") {
+      df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("partitionedTable")
+      // Misses some partition columns
+      intercept[AnalysisException] {
+        df.write.mode("append").partitionBy("a").saveAsTable("partitionedTable")
+      }
+      // Wrong order
+      intercept[AnalysisException] {
+        df.write.mode("append").partitionBy("b", "a").saveAsTable("partitionedTable")
+      }
+      // Partition columns not specified
+      intercept[AnalysisException] {
+        df.write.mode("append").saveAsTable("partitionedTable")
+      }
+      assert(sql("select * from partitionedTable").collect().size == 1)
+      // Inserts new data successfully when partition columns are correctly specified in
+      // partitionBy(...).
+      df.write.mode("append").partitionBy("a", "b").saveAsTable("partitionedTable")
+      assert(sql("select * from partitionedTable").collect().size == 2)
+    }
+  }
 }