diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index df4d406b84d603a17de793ca9f08fb1e4afb99e2..9fb0ea68153d272c7fc0ac04467bc1ac806083cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import java.io.{ByteArrayOutputStream, CharArrayWriter, StringWriter}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, CharArrayWriter, InputStreamReader, StringWriter}
 
 import scala.util.parsing.combinator.RegexParsers
 
@@ -149,7 +149,10 @@ case class GetJsonObject(json: Expression, path: Expression)
 
     if (parsed.isDefined) {
       try {
-        Utils.tryWithResource(jsonFactory.createParser(jsonStr.getBytes)) { parser =>
+        /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson
+          detect character encoding which could fail for some malformed strings */
+        Utils.tryWithResource(jsonFactory.createParser(new InputStreamReader(
+            new ByteArrayInputStream(jsonStr.getBytes), "UTF-8"))) { parser =>
           val output = new ByteArrayOutputStream()
           val matched = Utils.tryWithResource(
             jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator =>
@@ -393,7 +396,10 @@ case class JsonTuple(children: Seq[Expression])
     }
 
     try {
-      Utils.tryWithResource(jsonFactory.createParser(json.getBytes)) {
+      /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson
+      detect character encoding which could fail for some malformed strings */
+      Utils.tryWithResource(jsonFactory.createParser(new InputStreamReader(
+          new ByteArrayInputStream(json.getBytes), "UTF-8"))) {
         parser => parseRow(parser, input)
       }
     } catch {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index c5b72235e5db0653d8151eb071e45aec3384eddd..4402ad4e9a9e573499ad593441e561e73163f532 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -39,6 +39,10 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       |"fb:testid":"1234"}
       |""".stripMargin
 
+  /* invalid json with leading nulls would trigger java.io.CharConversionException
+   in Jackson's JsonFactory.createParser(byte[]) due to RFC-4627 encoding detection */
+  val badJson = "\0\0\0A\1AAA"
+
   test("$.store.bicycle") {
     checkEvaluation(
       GetJsonObject(Literal(json), Literal("$.store.bicycle")),
@@ -224,6 +228,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       null)
   }
 
+  test("SPARK-16548: character conversion") {
+    checkEvaluation(
+      GetJsonObject(Literal(badJson), Literal("$.a")),
+      null
+    )
+  }
+
   test("non foldable literal") {
     checkEvaluation(
       GetJsonObject(NonFoldableLiteral(json), NonFoldableLiteral("$.fb:testid")),
@@ -340,6 +351,12 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       InternalRow(null, null, null, null, null))
   }
 
+  test("SPARK-16548: json_tuple - invalid json with leading nulls") {
+    checkJsonTuple(
+      JsonTuple(Literal(badJson) :: jsonTupleQuery),
+      InternalRow(null, null, null, null, null))
+  }
+
   test("json_tuple - preserve newlines") {
     checkJsonTuple(
       JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),