Commit d8b50f70 authored by Wenchen Fan, committed by Yin Huai

[SPARK-11453][SQL] Appending data to a partitioned table messes up the result

The reason is that:

1. For a partitioned Hive table, the partition columns are moved after the data columns (e.g. `<a: Int, b: Int>` partitioned by `a` becomes `<b: Int, a: Int>`).
2. When appending data to a table, input columns are matched to the table's columns by position.

So when we append data to a partitioned table, the input columns get matched to the wrong table columns. The fix is to reorder the input columns before matching by position, as is already done for [`InsertIntoHadoopFsRelation`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala#L101-L105).
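
To make the reordering concrete, here is a minimal, Spark-free Scala sketch of the idea (the `ReorderSketch` object and `reorder` helper are hypothetical illustrations, not the actual `DataFrameWriter` code): data columns stay first and partition columns move to the end, mirroring the partitioned table's layout, so a subsequent by-position match lines up.

```scala
// Hypothetical sketch of the column-reordering idea behind this fix;
// not the actual Spark implementation.
object ReorderSketch {
  // Reorder input column names so data columns come first and partition
  // columns come last, matching how a partitioned table lays out its schema.
  def reorder(inputCols: Seq[String], partitionCols: Set[String]): Seq[String] = {
    val (partCols, dataCols) = inputCols.partition(partitionCols.contains)
    dataCols ++ partCols
  }

  def main(args: Array[String]): Unit = {
    // User's DataFrame has columns <a, b> and is partitioned by "a";
    // the table stores them as <b, a>.
    println(reorder(Seq("a", "b"), Set("a"))) // prints List(b, a)
  }
}
```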

Author: Wenchen Fan <wenchen@databricks.com>

Closes #9408 from cloud-fan/append.
parent 97b7080c
@@ -23,8 +23,8 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{Project, InsertIntoTable}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource}
 import org.apache.spark.sql.sources.HadoopFsRelation
@@ -167,17 +167,38 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   private def insertInto(tableIdent: TableIdentifier): Unit = {
-    val partitions = partitioningColumns.map(_.map(col => col -> (None: Option[String])).toMap)
+    val partitions = normalizedParCols.map(_.map(col => col -> (None: Option[String])).toMap)
     val overwrite = mode == SaveMode.Overwrite
+
+    // A partitioned relation's schema can be different from the input logicalPlan, since
+    // partition columns are all moved after data columns. We Project to adjust the ordering.
+    // TODO: this belongs to the analyzer.
+    val input = normalizedParCols.map { parCols =>
+      val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { attr =>
+        parCols.contains(attr.name)
+      }
+      Project(inputDataCols ++ inputPartCols, df.logicalPlan)
+    }.getOrElse(df.logicalPlan)
+
     df.sqlContext.executePlan(
       InsertIntoTable(
         UnresolvedRelation(tableIdent),
         partitions.getOrElse(Map.empty[String, Option[String]]),
-        df.logicalPlan,
+        input,
         overwrite,
         ifNotExists = false)).toRdd
   }
 
+  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { parCols =>
+    parCols.map { col =>
+      df.logicalPlan.output
+        .map(_.name)
+        .find(df.sqlContext.analyzer.resolver(_, col))
+        .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
+          s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
+    }
+  }
+
   /**
    * Saves the content of the [[DataFrame]] as the specified table.
    *
@@ -53,4 +53,12 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
     Utils.deleteRecursively(path)
   }
+
+  test("partitioned columns should appear at the end of schema") {
+    withTempPath { f =>
+      val path = f.getAbsolutePath
+      Seq(1 -> "a").toDF("i", "j").write.partitionBy("i").parquet(path)
+      assert(sqlContext.read.parquet(path).schema.map(_.name) == Seq("j", "i"))
+    }
+  }
 }
@@ -1428,4 +1428,24 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       checkAnswer(sql("SELECT val FROM tbl10562 WHERE Year == 2012"), Row("a"))
     }
   }
+
+  test("SPARK-11453: append data to partitioned table") {
+    withTable("tbl11453") {
+      Seq("1" -> "10", "2" -> "20").toDF("i", "j")
+        .write.partitionBy("i").saveAsTable("tbl11453")
+
+      Seq("3" -> "30").toDF("i", "j")
+        .write.mode(SaveMode.Append).partitionBy("i").saveAsTable("tbl11453")
+      checkAnswer(
+        sqlContext.read.table("tbl11453").select("i", "j").orderBy("i"),
+        Row("1", "10") :: Row("2", "20") :: Row("3", "30") :: Nil)
+
+      // make sure case sensitivity is correct.
+      Seq("4" -> "40").toDF("i", "j")
+        .write.mode(SaveMode.Append).partitionBy("I").saveAsTable("tbl11453")
+      checkAnswer(
+        sqlContext.read.table("tbl11453").select("i", "j").orderBy("i"),
+        Row("1", "10") :: Row("2", "20") :: Row("3", "30") :: Row("4", "40") :: Nil)
+    }
+  }
 }