Commit 9d66c421 authored by Yin Huai, committed by Reynold Xin

[SPARK-12057][SQL] Prevent failure on corrupt JSON records

This PR makes the JSON parser and schema inference handle more cases where records cannot be parsed. It is based on #10043. The last commit fixes the failed test and updates the schema inference logic.

Regarding the schema inference change, if we have something like
```
{"f1":1}
[1,2,3]
```
we originally got a DataFrame without any columns. After this change, we get a DataFrame with columns `f1` and `_corrupt_record`: for the second row, `[1,2,3]` becomes the value of `_corrupt_record`.
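For illustration, here is a minimal usage sketch of the new behavior (assuming a `SQLContext` named `sqlContext` and the default corrupt-record column name `_corrupt_record`; the printed schema is approximate):

```
// Sketch only: assumes a SQLContext named `sqlContext` and the default
// corrupt-record column name (`_corrupt_record`).
val records = sqlContext.sparkContext.parallelize(
  """{"f1":1}""" ::
  """[1,2,3]""" :: Nil)

val df = sqlContext.read.json(records)
df.printSchema()
// root
//  |-- _corrupt_record: string (nullable = true)
//  |-- f1: long (nullable = true)

// The row that cannot be parsed into the inferred struct keeps its original
// text in `_corrupt_record`, and its other columns are null.
df.select("f1", "_corrupt_record").show()
```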

When merging this PR, please make sure that the author is set to simplyianm.

JIRA: https://issues.apache.org/jira/browse/SPARK-12057

Closes #10043

Author: Ian Macalinao <me@ian.pw>
Author: Yin Huai <yhuai@databricks.com>

Closes #10288 from yhuai/handleCorruptJson.
parent 437583f6
@@ -61,7 +61,10 @@ private[json] object InferSchema {
           StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
         }
       }
-    }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType)
+    }.treeAggregate[DataType](
+      StructType(Seq()))(
+      compatibleRootType(columnNameOfCorruptRecords),
+      compatibleRootType(columnNameOfCorruptRecords))
 
     canonicalizeType(rootType) match {
       case Some(st: StructType) => st
@@ -170,12 +173,38 @@ private[json] object InferSchema {
     case other => Some(other)
   }
 
+  private def withCorruptField(
+      struct: StructType,
+      columnNameOfCorruptRecords: String): StructType = {
+    if (!struct.fieldNames.contains(columnNameOfCorruptRecords)) {
+      // If this given struct does not have a column used for corrupt records,
+      // add this field.
+      struct.add(columnNameOfCorruptRecords, StringType, nullable = true)
+    } else {
+      // Otherwise, just return this struct.
+      struct
+    }
+  }
+
   /**
    * Remove top-level ArrayType wrappers and merge the remaining schemas
    */
-  private def compatibleRootType: (DataType, DataType) => DataType = {
-    case (ArrayType(ty1, _), ty2) => compatibleRootType(ty1, ty2)
-    case (ty1, ArrayType(ty2, _)) => compatibleRootType(ty1, ty2)
+  private def compatibleRootType(
+      columnNameOfCorruptRecords: String): (DataType, DataType) => DataType = {
+    // Since we support array of json objects at the top level,
+    // we need to check the element type and find the root level data type.
+    case (ArrayType(ty1, _), ty2) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
+    case (ty1, ArrayType(ty2, _)) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
+    // If we see any other data type at the root level, we get records that cannot be
+    // parsed. So, we use the struct as the data type and add the corrupt field to the schema.
+    case (struct: StructType, NullType) => struct
+    case (NullType, struct: StructType) => struct
+    case (struct: StructType, o) if !o.isInstanceOf[StructType] =>
+      withCorruptField(struct, columnNameOfCorruptRecords)
+    case (o, struct: StructType) if !o.isInstanceOf[StructType] =>
+      withCorruptField(struct, columnNameOfCorruptRecords)
+    // If we get anything else, we call compatibleType.
+    // Usually, when we reach here, ty1 and ty2 are two StructTypes.
     case (ty1, ty2) => compatibleType(ty1, ty2)
   }
......
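To make the new root-type merging rule concrete, here is a standalone sketch (not the code in this patch) that mirrors `compatibleRootType` and `withCorruptField` using Spark SQL's public `DataType` API; the field-by-field struct merge that the real code delegates to `compatibleType` is simplified to a naive union:

```
// Standalone sketch of the root-level schema merging rule; it mirrors, but is
// not, the private InferSchema code, and it simplifies struct-vs-struct merging.
import org.apache.spark.sql.types._

def mergeRoot(corruptCol: String)(t1: DataType, t2: DataType): DataType = (t1, t2) match {
  // Unwrap top-level arrays and merge their element types.
  case (ArrayType(e1, _), other) => mergeRoot(corruptCol)(e1, other)
  case (other, ArrayType(e2, _)) => mergeRoot(corruptCol)(other, e2)
  // NullType at the root adds no information; keep the struct as is.
  case (s: StructType, NullType) => s
  case (NullType, s: StructType) => s
  // A struct merged with any other non-struct root means some record could not
  // be parsed as a JSON object, so ensure the corrupt-record column is present.
  case (s: StructType, o) if !o.isInstanceOf[StructType] =>
    if (s.fieldNames.contains(corruptCol)) s else s.add(corruptCol, StringType, nullable = true)
  case (o, s: StructType) if !o.isInstanceOf[StructType] =>
    if (s.fieldNames.contains(corruptCol)) s else s.add(corruptCol, StringType, nullable = true)
  // Naive union of two structs; the real code merges field types via compatibleType.
  case (s1: StructType, s2: StructType) =>
    StructType(s1 ++ s2.filterNot(f => s1.fieldNames.contains(f.name)))
  // Two non-struct roots; the real code also falls back to compatibleType here.
  case (other, _) => other
}

// {"f1":1} infers to a struct; [1,2,3] infers to an array of longs.
val merged = mergeRoot("_corrupt_record")(
  StructType(Seq(StructField("f1", LongType))),
  ArrayType(LongType))
// merged has fields f1 (long) and _corrupt_record (string)
```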
@@ -31,6 +31,8 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
+private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
+
 object JacksonParser {
 
   def parse(
...@@ -110,7 +112,7 @@ object JacksonParser { ...@@ -110,7 +112,7 @@ object JacksonParser {
lowerCaseValue.equals("-inf")) { lowerCaseValue.equals("-inf")) {
value.toFloat value.toFloat
} else { } else {
sys.error(s"Cannot parse $value as FloatType.") throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
} }
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, DoubleType) => case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, DoubleType) =>
@@ -127,7 +129,7 @@ object JacksonParser {
             lowerCaseValue.equals("-inf")) {
           value.toDouble
         } else {
-          sys.error(s"Cannot parse $value as DoubleType.")
+          throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
         }
 
     case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, dt: DecimalType) =>
@@ -174,7 +176,11 @@ object JacksonParser {
       convertField(factory, parser, udt.sqlType)
 
     case (token, dataType) =>
-      sys.error(s"Failed to parse a value for data type $dataType (current token: $token).")
+      // We cannot parse this token based on the given data type. So, we throw a
+      // SparkSQLJsonProcessingException and this exception will be caught by
+      // parseJson method.
+      throw new SparkSQLJsonProcessingException(
+        s"Failed to parse a value for data type $dataType (current token: $token).")
     }
   }
@@ -267,15 +273,14 @@ object JacksonParser {
               array.toArray[InternalRow](schema)
             }
           case _ =>
-            sys.error(
-              s"Failed to parse record $record. Please make sure that each line of " +
-                "the file (or each string in the RDD) is a valid JSON object or " +
-                "an array of JSON objects.")
+            failedRecord(record)
         }
       }
     } catch {
       case _: JsonProcessingException =>
         failedRecord(record)
+      case _: SparkSQLJsonProcessingException =>
+        failedRecord(record)
     }
   }
 }
......
@@ -1427,4 +1427,41 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       }
     }
   }
+
+  test("SPARK-12057 additional corrupt records do not throw exceptions") {
+    // Test if we can query corrupt records.
+    withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
+      withTempTable("jsonTable") {
+        val schema = StructType(
+          StructField("_unparsed", StringType, true) ::
+            StructField("dummy", StringType, true) :: Nil)
+
+        {
+          // We need to make sure we can infer the schema.
+          val jsonDF = sqlContext.read.json(additionalCorruptRecords)
+          assert(jsonDF.schema === schema)
+        }
+
+        {
+          val jsonDF = sqlContext.read.schema(schema).json(additionalCorruptRecords)
+          jsonDF.registerTempTable("jsonTable")
+
+          // In HiveContext, backticks should be used to access columns starting with an underscore.
+          checkAnswer(
+            sql(
+              """
+                |SELECT dummy, _unparsed
+                |FROM jsonTable
+              """.stripMargin),
+            Row("test", null) ::
+              Row(null, """[1,2,3]""") ::
+              Row(null, """":"test", "a":1}""") ::
+              Row(null, """42""") ::
+              Row(null, """ ","ian":"test"}""") :: Nil
+          )
+        }
+      }
+    }
+  }
 }
@@ -188,6 +188,14 @@ private[json] trait TestJsonData {
       """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
         """]""" :: Nil)
 
+  def additionalCorruptRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      """{"dummy":"test"}""" ::
+        """[1,2,3]""" ::
+        """":"test", "a":1}""" ::
+        """42""" ::
+        """ ","ian":"test"}""" :: Nil)
+
   def emptyRecords: RDD[String] =
     sqlContext.sparkContext.parallelize(
       """{""" ::
@@ -197,7 +205,6 @@ private[json] trait TestJsonData {
       """{"b": [{"c": {}}]}""" ::
         """]""" :: Nil)
 
-
   lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())
......