Skip to content
Snippets Groups Projects
Commit 5827b65e authored by gatorsmile's avatar gatorsmile Committed by Yin Huai
Browse files

[SPARK-15808][SQL] File Format Checking When Appending Data

#### What changes were proposed in this pull request?
**Issue:** Got wrong results or strange errors when appending data to a table with a mismatched file format.

_Example 1: PARQUET -> ORC_
```Scala
createDF(0, 9).write.format("parquet").saveAsTable("appendOrcToParquet")
createDF(10, 19).write.mode(SaveMode.Append).format("orc").saveAsTable("appendOrcToParquet")
```

Error we got:
```
Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.RuntimeException: file:/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/warehouse-bc8fedf2-aa6a-4002-a18b-524c6ac859d4/appendorctoparquet/part-r-00000-c0e3f365-1d46-4df5-a82c-b47d7af9feb9.snappy.orc is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [79, 82, 67, 23]
```

_Example 2: JSON -> Parquet_
```Scala
createDF(0, 9).write.format("json").saveAsTable("appendJsonToCSV")
createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("appendJsonToCSV")
```

No exception, but wrong results:
```
+----+----+
|  c1|  c2|
+----+----+
|null|null|
|null|null|
|null|null|
|null|null|
|   0|str0|
|   1|str1|
|   2|str2|
|   3|str3|
|   4|str4|
|   5|str5|
|   6|str6|
|   7|str7|
|   8|str8|
|   9|str9|
+----+----+
```
_Example 3: JSON -> Text_
```Scala
createDF(0, 9).write.format("json").saveAsTable("appendJsonToText")
createDF(10, 19).write.mode(SaveMode.Append).format("text").saveAsTable("appendJsonToText")
```

Error we got:
```
Text data source supports only a single column, and you have 2 columns.
```

This PR is to issue an exception with appropriate error messages.

#### How was this patch tested?
Added test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #13546 from gatorsmile/fileFormatCheck.
parent 7b9071ee
No related branches found
No related tags found
No related merge requests found
......@@ -197,6 +197,15 @@ case class CreateDataSourceTableAsSelectCommand(
EliminateSubqueryAliases(
sessionState.catalog.lookupRelation(tableIdent)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
// check if the file formats match
l.relation match {
case r: HadoopFsRelation if r.fileFormat.getClass != dataSource.providingClass =>
throw new AnalysisException(
s"The file format of the existing table $tableIdent is " +
s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " +
s"format `$provider`")
case _ =>
}
if (query.schema.size != l.schema.size) {
throw new AnalysisException(
s"The column number of the existing schema[${l.schema}] " +
......
......@@ -891,6 +891,78 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}
test("append table using different formats") {
  def createDF(from: Int, to: Int): DataFrame =
    (from to to).map(i => i -> s"str$i").toDF("c1", "c2")

  // Each case: (table name, format used to create, format used to append,
  // fully-qualified FileFormat class expected in the error message).
  val mismatchCases = Seq(
    ("appendOrcToParquet", "parquet", "orc",
      "org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat"),
    ("appendParquetToJson", "json", "parquet",
      "org.apache.spark.sql.execution.datasources.json.JsonFileFormat"),
    ("appendTextToJson", "json", "text",
      "org.apache.spark.sql.execution.datasources.json.JsonFileFormat"))

  // Appending with a different file format must fail with a clear AnalysisException
  // rather than producing corrupt data or an opaque runtime error.
  for ((table, createFormat, appendFormat, existingFormatClass) <- mismatchCases) {
    withTable(table) {
      createDF(0, 9).write.format(createFormat).saveAsTable(table)
      val e = intercept[AnalysisException] {
        createDF(10, 19).write.mode(SaveMode.Append).format(appendFormat).saveAsTable(table)
      }
      assert(e.getMessage.contains(
        s"The file format of the existing table `$table` is " +
          s"`$existingFormatClass`. " +
          s"It doesn't match the specified format `$appendFormat`"))
    }
  }
}
test("append a table using the same formats but different names") {
  def createDF(from: Int, to: Int): DataFrame =
    (from to to).map(i => i -> s"str$i").toDF("c1", "c2")

  // Pairs of provider names that resolve to the same underlying Parquet file
  // format; appending with an alias of the original provider must succeed.
  val equivalentProviders = Seq(
    ("parquet", "org.apache.spark.sql.parquet"),
    ("org.apache.spark.sql.parquet", "parquet"),
    ("org.apache.spark.sql.parquet.DefaultSource",
      "org.apache.spark.sql.execution.datasources.parquet.DefaultSource"))

  for ((createProvider, appendProvider) <- equivalentProviders) {
    withTable("appendParquet") {
      createDF(0, 9).write.format(createProvider).saveAsTable("appendParquet")
      createDF(10, 19).write.mode(SaveMode.Append).format(appendProvider)
        .saveAsTable("appendParquet")
      // Both batches of rows must be readable from the appended table.
      checkAnswer(
        sql("SELECT p.c1, p.c2 FROM appendParquet p WHERE p.c1 > 5"),
        (6 to 19).map(i => Row(i, s"str$i")))
    }
  }
}
test("SPARK-8156:create table to specific database by 'use dbname' ") {
val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment