From 86174ea89b39a300caaba6baffac70f3dc702788 Mon Sep 17 00:00:00 2001
From: Burak Yavuz <brkyvz@gmail.com>
Date: Tue, 2 May 2017 14:08:16 +0800
Subject: [PATCH] [SPARK-20549] java.io.CharConversionException: Invalid
 'UTF-32' in JsonToStructs

## What changes were proposed in this pull request?

A fix for the same problem was made in #17693, but it did not cover `JsonToStructs`. This PR applies the same fix to `JsonToStructs`.

## How was this patch tested?

Regression test

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #17826 from brkyvz/SPARK-20549.
---
 .../spark/sql/catalyst/expressions/jsonExpressions.scala  | 8 +++-----
 .../spark/sql/catalyst/json/CreateJacksonParser.scala     | 7 +++++--
 .../sql/catalyst/expressions/JsonExpressionsSuite.scala   | 7 +++++++
 3 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 9fb0ea6815..6b90354367 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -151,8 +151,7 @@ case class GetJsonObject(json: Expression, path: Expression)
       try {
         /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson
           detect character encoding which could fail for some malformed strings */
-        Utils.tryWithResource(jsonFactory.createParser(new InputStreamReader(
-            new ByteArrayInputStream(jsonStr.getBytes), "UTF-8"))) { parser =>
+        Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, jsonStr)) { parser =>
           val output = new ByteArrayOutputStream()
           val matched = Utils.tryWithResource(
             jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator =>
@@ -398,9 +397,8 @@ case class JsonTuple(children: Seq[Expression])
     try {
       /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson
       detect character encoding which could fail for some malformed strings */
-      Utils.tryWithResource(jsonFactory.createParser(new InputStreamReader(
-          new ByteArrayInputStream(json.getBytes), "UTF-8"))) {
-        parser => parseRow(parser, input)
+      Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser =>
+        parseRow(parser, input)
       }
     } catch {
       case _: JsonProcessingException =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
index e0ed03a689..025a388aac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.json
 
-import java.io.InputStream
+import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import org.apache.hadoop.io.Text
@@ -33,7 +33,10 @@ private[sql] object CreateJacksonParser extends Serializable {
     val bb = record.getByteBuffer
     assert(bb.hasArray)
 
-    jsonFactory.createParser(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
+    val bain = new ByteArrayInputStream(
+      bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
+
+    jsonFactory.createParser(new InputStreamReader(bain, "UTF-8"))
   }
 
   def text(jsonFactory: JsonFactory, record: Text): JsonParser = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 4402ad4e9a..65d5c3a582 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -453,6 +453,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     )
   }
 
+  test("SPARK-20549: from_json bad UTF-8") {
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      JsonToStructs(schema, Map.empty, Literal(badJson), gmtId),
+      null)
+  }
+
   test("from_json with timestamp") {
     val schema = StructType(StructField("t", TimestampType) :: Nil)
 
-- 
GitLab