diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 89b3c07c0740f14c3f950a32b16f4ca584785dba..45d6bf944b702e7d4b585ffc1f800ed58fd2bf2d 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1706,6 +1706,29 @@ def json_tuple(col, *fields):
     return Column(jc)
 
 
+@since(2.1)
+def from_json(col, schema, options={}):
+    """
+    Parses a column containing a JSON string into a [[StructType]] with the
+    specified schema. Returns `null`, in the case of an unparseable string.
+
+    :param col: string column in json format
+    :param schema: a StructType to use when parsing the json column
+    :param options: options to control parsing. accepts the same options as the json datasource
+
+    >>> from pyspark.sql.types import *
+    >>> data = [(1, '''{"a": 1}''')]
+    >>> schema = StructType([StructField("a", IntegerType())])
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(from_json(df.value, schema).alias("json")).collect()
+    [Row(json=Row(a=1))]
+    """
+
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.from_json(_to_java_column(col), schema.json(), options)
+    return Column(jc)
+
+
 @since(1.5)
 def size(col):
     """
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c14a2fb122618613f9f49fd406411ffa590a0ab3..65dbd6a4e3f1da72dd56cae2e7166a751f005992 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -23,10 +23,12 @@ import scala.util.parsing.combinator.RegexParsers
 
 import com.fasterxml.jackson.core._
 
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
+import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
@@ -467,3 +469,28 @@ case class JsonTuple(children: Seq[Expression])
   }
 }
 
+/**
+ * Converts an json input string to a [[StructType]] with the specified schema.
+ */
+case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
+  extends Expression with CodegenFallback with ExpectsInputTypes {
+  override def nullable: Boolean = true
+
+  @transient
+  lazy val parser =
+    new JacksonParser(
+      schema,
+      "invalid", // Not used since we force fail fast.  Invalid rows will be set to `null`.
+      new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
+
+  override def dataType: DataType = schema
+  override def children: Seq[Expression] = child :: Nil
+
+  override def eval(input: InternalRow): Any = {
+    try parser.parse(child.eval(input).toString).head catch {
+      case _: SparkSQLJsonProcessingException => null
+    }
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
similarity index 95%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 02d211d04265e96bc3b11684041688e8d7e5505e..aec18922ea6c8a7477595e0167d65aeccd498f58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
 
 /**
- * Options for the JSON data source.
+ * Options for parsing JSON data into Spark SQL rows.
  *
  * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
  */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
similarity index 97%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 5ce1bf7432159df1cbf025be2f4b550d3c036e20..f80e6373d2f89efd7d5a4443780f1890f31e369f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
 
 import java.io.ByteArrayOutputStream
 
@@ -28,19 +28,22 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
-private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
+private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
 
+/**
+ * Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
+ */
 class JacksonParser(
     schema: StructType,
     columnNameOfCorruptRecord: String,
     options: JSONOptions) extends Logging {
 
+  import JacksonUtils._
+  import ParseModes._
   import com.fasterxml.jackson.core.JsonToken._
 
   // A `ValueConverter` is responsible for converting a value from `JsonParser`
@@ -65,7 +68,7 @@ class JacksonParser(
   private def failedRecord(record: String): Seq[InternalRow] = {
     // create a row even if no corrupt record column is present
     if (options.failFast) {
-      throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
+      throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record")
     }
     if (options.dropMalformed) {
       if (!isWarningPrintedForMalformedRecord) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
similarity index 92%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index 005546f37dda023d237d9e83f52da0993ff60e41..c4d9abb2c07e899ce4ea493d51eea3c0cb0c17e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
 
 import com.fasterxml.jackson.core.{JsonParser, JsonToken}
 
-private object JacksonUtils {
+object JacksonUtils {
   /**
    * Advance the parser until a null or a specific token is found
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
similarity index 93%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
index 41cff07472d1e9f3b0f69efaa092117d30400c1e..435fba9d8851c7230498ae84f3c725efc757cdcd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
+import org.apache.hadoop.io.compress._
 
 import org.apache.spark.util.Utils
 
-private[datasources] object CompressionCodecs {
+object CompressionCodecs {
   private val shortCompressionCodecNames = Map(
     "none" -> null,
     "uncompressed" -> null,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
similarity index 94%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
index 468228053c9649a66265be9e73e9e4706e670dba..0e466962b46785ff9a498bc83c80f75ab9440d46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
-private[datasources] object ParseModes {
+object ParseModes {
   val PERMISSIVE_MODE = "PERMISSIVE"
   val DROP_MALFORMED_MODE = "DROPMALFORMED"
   val FAIL_FAST_MODE = "FAILFAST"
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 7b754091f47146f8a46bbf4181cc71ee053c2bd4..84623934d95d27eafc17899123b8ecea67a31b2d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -317,4 +319,28 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
       InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
   }
+
+  test("from_json") {
+    val jsonData = """{"a": 1}"""
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      JsonToStruct(schema, Map.empty, Literal(jsonData)),
+      InternalRow.fromSeq(1 :: Nil)
+    )
+  }
+
+  test("from_json - invalid data") {
+    val jsonData = """{"a" 1}"""
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      JsonToStruct(schema, Map.empty, Literal(jsonData)),
+      null
+    )
+
+    // Other modes should still return `null`.
+    checkEvaluation(
+      JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)),
+      null
+    )
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b10d2c86ac5ef3cd6d69d5658e8e670d719a7d11..b84fb2fb95914727253c746a27ed931f574b6082 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -21,14 +21,15 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.Partition
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
+import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.json.InferSchema
 import org.apache.spark.sql.types.StructType
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 9610746a81ef73549c49b3532484da2d7fe564e4..4e662a52a7bb796315a6eeee90e95810692ebb48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -29,6 +29,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index e7dcc22272192f50f6ae9c13df69485bf277deca..014614eb997a5662fdcbc3597909a88c382ccfe8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets
 import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
 
 private[csv] class CSVOptions(@transient private val parameters: Map[String, String])
   extends Logging with Serializable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 91c58d059d2878c8bb05dc9ee2c803888cc6c5b0..dc8bd817f2906453471ed76809aaaff4b8232c9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -23,7 +23,8 @@ import com.fasterxml.jackson.core._
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 270e7fbd3c137bd496e622e26c9d91b8c28fe2cb..5b55b701862b7296d6ac45a2c55f32dfd8f28bf8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -21,8 +21,9 @@ import java.io.Writer
 
 import com.fasterxml.jackson.core._
 
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
 import org.apache.spark.sql.types._
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 6882a6cdcac26eb3ebe11a945bfbb1ee6da2b877..9fe38ccc9fdc6e43a8427f4f4f1d084bc296e6fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -32,6 +32,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index a875b01ec2d7a8d78d5c289d592e6c5254bd1cb9..9f966673110155164a76d126786d8901a4448d30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 47bf41a2da813377ecfebf2ca622f59625922068..3bc1c5b90031d56fe84a2b6ff27399e3c75af3aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.{typeTag, TypeTag}
 import scala.util.Try
@@ -2818,6 +2819,63 @@ object functions {
     JsonTuple(json.expr +: fields.map(Literal.apply))
   }
 
+  /**
+   * (Scala-specific) Parses a column containing a JSON string into a [[StructType]] with the
+   * specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param schema the schema to use when parsing the json string
+   * @param options options to control how the json is parsed. accepts the same options and the
+   *                json data source.
+   * @param e a string column containing JSON data.
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    JsonToStruct(schema, options, e.expr)
+  }
+
+  /**
+   * (Java-specific) Parses a column containing a JSON string into a [[StructType]] with the
+   * specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string
+   * @param options options to control how the json is parsed. accepts the same options and the
+   *                json data source.
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =
+    from_json(e, schema, options.asScala.toMap)
+
+  /**
+   * Parses a column containing a JSON string into a [[StructType]] with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: StructType): Column =
+    from_json(e, schema, Map.empty[String, String])
+
+  /**
+   * Parses a column containing a JSON string into a [[StructType]] with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string as a json string
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
+    from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
+
   /**
    * Returns length of array or map.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 1391c9d57ff7c7bb9e2ae84dc08209a1a7cb0d2f..518d6e92b2ff7bb55a943e8edabef1a165369468 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.functions.from_json
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
 
 class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -94,4 +96,31 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(expr, expected)
   }
+
+  test("json_parser") {
+    val df = Seq("""{"a": 1}""").toDS()
+    val schema = new StructType().add("a", IntegerType)
+
+    checkAnswer(
+      df.select(from_json($"value", schema)),
+      Row(Row(1)) :: Nil)
+  }
+
+  test("json_parser missing columns") {
+    val df = Seq("""{"a": 1}""").toDS()
+    val schema = new StructType().add("b", IntegerType)
+
+    checkAnswer(
+      df.select(from_json($"value", schema)),
+      Row(Row(null)) :: Nil)
+  }
+
+  test("json_parser invalid json") {
+    val df = Seq("""{"a" 1}""").toDS()
+    val schema = new StructType().add("a", IntegerType)
+
+    checkAnswer(
+      df.select(from_json($"value", schema)),
+      Row(null) :: Nil)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
index c31dffedbdf673ac62ab16783547827c0a058505..0b72da5f3759c0243f012b5380daae6b640fd478 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.json
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.test.SharedSQLContext
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3d533c14e18e7d179459654fba5454fdeafa7e83..456052f79afccf1fb7fa027389f2ebf9c094d5f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -26,9 +26,10 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
-import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType