diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 7279173df6e4f2990d79ef6b249dc0893f8d66d6..01da0dc27d83de66d6cbeda4af09c01b22f75d36 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -176,7 +176,7 @@ class DataFrameReader(OptionUtils):
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
-             multiLine=None):
+             multiLine=None, allowUnquotedControlChars=None):
         """
         Loads JSON files and returns the results as a :class:`DataFrame`.
 
@@ -234,6 +234,9 @@ class DataFrameReader(OptionUtils):
                                 default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
         :param multiLine: parse one record, which may span multiple lines, per file. If None is
                           set, it uses the default value, ``false``.
+        :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
+                                          characters (ASCII characters with value less than 32,
+                                          including tab and line feed characters) or not.
 
         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -250,7 +253,8 @@ class DataFrameReader(OptionUtils):
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
-            timestampFormat=timestampFormat, multiLine=multiLine)
+            timestampFormat=timestampFormat, multiLine=multiLine,
+            allowUnquotedControlChars=allowUnquotedControlChars)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 5bbd70cf0a789d04e9221d7f4b8cb0fabb5048bf..0cf702143c77305f04a8b4a9184d6472938386e6 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -407,7 +407,7 @@ class DataStreamReader(OptionUtils):
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
-             multiLine=None):
+             multiLine=None,  allowUnquotedControlChars=None):
         """
         Loads a JSON file stream and returns the results as a :class:`DataFrame`.
 
@@ -467,6 +467,9 @@ class DataStreamReader(OptionUtils):
                                 default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
         :param multiLine: parse one record, which may span multiple lines, per file. If None is
                           set, it uses the default value, ``false``.
+        :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
+                                          characters (ASCII characters with value less than 32,
+                                          including tab and line feed characters) or not.
 
         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
         >>> json_sdf.isStreaming
@@ -480,7 +483,8 @@ class DataStreamReader(OptionUtils):
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
-            timestampFormat=timestampFormat, multiLine=multiLine)
+            timestampFormat=timestampFormat, multiLine=multiLine,
+            allowUnquotedControlChars=allowUnquotedControlChars)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 1fd680ab64b5a2d947d77210e76a82a2d55b4bb3..652412b34478ad193a5c90d4d319f22c68986aed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -64,6 +64,8 @@ private[sql] class JSONOptions(
     parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
   val allowBackslashEscapingAnyCharacter =
     parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
+  private val allowUnquotedControlChars =
+    parameters.get("allowUnquotedControlChars").map(_.toBoolean).getOrElse(false)
   val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
   val parseMode: ParseMode =
     parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
@@ -92,5 +94,6 @@ private[sql] class JSONOptions(
     factory.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, allowNonNumericNumbers)
     factory.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER,
       allowBackslashEscapingAnyCharacter)
+    factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 41cb019499ae12613ee3d05475b49424324c6ff1..8209cec4ba0a8498cd52216e1f67ecacd28fc239 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -313,6 +313,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * (e.g. 00012)</li>
    * <li>`allowBackslashEscapingAnyCharacter` (default `false`): allows accepting quoting of all
    * character using backslash quoting mechanism</li>
+   * <li>`allowUnquotedControlChars` (default `false`): allows JSON Strings to contain unquoted
+   * control characters (ASCII characters with value less than 32, including tab and line feed
+   * characters) or not.</li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing.
    *   <ul>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 70ddfa8e9b83514af3e4c401c79632e8764a2ec1..a42e28053a96af6b20557ef4a21710548fa412af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -195,6 +195,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * (e.g. 00012)</li>
    * <li>`allowBackslashEscapingAnyCharacter` (default `false`): allows accepting quoting of all
    * character using backslash quoting mechanism</li>
+   * <li>`allowUnquotedControlChars` (default `false`): allows JSON Strings to contain unquoted
+   * control characters (ASCII characters with value less than 32, including tab and line feed
+   * characters) or not.</li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing.
    *   <ul>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
index 6e2b4f0df595fe7485e1f193fafa8d4e748d87da..316c5183fddf19b0f9417d6c54733512416697fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -72,6 +72,21 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
     assert(df.first().getString(0) == "Reynold Xin")
   }
 
+  test("allowUnquotedControlChars off") {
+    val str = """{"name": "a\u0001b"}"""
+    val df = spark.read.json(Seq(str).toDS())
+
+    assert(df.schema.head.name == "_corrupt_record")
+  }
+
+  test("allowUnquotedControlChars on") {
+    val str = """{"name": "a\u0001b"}"""
+    val df = spark.read.option("allowUnquotedControlChars", "true").json(Seq(str).toDS())
+
+    assert(df.schema.head.name == "name")
+    assert(df.first().getString(0) == "a\u0001b")
+  }
+
   test("allowNumericLeadingZeros off") {
     val str = """{"age": 0018}"""
     val df = spark.read.json(Seq(str).toDS())