Skip to content
Snippets Groups Projects
Commit 1a527bde authored by Xiao Li, committed by Wenchen Fan
Browse files

[SPARK-20976][SQL] Unify Error Messages for FAILFAST mode

### What changes were proposed in this pull request?
Before 2.2, the error message indicated that the job was terminated because of `FAILFAST` mode:
```
Malformed line in FAILFAST mode: {"a":{, b:3}
```
If possible, we should keep that indication. This PR unifies the error messages raised in `FAILFAST` mode.

### How was this patch tested?
Modified the existing test cases to check the new error messages.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #18196 from gatorsmile/messFailFast.
parent 55b8cfe6
No related branches found
No related tags found
No related merge requests found
...@@ -278,7 +278,7 @@ class JacksonParser( ...@@ -278,7 +278,7 @@ class JacksonParser(
// We cannot parse this token based on the given data type. So, we throw a // We cannot parse this token based on the given data type. So, we throw a
// RuntimeException and this exception will be caught by `parse` method. // RuntimeException and this exception will be caught by `parse` method.
throw new RuntimeException( throw new RuntimeException(
s"Failed to parse a value for data type $dataType (current token: $token).") s"Failed to parse a value for data type ${dataType.catalogString} (current token: $token).")
} }
/** /**
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources package org.apache.spark.sql.execution.datasources
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util._
...@@ -65,7 +66,8 @@ class FailureSafeParser[IN]( ...@@ -65,7 +66,8 @@ class FailureSafeParser[IN](
case DropMalformedMode => case DropMalformedMode =>
Iterator.empty Iterator.empty
case FailFastMode => case FailFastMode =>
throw e.cause throw new SparkException("Malformed records are detected in record parsing. " +
s"Parse Mode: ${FailFastMode.name}.", e.cause)
} }
} }
} }
......
...@@ -21,6 +21,7 @@ import java.util.Comparator ...@@ -21,6 +21,7 @@ import java.util.Comparator
import com.fasterxml.jackson.core._ import com.fasterxml.jackson.core._
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
...@@ -61,7 +62,8 @@ private[sql] object JsonInferSchema { ...@@ -61,7 +62,8 @@ private[sql] object JsonInferSchema {
case DropMalformedMode => case DropMalformedMode =>
None None
case FailFastMode => case FailFastMode =>
throw e throw new SparkException("Malformed records are detected in schema inference. " +
s"Parse Mode: ${FailFastMode.name}.", e)
} }
} }
} }
...@@ -231,8 +233,9 @@ private[sql] object JsonInferSchema { ...@@ -231,8 +233,9 @@ private[sql] object JsonInferSchema {
case FailFastMode => case FailFastMode =>
// If `other` is not struct type, consider it as malformed one and throws an exception. // If `other` is not struct type, consider it as malformed one and throws an exception.
throw new RuntimeException("Failed to infer a common schema. Struct types are expected" + throw new SparkException("Malformed records are detected in schema inference. " +
s" but ${other.catalogString} was found.") s"Parse Mode: ${FailFastMode.name}. Reasons: Failed to infer a common schema. " +
s"Struct types are expected, but `${other.catalogString}` was found.")
} }
/** /**
......
...@@ -1036,24 +1036,24 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ...@@ -1036,24 +1036,24 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
} }
test("Corrupt records: FAILFAST mode") { test("Corrupt records: FAILFAST mode") {
val schema = StructType(
StructField("a", StringType, true) :: Nil)
// `FAILFAST` mode should throw an exception for corrupt records. // `FAILFAST` mode should throw an exception for corrupt records.
val exceptionOne = intercept[SparkException] { val exceptionOne = intercept[SparkException] {
spark.read spark.read
.option("mode", "FAILFAST") .option("mode", "FAILFAST")
.json(corruptRecords) .json(corruptRecords)
} }.getMessage
assert(exceptionOne.getMessage.contains("JsonParseException")) assert(exceptionOne.contains(
"Malformed records are detected in schema inference. Parse Mode: FAILFAST."))
val exceptionTwo = intercept[SparkException] { val exceptionTwo = intercept[SparkException] {
spark.read spark.read
.option("mode", "FAILFAST") .option("mode", "FAILFAST")
.schema(schema) .schema("a string")
.json(corruptRecords) .json(corruptRecords)
.collect() .collect()
} }.getMessage
assert(exceptionTwo.getMessage.contains("JsonParseException")) assert(exceptionTwo.contains(
"Malformed records are detected in record parsing. Parse Mode: FAILFAST."))
} }
test("Corrupt records: DROPMALFORMED mode") { test("Corrupt records: DROPMALFORMED mode") {
...@@ -1944,7 +1944,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ...@@ -1944,7 +1944,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
.option("mode", "FAILFAST") .option("mode", "FAILFAST")
.json(path) .json(path)
} }
assert(exceptionOne.getMessage.contains("Failed to infer a common schema")) assert(exceptionOne.getMessage.contains("Malformed records are detected in schema " +
"inference. Parse Mode: FAILFAST."))
val exceptionTwo = intercept[SparkException] { val exceptionTwo = intercept[SparkException] {
spark.read spark.read
...@@ -1954,7 +1955,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ...@@ -1954,7 +1955,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
.json(path) .json(path)
.collect() .collect()
} }
assert(exceptionTwo.getMessage.contains("Failed to parse a value")) assert(exceptionTwo.getMessage.contains("Malformed records are detected in record " +
"parsing. Parse Mode: FAILFAST."))
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment