From fe33121a53384811a8e094ab6c05dc85b7c7ca87 Mon Sep 17 00:00:00 2001
From: Michael Armbrust <michael@databricks.com>
Date: Thu, 29 Sep 2016 13:01:10 -0700
Subject: [PATCH] [SPARK-17699] Support for parsing JSON string columns

Spark SQL has great support for reading text files that contain JSON data.  However, in many cases the JSON data is just one column amongst others.  This is particularly true when reading from sources such as Kafka.  This PR adds a new functions `from_json` that converts a string column into a nested `StructType` with a user specified schema.

Example usage:
```scala
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("a", IntegerType)

df.select(from_json($"value", schema) as 'json) // => [json: <a: int>]
```

This PR adds support for java, scala and python.  I leveraged our existing JSON parsing support by moving it into catalyst (so that we could define expressions using it).  I left SQL out for now, because I'm not sure how users would specify a schema.

Author: Michael Armbrust <michael@databricks.com>

Closes #15274 from marmbrus/jsonParser.
---
 python/pyspark/sql/functions.py               | 23 ++++++++
 .../expressions/jsonExpressions.scala         | 31 +++++++++-
 .../sql/catalyst}/json/JSONOptions.scala      |  6 +-
 .../sql/catalyst}/json/JacksonParser.scala    | 13 +++--
 .../sql/catalyst}/json/JacksonUtils.scala     |  4 +-
 .../catalyst/util}/CompressionCodecs.scala    |  6 +-
 .../spark/sql/catalyst/util}/ParseModes.scala |  4 +-
 .../expressions/JsonExpressionsSuite.scala    | 26 +++++++++
 .../apache/spark/sql/DataFrameReader.scala    |  5 +-
 .../datasources/csv/CSVFileFormat.scala       |  1 +
 .../datasources/csv/CSVOptions.scala          |  2 +-
 .../datasources/json/InferSchema.scala        |  3 +-
 .../datasources/json/JacksonGenerator.scala   |  3 +-
 .../datasources/json/JsonFileFormat.scala     |  2 +
 .../datasources/text/TextFileFormat.scala     |  1 +
 .../org/apache/spark/sql/functions.scala      | 58 +++++++++++++++++++
 .../apache/spark/sql/JsonFunctionsSuite.scala | 29 ++++++++++
 .../json/JsonParsingOptionsSuite.scala        |  1 +
 .../datasources/json/JsonSuite.scala          |  3 +-
 19 files changed, 198 insertions(+), 23 deletions(-)
 rename sql/{core/src/main/scala/org/apache/spark/sql/execution/datasources => catalyst/src/main/scala/org/apache/spark/sql/catalyst}/json/JSONOptions.scala (95%)
 rename sql/{core/src/main/scala/org/apache/spark/sql/execution/datasources => catalyst/src/main/scala/org/apache/spark/sql/catalyst}/json/JacksonParser.scala (97%)
 rename sql/{core/src/main/scala/org/apache/spark/sql/execution/datasources => catalyst/src/main/scala/org/apache/spark/sql/catalyst}/json/JacksonUtils.scala (92%)
 rename sql/{core/src/main/scala/org/apache/spark/sql/execution/datasources => catalyst/src/main/scala/org/apache/spark/sql/catalyst/util}/CompressionCodecs.scala (93%)
 rename sql/{core/src/main/scala/org/apache/spark/sql/execution/datasources => catalyst/src/main/scala/org/apache/spark/sql/catalyst/util}/ParseModes.scala (94%)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 89b3c07c07..45d6bf944b 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1706,6 +1706,29 @@ def json_tuple(col, *fields):
     return Column(jc)
 
 
+@since(2.1)
+def from_json(col, schema, options={}):
+    """
+    Parses a column containing a JSON string into a [[StructType]] with the
+    specified schema. Returns `null`, in the case of an unparseable string.
+
+    :param col: string column in json format
+    :param schema: a StructType to use when parsing the json column
+    :param options: options to control parsing. accepts the same options as the json datasource
+
+    >>> from pyspark.sql.types import *
+    >>> data = [(1, '''{"a": 1}''')]
+    >>> schema = StructType([StructField("a", IntegerType())])
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(from_json(df.value, schema).alias("json")).collect()
+    [Row(json=Row(a=1))]
+    """
+
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.from_json(_to_java_column(col), schema.json(), options)
+    return Column(jc)
+
+
 @since(1.5)
 def size(col):
     """
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c14a2fb122..65dbd6a4e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -23,10 +23,12 @@ import scala.util.parsing.combinator.RegexParsers
 
 import com.fasterxml.jackson.core._
 
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
+import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
@@ -467,3 +469,28 @@ case class JsonTuple(children: Seq[Expression])
   }
 }
 
+/**
+ * Converts an json input string to a [[StructType]] with the specified schema.
+ */
+case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
+  extends Expression with CodegenFallback with ExpectsInputTypes {
+  override def nullable: Boolean = true
+
+  @transient
+  lazy val parser =
+    new JacksonParser(
+      schema,
+      "invalid", // Not used since we force fail fast.  Invalid rows will be set to `null`.
+      new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
+
+  override def dataType: DataType = schema
+  override def children: Seq[Expression] = child :: Nil
+
+  override def eval(input: InternalRow): Any = {
+    try parser.parse(child.eval(input).toString).head catch {
+      case _: SparkSQLJsonProcessingException => null
+    }
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
similarity index 95%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 02d211d042..aec18922ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
 
 /**
- * Options for the JSON data source.
+ * Options for parsing JSON data into Spark SQL rows.
  *
  * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
  */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
similarity index 97%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 5ce1bf7432..f80e6373d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
 
 import java.io.ByteArrayOutputStream
 
@@ -28,19 +28,22 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
-private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
+private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
 
+/**
+ * Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
+ */
 class JacksonParser(
     schema: StructType,
     columnNameOfCorruptRecord: String,
     options: JSONOptions) extends Logging {
 
+  import JacksonUtils._
+  import ParseModes._
   import com.fasterxml.jackson.core.JsonToken._
 
   // A `ValueConverter` is responsible for converting a value from `JsonParser`
@@ -65,7 +68,7 @@ class JacksonParser(
   private def failedRecord(record: String): Seq[InternalRow] = {
     // create a row even if no corrupt record column is present
     if (options.failFast) {
-      throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
+      throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record")
     }
     if (options.dropMalformed) {
       if (!isWarningPrintedForMalformedRecord) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
similarity index 92%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index 005546f37d..c4d9abb2c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
 
 import com.fasterxml.jackson.core.{JsonParser, JsonToken}
 
-private object JacksonUtils {
+object JacksonUtils {
   /**
    * Advance the parser until a null or a specific token is found
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
similarity index 93%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
index 41cff07472..435fba9d88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
+import org.apache.hadoop.io.compress._
 
 import org.apache.spark.util.Utils
 
-private[datasources] object CompressionCodecs {
+object CompressionCodecs {
   private val shortCompressionCodecNames = Map(
     "none" -> null,
     "uncompressed" -> null,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
similarity index 94%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
index 468228053c..0e466962b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
-private[datasources] object ParseModes {
+object ParseModes {
   val PERMISSIVE_MODE = "PERMISSIVE"
   val DROP_MALFORMED_MODE = "DROPMALFORMED"
   val FAIL_FAST_MODE = "FAILFAST"
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 7b754091f4..84623934d9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -317,4 +319,28 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
       InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
   }
+
+  test("from_json") {
+    val jsonData = """{"a": 1}"""
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      JsonToStruct(schema, Map.empty, Literal(jsonData)),
+      InternalRow.fromSeq(1 :: Nil)
+    )
+  }
+
+  test("from_json - invalid data") {
+    val jsonData = """{"a" 1}"""
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      JsonToStruct(schema, Map.empty, Literal(jsonData)),
+      null
+    )
+
+    // Other modes should still return `null`.
+    checkEvaluation(
+      JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)),
+      null
+    )
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b10d2c86ac..b84fb2fb95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -21,14 +21,15 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.Partition
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
+import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.json.InferSchema
 import org.apache.spark.sql.types.StructType
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 9610746a81..4e662a52a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -29,6 +29,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index e7dcc22272..014614eb99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets
 import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
 
 private[csv] class CSVOptions(@transient private val parameters: Map[String, String])
   extends Logging with Serializable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 91c58d059d..dc8bd817f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -23,7 +23,8 @@ import com.fasterxml.jackson.core._
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 270e7fbd3c..5b55b70186 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -21,8 +21,9 @@ import java.io.Writer
 
 import com.fasterxml.jackson.core._
 
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
 import org.apache.spark.sql.types._
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 6882a6cdca..9fe38ccc9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -32,6 +32,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index a875b01ec2..9f96667311 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 47bf41a2da..3bc1c5b900 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.{typeTag, TypeTag}
 import scala.util.Try
@@ -2818,6 +2819,63 @@ object functions {
     JsonTuple(json.expr +: fields.map(Literal.apply))
   }
 
+  /**
+   * (Scala-specific) Parses a column containing a JSON string into a [[StructType]] with the
+   * specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param schema the schema to use when parsing the json string
+   * @param options options to control how the json is parsed. accepts the same options and the
+   *                json data source.
+   * @param e a string column containing JSON data.
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    JsonToStruct(schema, options, e.expr)
+  }
+
+  /**
+   * (Java-specific) Parses a column containing a JSON string into a [[StructType]] with the
+   * specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string
+   * @param options options to control how the json is parsed. accepts the same options and the
+   *                json data source.
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =
+    from_json(e, schema, options.asScala.toMap)
+
+  /**
+   * Parses a column containing a JSON string into a [[StructType]] with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: StructType): Column =
+    from_json(e, schema, Map.empty[String, String])
+
+  /**
+   * Parses a column containing a JSON string into a [[StructType]] with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string as a json string
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
+    from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
+
   /**
    * Returns length of array or map.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 1391c9d57f..518d6e92b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.functions.from_json
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
 
 class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -94,4 +96,31 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(expr, expected)
   }
+
+  test("json_parser") {
+    val df = Seq("""{"a": 1}""").toDS()
+    val schema = new StructType().add("a", IntegerType)
+
+    checkAnswer(
+      df.select(from_json($"value", schema)),
+      Row(Row(1)) :: Nil)
+  }
+
+  test("json_parser missing columns") {
+    val df = Seq("""{"a": 1}""").toDS()
+    val schema = new StructType().add("b", IntegerType)
+
+    checkAnswer(
+      df.select(from_json($"value", schema)),
+      Row(Row(null)) :: Nil)
+  }
+
+  test("json_parser invalid json") {
+    val df = Seq("""{"a" 1}""").toDS()
+    val schema = new StructType().add("a", IntegerType)
+
+    checkAnswer(
+      df.select(from_json($"value", schema)),
+      Row(null) :: Nil)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
index c31dffedbd..0b72da5f37 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.json
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.test.SharedSQLContext
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3d533c14e1..456052f79a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -26,9 +26,10 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
-import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
-- 
GitLab