diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 45ba4d168f04375b2907fab38d78629b4acd7d56..0a8acbb52575bda05624748712c2e2757008e7c9 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1543,6 +1543,10 @@ options.
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.2 to 2.3
+
+  - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
+
 ## Upgrading From Spark SQL 2.1 to 2.2
 
   - Spark 2.1.1 introduced a new configuration key: `spark.sql.hive.caseSensitiveInferenceMode`. It had a default setting of `NEVER_INFER`, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting's default value to `INFER_AND_SAVE` to restore compatibility with reading Hive metastore tables whose underlying file schema have mixed-case column names. With the `INFER_AND_SAVE` configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set `spark.sql.hive.caseSensitiveInferenceMode` to `NEVER_INFER` to avoid the initial overhead of schema inference. Note that with the new default `INFER_AND_SAVE` setting, the results of the schema inference are saved as a metastore key for future use. Therefore, the initial schema inference occurs only at a table's first access.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 53d62d88b04c68cf177856527f84de44a9d59ae2..b5ed6e46366538508fc29306f9c048c4dde8c938 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -113,6 +113,20 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       }
     }
 
+    if (requiredSchema.length == 1 &&
+      requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
+      throw new AnalysisException(
+        "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
+          "referenced columns only include the internal corrupt record column\n" +
+          s"(named ${parsedOptions.columnNameOfCorruptRecord} by default). For example:\n" +
+          "spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
+          "and spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().\n" +
+          "Instead, you can cache or save the parsed results and then send the same query.\n" +
+          "For example, val df = spark.read.schema(schema).json(file).cache() and then\n" +
+          "df.filter($\"_corrupt_record\".isNotNull).count()."
+      )
+    }
+
     (file: PartitionedFile) => {
       val parser = new JacksonParser(actualSchema, parsedOptions)
       JsonDataSource(parsedOptions).readFile(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0008954e36bdd197a4d795c6051edc417b1519f5..8c8d41ebf115aa2fb1b10adbd1697e6735d8ae1d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2034,4 +2034,33 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       }
     }
   }
+
+  test("SPARK-21610: Corrupt records are not handled properly when creating a dataframe " +
+    "from a file") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val data =
+        """{"field": 1}
+          |{"field": 2}
+          |{"field": "3"}""".stripMargin
+      Seq(data).toDF().repartition(1).write.text(path)
+      val schema = new StructType().add("field", ByteType).add("_corrupt_record", StringType)
+      // negative cases
+      val msg = intercept[AnalysisException] {
+        spark.read.schema(schema).json(path).select("_corrupt_record").collect()
+      }.getMessage
+      assert(msg.contains("only include the internal corrupt record column"))
+      intercept[catalyst.errors.TreeNodeException[_]] {
+        spark.read.schema(schema).json(path).filter($"_corrupt_record".isNotNull).count()
+      }
+      // workaround
+      val df = spark.read.schema(schema).json(path).cache()
+      assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
+      assert(df.filter($"_corrupt_record".isNull).count() == 2)
+      checkAnswer(
+        df.select("_corrupt_record"),
+        Row(null) :: Row(null) :: Row("{\"field\": \"3\"}") :: Nil
+      )
+    }
+  }
 }
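
Workaround sketch (not part of the patch): the migration note above says to cache or save the parsed result before querying only `_corrupt_record`. A minimal, self-contained example of that pattern, assuming a local SparkSession, an illustrative input path, and a user-defined schema that keeps the default corrupt record column; all names below are hypothetical.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ByteType, StringType, StructType}

// Hedged sketch of the documented workaround; object, app, and path names are illustrative.
object CorruptRecordWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("corrupt-record-workaround")
      .master("local[*]")  // assumption: run locally for the example
      .getOrCreate()
    import spark.implicits._

    val path = "/tmp/input.json"  // assumption: path to a raw JSON file
    val schema = new StructType()
      .add("field", ByteType)
      .add("_corrupt_record", StringType)  // default corrupt record column name

    // Cache the parsed result first; referencing only `_corrupt_record` directly
    // off the reader is disallowed since Spark 2.3.
    val df = spark.read.schema(schema).json(path).cache()

    // The cached DataFrame can then be queried on the corrupt record column.
    println(df.filter($"_corrupt_record".isNotNull).count())
    df.select("_corrupt_record").show()

    spark.stop()
  }
}

Without the .cache() call, the same filter/select queries would hit the new AnalysisException added to JsonFileFormat above.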