From 42de5253f327bd7ee258b0efb5024f3847fa3b51 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Mon, 16 Nov 2015 00:06:14 -0800
Subject: [PATCH] [SPARK-11745][SQL] Enable more JSON parsing options

This patch adds the following options to the JSON data source, for dealing with non-standard JSON files:
* `allowComments` (default `false`): ignores Java/C++ style comment in JSON records
* `allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names
* `allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
* `allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers (e.g. 00012)

To avoid passing a lot of options throughout the json package, I introduced a new JSONOptions case class to define all JSON config options.

Also updated documentation to explain these options.

Scala

![screen shot 2015-11-15 at 6 12 12 pm](https://cloud.githubusercontent.com/assets/323388/11172965/e3ace6ec-8bc4-11e5-805e-2d78f80d0ed6.png)

Python

![screen shot 2015-11-15 at 6 11 28 pm](https://cloud.githubusercontent.com/assets/323388/11172964/e23ed6ee-8bc4-11e5-8216-312f5983acd5.png)

Author: Reynold Xin <rxin@databricks.com>

Closes #9724 from rxin/SPARK-11745.
---
 python/pyspark/sql/readwriter.py              |  10 ++
 .../apache/spark/sql/DataFrameReader.scala    |  22 ++--
 .../spark/sql/execution/SparkPlan.scala       |  17 +--
 .../datasources/json/InferSchema.scala        |  34 +++---
 .../datasources/json/JSONOptions.scala        |  64 ++++++++++
 .../datasources/json/JSONRelation.scala       |  20 ++-
 .../datasources/json/JacksonParser.scala      |  82 +++++++------
 .../json/JsonParsingOptionsSuite.scala        | 114 ++++++++++++++++++
 .../datasources/json/JsonSuite.scala          |  29 ++---
 9 files changed, 286 insertions(+), 106 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 927f407742..7b8ddb9feb 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -153,6 +153,16 @@ class DataFrameReader(object):
                      or RDD of Strings storing JSON objects.
         :param schema: an optional :class:`StructType` for the input schema.
 
+        You can set the following JSON-specific options to deal with non-standard JSON files:
+            * ``primitivesAsString`` (default ``false``): infers all primitive values as a string \
+                type
+            * ``allowComments`` (default ``false``): ignores Java/C++ style comment in JSON records
+            * ``allowUnquotedFieldNames`` (default ``false``): allows unquoted JSON field names
+            * ``allowSingleQuotes`` (default ``true``): allows single quotes in addition to double \
+                quotes
+            * ``allowNumericLeadingZeros`` (default ``false``): allows leading zeros in numbers \
+                (e.g. 00012)
+
         >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
         [('age', 'bigint'), ('name', 'string')]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 6a194a443a..5872fbded3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,7 +29,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.JSONRelation
+import org.apache.spark.sql.execution.datasources.json.{JSONOptions, JSONRelation}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types.StructType
@@ -227,6 +227,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * This function goes through the input once to determine the input schema. If you know the
    * schema in advance, use the version that specifies the schema to avoid the extra scan.
    *
+   * You can set the following JSON-specific options to deal with non-standard JSON files:
+   * <li>`primitivesAsString` (default `false`): infers all primitive values as a string type</li>
+   * <li>`allowComments` (default `false`): ignores Java/C++ style comment in JSON records</li>
+   * <li>`allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names</li>
+   * <li>`allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
+   * </li>
+   * <li>`allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers
+   * (e.g. 00012)</li>
+   *
    * @param path input path
    * @since 1.4.0
    */
@@ -255,16 +264,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * @since 1.4.0
    */
   def json(jsonRDD: RDD[String]): DataFrame = {
-    val samplingRatio = extraOptions.getOrElse("samplingRatio", "1.0").toDouble
-    val primitivesAsString = extraOptions.getOrElse("primitivesAsString", "false").toBoolean
     sqlContext.baseRelationToDataFrame(
       new JSONRelation(
         Some(jsonRDD),
-        samplingRatio,
-        primitivesAsString,
-        userSpecifiedSchema,
-        None,
-        None)(sqlContext)
+        maybeDataSchema = userSpecifiedSchema,
+        maybePartitionSpec = None,
+        userDefinedPartitionColumns = None,
+        parameters = extraOptions.toMap)(sqlContext)
     )
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 1b833002f4..534a3bcb83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -221,22 +221,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   private[this] def isTesting: Boolean = sys.props.contains("spark.testing")
 
-  protected def newProjection(
-      expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
-    log.debug(s"Creating Projection: $expressions, inputSchema: $inputSchema")
-    try {
-      GenerateProjection.generate(expressions, inputSchema)
-    } catch {
-      case e: Exception =>
-        if (isTesting) {
-          throw e
-        } else {
-          log.error("Failed to generate projection, fallback to interpret", e)
-          new InterpretedProjection(expressions, inputSchema)
-        }
-    }
-  }
-
   protected def newMutableProjection(
       expressions: Seq[Expression], inputSchema: Seq[Attribute]): () => MutableProjection = {
     log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
@@ -282,6 +266,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         }
     }
   }
+
   /**
    * Creates a row ordering for the given schema, in natural ascending order.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index b9914c581a..922fd5b211 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -25,33 +25,36 @@ import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-private[sql] object InferSchema {
+
+private[json] object InferSchema {
+
   /**
    * Infer the type of a collection of json records in three stages:
    *   1. Infer the type of each record
    *   2. Merge types by choosing the lowest type necessary to cover equal keys
    *   3. Replace any remaining null fields with string, the top type
    */
-  def apply(
+  def infer(
       json: RDD[String],
-      samplingRatio: Double = 1.0,
       columnNameOfCorruptRecords: String,
-      primitivesAsString: Boolean = false): StructType = {
-    require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
-    val schemaData = if (samplingRatio > 0.99) {
+      configOptions: JSONOptions): StructType = {
+    require(configOptions.samplingRatio > 0,
+      s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0")
+    val schemaData = if (configOptions.samplingRatio > 0.99) {
       json
     } else {
-      json.sample(withReplacement = false, samplingRatio, 1)
+      json.sample(withReplacement = false, configOptions.samplingRatio, 1)
     }
 
     // perform schema inference on each row and merge afterwards
     val rootType = schemaData.mapPartitions { iter =>
       val factory = new JsonFactory()
+      configOptions.setJacksonOptions(factory)
       iter.map { row =>
         try {
           Utils.tryWithResource(factory.createParser(row)) { parser =>
             parser.nextToken()
-            inferField(parser, primitivesAsString)
+            inferField(parser, configOptions)
           }
         } catch {
           case _: JsonParseException =>
@@ -71,14 +74,14 @@ private[sql] object InferSchema {
   /**
    * Infer the type of a json document from the parser's token stream
    */
-  private def inferField(parser: JsonParser, primitivesAsString: Boolean): DataType = {
+  private def inferField(parser: JsonParser, configOptions: JSONOptions): DataType = {
     import com.fasterxml.jackson.core.JsonToken._
     parser.getCurrentToken match {
       case null | VALUE_NULL => NullType
 
       case FIELD_NAME =>
         parser.nextToken()
-        inferField(parser, primitivesAsString)
+        inferField(parser, configOptions)
 
       case VALUE_STRING if parser.getTextLength < 1 =>
         // Zero length strings and nulls have special handling to deal
@@ -95,7 +98,7 @@ private[sql] object InferSchema {
         while (nextUntil(parser, END_OBJECT)) {
           builder += StructField(
             parser.getCurrentName,
-            inferField(parser, primitivesAsString),
+            inferField(parser, configOptions),
             nullable = true)
         }
 
@@ -107,14 +110,15 @@ private[sql] object InferSchema {
         // the type as we pass through all JSON objects.
         var elementType: DataType = NullType
         while (nextUntil(parser, END_ARRAY)) {
-          elementType = compatibleType(elementType, inferField(parser, primitivesAsString))
+          elementType = compatibleType(
+            elementType, inferField(parser, configOptions))
         }
 
         ArrayType(elementType)
 
-      case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) if primitivesAsString => StringType
+      case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) if configOptions.primitivesAsString => StringType
 
-      case (VALUE_TRUE | VALUE_FALSE) if primitivesAsString => StringType
+      case (VALUE_TRUE | VALUE_FALSE) if configOptions.primitivesAsString => StringType
 
       case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
         import JsonParser.NumberType._
@@ -178,7 +182,7 @@ private[sql] object InferSchema {
   /**
    * Returns the most general data type for two given data types.
    */
-  private[json] def compatibleType(t1: DataType, t2: DataType): DataType = {
+  def compatibleType(t1: DataType, t2: DataType): DataType = {
     HiveTypeCoercion.findTightestCommonTypeOfTwo(t1, t2).getOrElse {
       // t1 or t2 is a StructType, ArrayType, or an unexpected type.
       (t1, t2) match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
new file mode 100644
index 0000000000..c132ead20e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import com.fasterxml.jackson.core.{JsonParser, JsonFactory}
+
+/**
+ * Options for the JSON data source.
+ *
+ * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
+ */
+case class JSONOptions(
+    samplingRatio: Double = 1.0,
+    primitivesAsString: Boolean = false,
+    allowComments: Boolean = false,
+    allowUnquotedFieldNames: Boolean = false,
+    allowSingleQuotes: Boolean = true,
+    allowNumericLeadingZeros: Boolean = false,
+    allowNonNumericNumbers: Boolean = false) {
+
+  /** Sets config options on a Jackson [[JsonFactory]]. */
+  def setJacksonOptions(factory: JsonFactory): Unit = {
+    factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
+    factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, allowUnquotedFieldNames)
+    factory.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, allowSingleQuotes)
+    factory.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS, allowNumericLeadingZeros)
+    factory.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, allowNonNumericNumbers)
+  }
+}
+
+
+object JSONOptions {
+  def createFromConfigMap(parameters: Map[String, String]): JSONOptions = JSONOptions(
+    samplingRatio =
+      parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0),
+    primitivesAsString =
+      parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false),
+    allowComments =
+      parameters.get("allowComments").map(_.toBoolean).getOrElse(false),
+    allowUnquotedFieldNames =
+      parameters.get("allowUnquotedFieldNames").map(_.toBoolean).getOrElse(false),
+    allowSingleQuotes =
+      parameters.get("allowSingleQuotes").map(_.toBoolean).getOrElse(true),
+    allowNumericLeadingZeros =
+      parameters.get("allowNumericLeadingZeros").map(_.toBoolean).getOrElse(false),
+    allowNonNumericNumbers =
+      parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
+  )
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index dca638b7f6..3e61ba35be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -52,13 +52,9 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
       dataSchema: Option[StructType],
       partitionColumns: Option[StructType],
       parameters: Map[String, String]): HadoopFsRelation = {
-    val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
-    val primitivesAsString = parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
 
     new JSONRelation(
       inputRDD = None,
-      samplingRatio = samplingRatio,
-      primitivesAsString = primitivesAsString,
       maybeDataSchema = dataSchema,
       maybePartitionSpec = None,
       userDefinedPartitionColumns = partitionColumns,
@@ -69,8 +65,6 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
 
 private[sql] class JSONRelation(
     val inputRDD: Option[RDD[String]],
-    val samplingRatio: Double,
-    val primitivesAsString: Boolean,
     val maybeDataSchema: Option[StructType],
     val maybePartitionSpec: Option[PartitionSpec],
     override val userDefinedPartitionColumns: Option[StructType],
@@ -79,6 +73,8 @@ private[sql] class JSONRelation(
     (@transient val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec, parameters) {
 
+  val options: JSONOptions = JSONOptions.createFromConfigMap(parameters)
+
   /** Constraints to be imposed on schema to be stored. */
   private def checkConstraints(schema: StructType): Unit = {
     if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
@@ -109,17 +105,16 @@ private[sql] class JSONRelation(
       classOf[Text]).map(_._2.toString) // get the text line
   }
 
-  override lazy val dataSchema = {
+  override lazy val dataSchema: StructType = {
     val jsonSchema = maybeDataSchema.getOrElse {
       val files = cachedLeafStatuses().filterNot { status =>
         val name = status.getPath.getName
         name.startsWith("_") || name.startsWith(".")
       }.toArray
-      InferSchema(
+      InferSchema.infer(
         inputRDD.getOrElse(createBaseRdd(files)),
-        samplingRatio,
         sqlContext.conf.columnNameOfCorruptRecord,
-        primitivesAsString)
+        options)
     }
     checkConstraints(jsonSchema)
 
@@ -132,10 +127,11 @@ private[sql] class JSONRelation(
       inputPaths: Array[FileStatus],
       broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
     val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_)))
-    val rows = JacksonParser(
+    val rows = JacksonParser.parse(
       inputRDD.getOrElse(createBaseRdd(inputPaths)),
       requiredDataSchema,
-      sqlContext.conf.columnNameOfCorruptRecord)
+      sqlContext.conf.columnNameOfCorruptRecord,
+      options)
 
     rows.mapPartitions { iterator =>
       val unsafeProjection = UnsafeProjection.create(requiredDataSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 4f53eeb081..bfa1405041 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -18,11 +18,10 @@
 package org.apache.spark.sql.execution.datasources.json
 
 import java.io.ByteArrayOutputStream
+import scala.collection.mutable.ArrayBuffer
 
 import com.fasterxml.jackson.core._
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -32,18 +31,23 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
-private[sql] object JacksonParser {
-  def apply(
-      json: RDD[String],
+object JacksonParser {
+
+  def parse(
+      input: RDD[String],
       schema: StructType,
-      columnNameOfCorruptRecords: String): RDD[InternalRow] = {
-    parseJson(json, schema, columnNameOfCorruptRecords)
+      columnNameOfCorruptRecords: String,
+      configOptions: JSONOptions): RDD[InternalRow] = {
+
+    input.mapPartitions { iter =>
+      parseJson(iter, schema, columnNameOfCorruptRecords, configOptions)
+    }
   }
 
   /**
    * Parse the current token (and related children) according to a desired schema
    */
-  private[sql] def convertField(
+  def convertField(
       factory: JsonFactory,
       parser: JsonParser,
       schema: DataType): Any = {
@@ -226,9 +230,10 @@ private[sql] object JacksonParser {
   }
 
   private def parseJson(
-      json: RDD[String],
+      input: Iterator[String],
       schema: StructType,
-      columnNameOfCorruptRecords: String): RDD[InternalRow] = {
+      columnNameOfCorruptRecords: String,
+      configOptions: JSONOptions): Iterator[InternalRow] = {
 
     def failedRecord(record: String): Seq[InternalRow] = {
       // create a row even if no corrupt record column is present
@@ -241,37 +246,36 @@ private[sql] object JacksonParser {
       Seq(row)
     }
 
-    json.mapPartitions { iter =>
-      val factory = new JsonFactory()
-
-      iter.flatMap { record =>
-        if (record.trim.isEmpty) {
-          Nil
-        } else {
-          try {
-            Utils.tryWithResource(factory.createParser(record)) { parser =>
-              parser.nextToken()
-
-              convertField(factory, parser, schema) match {
-                case null => failedRecord(record)
-                case row: InternalRow => row :: Nil
-                case array: ArrayData =>
-                  if (array.numElements() == 0) {
-                    Nil
-                  } else {
-                    array.toArray[InternalRow](schema)
-                  }
-                case _ =>
-                  sys.error(
-                    s"Failed to parse record $record. Please make sure that each line of " +
-                      "the file (or each string in the RDD) is a valid JSON object or " +
-                      "an array of JSON objects.")
-              }
+    val factory = new JsonFactory()
+    configOptions.setJacksonOptions(factory)
+
+    input.flatMap { record =>
+      if (record.trim.isEmpty) {
+        Nil
+      } else {
+        try {
+          Utils.tryWithResource(factory.createParser(record)) { parser =>
+            parser.nextToken()
+
+            convertField(factory, parser, schema) match {
+              case null => failedRecord(record)
+              case row: InternalRow => row :: Nil
+              case array: ArrayData =>
+                if (array.numElements() == 0) {
+                  Nil
+                } else {
+                  array.toArray[InternalRow](schema)
+                }
+              case _ =>
+                sys.error(
+                  s"Failed to parse record $record. Please make sure that each line of " +
+                    "the file (or each string in the RDD) is a valid JSON object or " +
+                    "an array of JSON objects.")
             }
-          } catch {
-            case _: JsonProcessingException =>
-              failedRecord(record)
           }
+        } catch {
+          case _: JsonProcessingException =>
+            failedRecord(record)
         }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
new file mode 100644
index 0000000000..4cc0a3a958
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+/**
+ * Test cases for various [[JSONOptions]].
+ */
+class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
+
+  test("allowComments off") {
+    val str = """{'name': /* hello */ 'Reynold Xin'}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.json(rdd)
+
+    assert(df.schema.head.name == "_corrupt_record")
+  }
+
+  test("allowComments on") {
+    val str = """{'name': /* hello */ 'Reynold Xin'}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.option("allowComments", "true").json(rdd)
+
+    assert(df.schema.head.name == "name")
+    assert(df.first().getString(0) == "Reynold Xin")
+  }
+
+  test("allowSingleQuotes off") {
+    val str = """{'name': 'Reynold Xin'}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.option("allowSingleQuotes", "false").json(rdd)
+
+    assert(df.schema.head.name == "_corrupt_record")
+  }
+
+  test("allowSingleQuotes on") {
+    val str = """{'name': 'Reynold Xin'}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.json(rdd)
+
+    assert(df.schema.head.name == "name")
+    assert(df.first().getString(0) == "Reynold Xin")
+  }
+
+  test("allowUnquotedFieldNames off") {
+    val str = """{name: 'Reynold Xin'}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.json(rdd)
+
+    assert(df.schema.head.name == "_corrupt_record")
+  }
+
+  test("allowUnquotedFieldNames on") {
+    val str = """{name: 'Reynold Xin'}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.option("allowUnquotedFieldNames", "true").json(rdd)
+
+    assert(df.schema.head.name == "name")
+    assert(df.first().getString(0) == "Reynold Xin")
+  }
+
+  test("allowNumericLeadingZeros off") {
+    val str = """{"age": 0018}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.json(rdd)
+
+    assert(df.schema.head.name == "_corrupt_record")
+  }
+
+  test("allowNumericLeadingZeros on") {
+    val str = """{"age": 0018}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.option("allowNumericLeadingZeros", "true").json(rdd)
+
+    assert(df.schema.head.name == "age")
+    assert(df.first().getLong(0) == 18)
+  }
+
+  // The following two tests are not really working - need to look into Jackson's
+  // JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS.
+  ignore("allowNonNumericNumbers off") {
+    val str = """{"age": NaN}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.json(rdd)
+
+    assert(df.schema.head.name == "_corrupt_record")
+  }
+
+  ignore("allowNonNumericNumbers on") {
+    val str = """{"age": NaN}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.option("allowNonNumericNumbers", "true").json(rdd)
+
+    assert(df.schema.head.name == "age")
+    assert(df.first().getDouble(0).isNaN)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 28b8f02bdf..6042b1178a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -588,7 +588,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       relation.isInstanceOf[JSONRelation],
       "The DataFrame returned by jsonFile should be based on JSONRelation.")
     assert(relation.asInstanceOf[JSONRelation].paths === Array(path))
-    assert(relation.asInstanceOf[JSONRelation].samplingRatio === (0.49 +- 0.001))
+    assert(relation.asInstanceOf[JSONRelation].options.samplingRatio === (0.49 +- 0.001))
 
     val schema = StructType(StructField("a", LongType, true) :: Nil)
     val logicalRelation =
@@ -597,7 +597,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val relationWithSchema = logicalRelation.relation.asInstanceOf[JSONRelation]
     assert(relationWithSchema.paths === Array(path))
     assert(relationWithSchema.schema === schema)
-    assert(relationWithSchema.samplingRatio > 0.99)
+    assert(relationWithSchema.options.samplingRatio > 0.99)
   }
 
   test("Loading a JSON dataset from a text file") {
@@ -1165,31 +1165,28 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   test("JSONRelation equality test") {
     val relation0 = new JSONRelation(
       Some(empty),
-      1.0,
-      false,
       Some(StructType(StructField("a", IntegerType, true) :: Nil)),
-      None, None)(sqlContext)
+      None,
+      None)(sqlContext)
     val logicalRelation0 = LogicalRelation(relation0)
     val relation1 = new JSONRelation(
       Some(singleRow),
-      1.0,
-      false,
       Some(StructType(StructField("a", IntegerType, true) :: Nil)),
-      None, None)(sqlContext)
+      None,
+      None)(sqlContext)
     val logicalRelation1 = LogicalRelation(relation1)
     val relation2 = new JSONRelation(
       Some(singleRow),
-      0.5,
-      false,
       Some(StructType(StructField("a", IntegerType, true) :: Nil)),
-      None, None)(sqlContext)
+      None,
+      None,
+      parameters = Map("samplingRatio" -> "0.5"))(sqlContext)
     val logicalRelation2 = LogicalRelation(relation2)
     val relation3 = new JSONRelation(
       Some(singleRow),
-      1.0,
-      false,
       Some(StructType(StructField("b", IntegerType, true) :: Nil)),
-      None, None)(sqlContext)
+      None,
+      None)(sqlContext)
     val logicalRelation3 = LogicalRelation(relation3)
 
     assert(relation0 !== relation1)
@@ -1232,7 +1229,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
   test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
     // This is really a test that it doesn't throw an exception
-    val emptySchema = InferSchema(empty, 1.0, "")
+    val emptySchema = InferSchema.infer(empty, "", JSONOptions())
     assert(StructType(Seq()) === emptySchema)
   }
 
@@ -1256,7 +1253,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("SPARK-8093 Erase empty structs") {
-    val emptySchema = InferSchema(emptyRecords, 1.0, "")
+    val emptySchema = InferSchema.infer(emptyRecords, "", JSONOptions())
     assert(StructType(Seq()) === emptySchema)
   }
 
-- 
GitLab