Commit 6273a711 authored by Jen-Ming Chung, committed by gatorsmile

[SPARK-21610][SQL] Corrupt records are not handled properly when creating a dataframe from a file

## What changes were proposed in this pull request?
```
echo '{"field": 1}
{"field": 2}
{"field": "3"}' >/tmp/sample.json
```

```scala
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("field", ByteType)
  .add("_corrupt_record", StringType)

val file = "/tmp/sample.json"

val dfFromFile = spark.read.schema(schema).json(file)

scala> dfFromFile.show(false)
+-----+---------------+
|field|_corrupt_record|
+-----+---------------+
|1    |null           |
|2    |null           |
|null |{"field": "3"} |
+-----+---------------+

scala> dfFromFile.filter($"_corrupt_record".isNotNull).count()
res1: Long = 0

scala> dfFromFile.filter($"_corrupt_record".isNull).count()
res2: Long = 3
```
When the `requiredSchema` contains only `_corrupt_record`, the derived `actualSchema` is empty and `_corrupt_record` is null for every row. This PR detects this situation and raises an exception with a reasonable workaround message, so that users know what happened and how to fix the query.
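
For reference, the suggested workaround (also shown in the migration-guide note and the new test below) is to materialize the parsed result first and then query the corrupt record column; a minimal sketch using the `schema` and `file` values defined above:

```scala
// Cache the fully parsed rows, then query the corrupt record column against
// the cached result instead of re-reading the raw file.
val df = spark.read.schema(schema).json(file).cache()

df.filter($"_corrupt_record".isNotNull).count()  // 1: the row {"field": "3"}
df.filter($"_corrupt_record".isNull).count()     // 2: the well-formed rows
```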

## How was this patch tested?

Added test case.

Author: Jen-Ming Chung <jenmingisme@gmail.com>

Closes #18865 from jmchung/SPARK-21610.
parent 520d92a1
@@ -1543,6 +1543,10 @@ options.
# Migration Guide
## Upgrading From Spark SQL 2.2 to 2.3
- Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
## Upgrading From Spark SQL 2.1 to 2.2
- Spark 2.1.1 introduced a new configuration key: `spark.sql.hive.caseSensitiveInferenceMode`. It had a default setting of `NEVER_INFER`, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting's default value to `INFER_AND_SAVE` to restore compatibility with reading Hive metastore tables whose underlying file schema have mixed-case column names. With the `INFER_AND_SAVE` configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set `spark.sql.hive.caseSensitiveInferenceMode` to `NEVER_INFER` to avoid the initial overhead of schema inference. Note that with the new default `INFER_AND_SAVE` setting, the results of the schema inference are saved as a metastore key for future use. Therefore, the initial schema inference occurs only at a table's first access.
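
As an illustration (not part of this commit), a minimal sketch of opting out of inference for a session, assuming a running `SparkSession` named `spark`:

```scala
// Sketch: skip the one-time schema inference when mixed-case column names are
// not a concern; the key can also be set in spark-defaults.conf.
spark.conf.set("spark.sql.hive.caseSensitiveInferenceMode", "NEVER_INFER")
```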
...
@@ -113,6 +113,20 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
      }
    }

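    // Fail fast on the driver side when the only requested column is the corrupt record
    // column: in that case the actual parsing schema is empty, so `_corrupt_record`
    // would be null for every row (SPARK-21610).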
    if (requiredSchema.length == 1 &&
      requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
      throw new AnalysisException(
        "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
          "referenced columns only include the internal corrupt record column\n" +
          s"(named ${parsedOptions.columnNameOfCorruptRecord} by default). For example:\n" +
          "spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
          "and spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().\n" +
          "Instead, you can cache or save the parsed results and then send the same query.\n" +
          "For example, val df = spark.read.schema(schema).json(file).cache() and then\n" +
          "df.filter($\"_corrupt_record\".isNotNull).count()."
      )
    }

    (file: PartitionedFile) => {
      val parser = new JacksonParser(actualSchema, parsedOptions)
      JsonDataSource(parsedOptions).readFile(
...
@@ -2034,4 +2034,33 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
      }
    }
  }

test("SPARK-21610: Corrupt records are not handled properly when creating a dataframe " +
"from a file") {
withTempPath { dir =>
val path = dir.getCanonicalPath
val data =
"""{"field": 1}
|{"field": 2}
|{"field": "3"}""".stripMargin
Seq(data).toDF().repartition(1).write.text(path)
val schema = new StructType().add("field", ByteType).add("_corrupt_record", StringType)
// negative cases
val msg = intercept[AnalysisException] {
spark.read.schema(schema).json(path).select("_corrupt_record").collect()
}.getMessage
assert(msg.contains("only include the internal corrupt record column"))
intercept[catalyst.errors.TreeNodeException[_]] {
spark.read.schema(schema).json(path).filter($"_corrupt_record".isNotNull).count()
}
// workaround
val df = spark.read.schema(schema).json(path).cache()
assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
assert(df.filter($"_corrupt_record".isNull).count() == 2)
checkAnswer(
df.select("_corrupt_record"),
Row(null) :: Row(null) :: Row("{\"field\": \"3\"}") :: Nil
)
}
}
}