From 6273a711b69139ef0210f59759030a0b4a26b118 Mon Sep 17 00:00:00 2001
From: Jen-Ming Chung <jenmingisme@gmail.com>
Date: Sun, 10 Sep 2017 17:26:43 -0700
Subject: [PATCH] [SPARK-21610][SQL] Corrupt records are not handled properly
 when creating a dataframe from a file

## What changes were proposed in this pull request?
```
echo '{"field": 1}
{"field": 2}
{"field": "3"}' >/tmp/sample.json
```

```scala
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("field", ByteType)
  .add("_corrupt_record", StringType)

val file = "/tmp/sample.json"

val dfFromFile = spark.read.schema(schema).json(file)

scala> dfFromFile.show(false)
+-----+---------------+
|field|_corrupt_record|
+-----+---------------+
|1    |null           |
|2    |null           |
|null |{"field": "3"} |
+-----+---------------+

scala> dfFromFile.filter($"_corrupt_record".isNotNull).count()
res1: Long = 0

scala> dfFromFile.filter($"_corrupt_record".isNull).count()
res2: Long = 3
```
When the `requiredSchema` only contains `_corrupt_record`, the derived `actualSchema` is empty and `_corrupt_record` is null for every row. This PR detects that situation and raises an exception with a reasonable workaround message so that users know what happened and how to fix the query.
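
For reference, a minimal sketch of the workaround described in the migration guide note below, reusing the `schema` and `file` values from the repro above; caching forces the full parse first, so the corrupt record column is populated before the narrow projection is applied:

```scala
// Workaround: cache (or save) the parsed result before querying only the
// corrupt record column. With the cached plan all columns are parsed, so
// _corrupt_record is populated for the malformed row.
val df = spark.read.schema(schema).json(file).cache()

df.filter($"_corrupt_record".isNotNull).count()  // expected: 1
df.filter($"_corrupt_record".isNull).count()     // expected: 2
```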

## How was this patch tested?

Added test case.

Author: Jen-Ming Chung <jenmingisme@gmail.com>

Closes #18865 from jmchung/SPARK-21610.
---
 docs/sql-programming-guide.md                 |  4 +++
 .../datasources/json/JsonFileFormat.scala     | 14 +++++++++
 .../datasources/json/JsonSuite.scala          | 29 +++++++++++++++++++
 3 files changed, 47 insertions(+)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 45ba4d168f..0a8acbb525 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1543,6 +1543,10 @@ options.
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.2 to 2.3
+
+  - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
+
 ## Upgrading From Spark SQL 2.1 to 2.2
 
   - Spark 2.1.1 introduced a new configuration key: `spark.sql.hive.caseSensitiveInferenceMode`. It had a default setting of `NEVER_INFER`, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting's default value to `INFER_AND_SAVE` to restore compatibility with reading Hive metastore tables whose underlying file schema have mixed-case column names. With the `INFER_AND_SAVE` configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set `spark.sql.hive.caseSensitiveInferenceMode` to `NEVER_INFER` to avoid the initial overhead of schema inference. Note that with the new default `INFER_AND_SAVE` setting, the results of the schema inference are saved as a metastore key for future use. Therefore, the initial schema inference occurs only at a table's first access.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 53d62d88b0..b5ed6e4636 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -113,6 +113,20 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       }
     }
 
+    if (requiredSchema.length == 1 &&
+      requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
+      throw new AnalysisException(
+        "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
+        "referenced columns only include the internal corrupt record column\n" +
+        s"(named ${parsedOptions.columnNameOfCorruptRecord} by default). For example:\n" +
+        "spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
+        "and spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().\n" +
+        "Instead, you can cache or save the parsed results and then send the same query.\n" +
+        "For example, val df = spark.read.schema(schema).json(file).cache() and then\n" +
+        "df.filter($\"_corrupt_record\".isNotNull).count()."
+      )
+    }
+
     (file: PartitionedFile) => {
       val parser = new JacksonParser(actualSchema, parsedOptions)
       JsonDataSource(parsedOptions).readFile(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0008954e36..8c8d41ebf1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2034,4 +2034,33 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       }
     }
   }
+
+  test("SPARK-21610: Corrupt records are not handled properly when creating a dataframe " +
+    "from a file") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val data =
+        """{"field": 1}
+          |{"field": 2}
+          |{"field": "3"}""".stripMargin
+      Seq(data).toDF().repartition(1).write.text(path)
+      val schema = new StructType().add("field", ByteType).add("_corrupt_record", StringType)
+      // negative cases
+      val msg = intercept[AnalysisException] {
+        spark.read.schema(schema).json(path).select("_corrupt_record").collect()
+      }.getMessage
+      assert(msg.contains("only include the internal corrupt record column"))
+      intercept[catalyst.errors.TreeNodeException[_]] {
+        spark.read.schema(schema).json(path).filter($"_corrupt_record".isNotNull).count()
+      }
+      // workaround
+      val df = spark.read.schema(schema).json(path).cache()
+      assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
+      assert(df.filter($"_corrupt_record".isNull).count() == 2)
+      checkAnswer(
+        df.select("_corrupt_record"),
+        Row(null) :: Row(null) :: Row("{\"field\": \"3\"}") :: Nil
+      )
+    }
+  }
 }
-- 
GitLab