diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index cb9493a57564398ee5e3c9ffcaead4efe9254d21..4c1341ed5da608b06c313866b76405483fb58473 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -323,6 +323,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @param jsonRDD input RDD with one JSON object per record
    * @since 1.4.0
    */
+  @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
   def json(jsonRDD: JavaRDD[String]): DataFrame = json(jsonRDD.rdd)
 
   /**
@@ -335,7 +336,28 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @param jsonRDD input RDD with one JSON object per record
    * @since 1.4.0
    */
+  @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
   def json(jsonRDD: RDD[String]): DataFrame = {
+    json(sparkSession.createDataset(jsonRDD)(Encoders.STRING))
+  }
+
+  /**
+   * Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
+   * text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
+   *
+   * Unless the schema is specified using the `schema` function, this function goes through
+   * the input once to determine the input schema.
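+   *
+   * For example (illustrative; assumes `spark.implicits._` is in scope):
+   * {{{
+   *   val jsonDataset = Seq("""{"name": "Yin", "age": 25}""").toDS()
+   *   val df = spark.read.json(jsonDataset)
+   * }}}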
+   *
+   * @param jsonDataset input Dataset with one JSON object per record
+   * @since 2.2.0
+   */
+  def json(jsonDataset: Dataset[String]): DataFrame = {
     val parsedOptions = new JSONOptions(
       extraOptions.toMap,
       sparkSession.sessionState.conf.sessionLocalTimeZone,
@@ -344,12 +366,12 @@
 
     val schema = userSpecifiedSchema.getOrElse {
       JsonInferSchema.infer(
-        jsonRDD,
+        jsonDataset.rdd,
         parsedOptions,
         createParser)
     }
 
-    val parsed = jsonRDD.mapPartitions { iter =>
+    val parsed = jsonDataset.rdd.mapPartitions { iter =>
       val parser = new JacksonParser(schema, parsedOptions)
       iter.flatMap(parser.parse(_, createParser, UTF8String.fromString))
     }
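
A minimal migration sketch for callers of the now-deprecated RDD overloads (illustrative;
assumes a live `SparkSession` named `spark` and an existing `RDD[String]` of JSON records
named `jsonRDD`):

    import org.apache.spark.sql.{DataFrame, Encoders}

    // Before (now deprecated): spark.read.json(jsonRDD)
    // After: wrap the RDD in a Dataset[String], mirroring what the deprecated
    // overload now does internally via createDataset.
    val df: DataFrame = spark.read.json(spark.createDataset(jsonRDD)(Encoders.STRING))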