diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 8209cec4ba0a8498cd52216e1f67ecacd28fc239..4f375e59c34d469264e146217af180883015fa54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -410,10 +410,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         parsedOptions.columnNameOfCorruptRecord)
       iter.flatMap(parser.parse)
     }
-
-    Dataset.ofRows(
-      sparkSession,
-      LogicalRDD(schema.toAttributes, parsed, isStreaming = jsonDataset.isStreaming)(sparkSession))
+    sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
   }
 
   /**
@@ -473,10 +470,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         parsedOptions.columnNameOfCorruptRecord)
       iter.flatMap(parser.parse)
     }
-
-    Dataset.ofRows(
-      sparkSession,
-      LogicalRDD(schema.toAttributes, parsed, isStreaming = csvDataset.isStreaming)(sparkSession))
+    sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming)
   }
 
   /**
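
Both reader paths above previously hand-assembled the same `Dataset.ofRows(LogicalRDD(...))` plumbing; they now delegate to `SparkSession.internalCreateDataFrame`. The diff below only touches that helper's Scaladoc, not its body, so the following reconstruction of the shared helper is an assumption inferred from the call sites and the code removed here:

```scala
// Presumed body of the shared helper in SparkSession.scala, where `self`
// is the session's self-type alias. Reconstructed from the removed caller
// code above; treat it as a sketch, not the verbatim source.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.types.StructType

private[sql] def internalCreateDataFrame(
    catalystRows: RDD[InternalRow],
    schema: StructType,
    isStreaming: Boolean = false): DataFrame = {
  // Wrap the already-Catalyst rows in a LogicalRDD leaf node and build the
  // Dataset from that plan -- exactly what each caller used to do inline.
  val logicalPlan = LogicalRDD(
    schema.toAttributes,
    catalystRows,
    isStreaming = isStreaming)(self)
  Dataset.ofRows(self, logicalPlan)
}
```
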
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 863c316bbac654a6a5226d0b9e9cea501497bf70..d5ab53ad8fe297282991aececad94a1331e3c02a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -559,8 +559,7 @@ class SparkSession private(
   }
 
   /**
-   * Creates a `DataFrame` from an RDD[Row].
-   * User can specify whether the input rows should be converted to Catalyst rows.
+   * Creates a `DataFrame` from an `RDD[InternalRow]`.
    */
   private[sql] def internalCreateDataFrame(
       catalystRows: RDD[InternalRow],
@@ -576,7 +575,7 @@ class SparkSession private(
   }
 
   /**
-   * Creates a `DataFrame` from an RDD[Row].
+   * Creates a `DataFrame` from an `RDD[Row]`.
    * User can specify whether the input rows should be converted to Catalyst rows.
    */
   private[sql] def createDataFrame(
@@ -589,10 +588,9 @@ class SparkSession private(
       val encoder = RowEncoder(schema)
       rowRDD.map(encoder.toRow)
     } else {
-      rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
+      rowRDD.map { r: Row => InternalRow.fromSeq(r.toSeq) }
     }
-    val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
-    Dataset.ofRows(self, logicalPlan)
+    internalCreateDataFrame(catalystRows, schema)
   }
 
 
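
The `needsConversion` branch above chooses between two ways of producing `InternalRow`s before handing off to the helper. A standalone sketch of the difference (the schema and values are invented for illustration):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType)))

val row = Row(1, "ab")

// Full conversion: encodes each field into its Catalyst representation
// (e.g. String -> UTF8String) according to the schema.
val encoded: InternalRow = RowEncoder(schema).toRow(row)

// Shallow wrap: reuses the external objects as-is; only safe when the
// values are already in their Catalyst form.
val wrapped: InternalRow = InternalRow.fromSeq(row.toSeq)
```
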
@@ -737,13 +735,15 @@ class SparkSession private(
   }
 
   /**
-   * Apply a schema defined by the schema to an RDD. It is only used by PySpark.
+   * Applies `schema` to an RDD.
+   *
+   * @note Used by PySpark only
    */
   private[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
       schema: StructType): DataFrame = {
     val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])
-    Dataset.ofRows(self, LogicalRDD(schema.toAttributes, rowRdd)(self))
+    internalCreateDataFrame(rowRdd, schema)
   }
 
   /**
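
For orientation, a hypothetical driver program that exercises the refactored entry points end to end (the API calls are real Spark API; the data and session settings are invented):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("internalCreateDataFrame-demo")
  .getOrCreate()
import spark.implicits._

// DataFrameReader.json(Dataset[String]) now routes through
// internalCreateDataFrame instead of building a LogicalRDD inline.
val jsonDF = spark.read.json(Seq("""{"a": 1}""", """{"a": 2}""").toDS())

// createDataFrame(RDD[Row], schema) delegates to the same helper after
// converting the rows.
val schema = StructType(StructField("a", LongType) :: Nil)
val rowDF = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(3L), Row(4L))), schema)

jsonDF.union(rowDF).show()
```
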