diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 4918780873bdfc08e039467e3d511e6f587954c8..c38eca5156e5ac64c29954d169d02d0d6e70a8ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -242,8 +242,13 @@ case class CreateDataSourceTableAsSelectCommand(
       bucketSpec = bucketSpec,
       options = optionsWithPath)
 
-    val result = dataSource.write(mode, df)
-
+    val result = try {
+      dataSource.write(mode, df)
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex)
+        throw ex
+    }
     if (createMetastoreTable) {
       // We will use the schema of resolved.relation as the schema of the table (instead of
       // the schema of df). It is important since the nullability may be changed by the relation
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 7f3683fc98197e616b6371ae81df1b8d38752dcd..f274fc77daef08d841691745e64f3f665909033c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -435,26 +435,25 @@
       // If we are appending to a table that already exists, make sure the partitioning matches
       // up. If we fail to load the table for whatever reason, ignore the check.
       if (mode == SaveMode.Append) {
-        val existingPartitionColumnSet = try {
-          Some(
-            resolveRelation()
-              .asInstanceOf[HadoopFsRelation]
-              .location
-              .partitionSpec()
-              .partitionColumns
-              .fieldNames
-              .toSet)
-        } catch {
-          case e: Exception =>
-            None
-        }
-
-        existingPartitionColumnSet.foreach { ex =>
-          if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) {
-            throw new AnalysisException(
-              s"Requested partitioning does not equal existing partitioning: " +
-              s"$ex != ${partitionColumns.toSet}.")
-          }
+        val existingColumns = Try {
+          resolveRelation()
+            .asInstanceOf[HadoopFsRelation]
+            .location
+            .partitionSpec()
+            .partitionColumns
+            .fieldNames
+            .toSeq
+        }.getOrElse(Seq.empty[String])
+        val sameColumns =
+          existingColumns.map(_.toLowerCase) == partitionColumns.map(_.toLowerCase)
+        if (existingColumns.size > 0 && !sameColumns) {
+          throw new AnalysisException(
+            s"""Requested partitioning does not match existing partitioning.
+               |Existing partitioning columns:
+               |  ${existingColumns.mkString(", ")}
+               |Requested partitioning columns:
+               |  ${partitionColumns.mkString(", ")}
+               |""".stripMargin)
         }
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 7eb2fff91d6e183d7b5adef1d73084e592da05c6..8827649d0a0d9fbf6ff9f3a87f4d874d47039c94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1317,4 +1317,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
   }
 
+  test("SPARK-16034 Partition columns should match when appending to existing data source tables") {
+    import testImplicits._
+    val df = Seq((1, 2, 3)).toDF("a", "b", "c")
+    withTable("partitionedTable") {
+      df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("partitionedTable")
+      // Misses some partition columns
+      intercept[AnalysisException] {
+        df.write.mode("append").partitionBy("a").saveAsTable("partitionedTable")
+      }
+      // Wrong order
+      intercept[AnalysisException] {
+        df.write.mode("append").partitionBy("b", "a").saveAsTable("partitionedTable")
+      }
+      // Partition columns not specified
+      intercept[AnalysisException] {
+        df.write.mode("append").saveAsTable("partitionedTable")
+      }
+      assert(sql("select * from partitionedTable").collect().size == 1)
+      // Inserts new data successfully when partition columns are correctly specified in
+      // partitionBy(...).
+      df.write.mode("append").partitionBy("a", "b").saveAsTable("partitionedTable")
+      assert(sql("select * from partitionedTable").collect().size == 2)
+    }
+  }
 }
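
For reference, the user-visible effect of the DataSource.scala change above is that appending to an existing data source table with partition columns that are missing, reordered, or omitted now fails fast with an AnalysisException listing both column sets, instead of silently writing misplaced data. The sketch below illustrates that behavior outside the test suite; it is hypothetical and not part of the patch — the local SparkSession setup, the object name PartitionMismatchExample, and the table name "events" are assumptions for the example only.

import org.apache.spark.sql.{AnalysisException, SparkSession}

object PartitionMismatchExample {
  def main(args: Array[String]): Unit = {
    // Local session purely for illustration; saveAsTable uses the default catalog.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("partition-check")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, 2, 3)).toDF("a", "b", "c")

    // Create a table partitioned by (a, b).
    df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("events")

    // With this patch, an append whose partitioning differs (wrong order here)
    // is rejected with an AnalysisException.
    try {
      df.write.mode("append").partitionBy("b", "a").saveAsTable("events")
    } catch {
      case e: AnalysisException =>
        println(s"Append rejected: ${e.getMessage}")
    }

    spark.stop()
  }
}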