diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 36c0f18b6e2e358b7a33776e920c2fa442c413a2..7eea6d8d85b6ffd2e81e8ca9ab1b7b085a308a3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3060,8 +3060,9 @@ object functions {
     from_json(e, schema, Map.empty[String, String])
 
   /**
-   * Parses a column containing a JSON string into a `StructType` or `ArrayType` of `StructType`s
-   * with the specified schema. Returns `null`, in the case of an unparseable string.
+   * (Java-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
+   * of `StructType`s with the specified schema. Returns `null` in the case of an unparseable
+   * string.
    *
    * @param e a string column containing JSON data.
    * @param schema the schema to use when parsing the json string. In Spark 2.1,
@@ -3072,6 +3073,23 @@ object functions {
    * @since 2.1.0
    */
   def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column = {
+    from_json(e, schema, options.asScala.toMap)
+  }
+
+  /**
+   * (Scala-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
+   * of `StructType`s with the specified schema. Returns `null` in the case of an unparseable
+   * string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string. In Spark 2.1, the
+   *               user-provided schema has to be in JSON format. Since Spark 2.2, the DDL
+   *               format is also supported for the schema.
+   *
+   * @group collection_funcs
+   * @since 2.3.0
+   */
+  def from_json(e: Column, schema: String, options: Map[String, String]): Column = {
     val dataType = try {
       DataType.fromJson(schema)
     } catch {
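
Although the hunk is cut off above, the shared body first tries `DataType.fromJson(schema)` and, per the scaladoc, falls back to DDL parsing when that throws. A minimal usage sketch of the new Scala-specific overload follows; it assumes a live `SparkSession` with its implicits imported, and the `"mode"` key shown is the standard JSON reader option, not something introduced by this patch:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.from_json

  val spark = SparkSession.builder().master("local[*]").appName("from_json-sketch").getOrCreate()
  import spark.implicits._

  val df = Seq("""{"a": 1, "b": "haa"}""").toDS()

  // A plain Scala Map now resolves to the new Scala-specific overload instead of
  // requiring a java.util.Map; the DDL schema form is supported since Spark 2.2.
  df.select(from_json($"value", "a INT, b STRING", Map("mode" -> "PERMISSIVE"))).show()
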
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 69a500c845a7b316be679131c5dd1b58e546c341..cf2d00fc94423e1bcded6c8aa6630c4692b7962c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -156,13 +156,20 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       Row(Seq(Row(1, "a"), Row(2, null), Row(null, null))))
   }
 
-  test("from_json uses DDL strings for defining a schema") {
+  test("from_json uses DDL strings for defining a schema - java") {
     val df = Seq("""{"a": 1, "b": "haa"}""").toDS()
     checkAnswer(
       df.select(from_json($"value", "a INT, b STRING", new java.util.HashMap[String, String]())),
       Row(Row(1, "haa")) :: Nil)
   }
 
+  test("from_json uses DDL strings for defining a schema - scala") {
+    val df = Seq("""{"a": 1, "b": "haa"}""").toDS()
+    checkAnswer(
+      df.select(from_json($"value", "a INT, b STRING", Map[String, String]())),
+      Row(Row(1, "haa")) :: Nil)
+  }
+
   test("to_json - struct") {
     val df = Seq(Tuple1(Tuple1(1))).toDF("a")
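
The two new tests exercise only the DDL schema form. Per the `@param schema` note, the JSON type-description form from Spark 2.1 remains valid through the `DataType.fromJson` path; a sketch of the equivalent call, with the JSON schema literal hand-written here for illustration:

  import org.apache.spark.sql.functions.from_json

  val df = Seq("""{"a": 1, "b": "haa"}""").toDS()  // as in the tests; needs spark.implicits._

  // The same schema as "a INT, b STRING", in the JSON format that DataType.fromJson parses.
  val jsonSchema =
    """{"type":"struct","fields":[
      |{"name":"a","type":"integer","nullable":true,"metadata":{}},
      |{"name":"b","type":"string","nullable":true,"metadata":{}}]}""".stripMargin

  // DataType.fromJson succeeds here, so the DDL fallback in the catch block is never hit.
  df.select(from_json($"value", jsonSchema, Map.empty[String, String]))
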