diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 57c978bec8485acd366405005da0d7e32b627fe6..ef85f1db895cd033d4875685b17847f97431d041 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -399,8 +399,10 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
   }
 
   /**
-   * Loads a text file and returns a [[DataFrame]] with a single string column named "value".
-   * Each line in the text file is a new row in the resulting DataFrame. For example:
+   * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset
+   * contains a single string column named "value".
+   *
+   * Each line in the text files is a new row in the resulting Dataset. For example:
    * {{{
    *   // Scala:
    *   sqlContext.read.text("/path/to/spark/README.md")
@@ -410,10 +412,12 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * }}}
    *
    * @param paths input paths
-   * @since 1.6.0
+   * @since 2.0.0
    */
   @scala.annotation.varargs
-  def text(paths: String*): DataFrame = format("text").load(paths : _*)
+  def text(paths: String*): Dataset[String] = {
+    format("text").load(paths : _*).as[String](sqlContext.implicits.newStringEncoder)
+  }
 
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
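
Since the reader now returns a Dataset[String], callers can work with each line as a plain
String instead of pulling it out of a Row. A minimal caller-side sketch (not part of this
patch; the path is illustrative):

    import sqlContext.implicits._

    // text() now yields Dataset[String]; each element is one line of the file.
    val lines: Dataset[String] = sqlContext.read.text("/path/to/spark/README.md")

    // Strings can be transformed directly, no Row#getString(0) needed.
    val words: Dataset[String] = lines.flatMap(_.split(" "))

    // Callers that still need the old behavior convert back explicitly.
    val df: DataFrame = lines.toDF()
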
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 42554720edae5195143b4c55271499e6cf149bd7..7fe17e0cf735c493aca7ed124b80b4240c61f15c 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -316,14 +316,14 @@ public class JavaDataFrameSuite {
 
   @Test
   public void testTextLoad() {
-    Dataset<Row> df1 = context.read().text(
+    Dataset<String> ds1 = context.read().text(
       Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString());
-    Assert.assertEquals(4L, df1.count());
+    Assert.assertEquals(4L, ds1.count());
 
-    Dataset<Row> df2 = context.read().text(
+    Dataset<String> ds2 = context.read().text(
       Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString(),
       Thread.currentThread().getContextClassLoader().getResource("text-suite2.txt").toString());
-    Assert.assertEquals(5L, df2.count());
+    Assert.assertEquals(5L, ds2.count());
   }
 
   @Test
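
For reference, a Scala-side equivalent of the updated Java test above (a sketch, assuming
the same test resources are on the classpath; counts taken from the Java assertions):

    val path1 = Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString
    val path2 = Thread.currentThread().getContextClassLoader.getResource("text-suite2.txt").toString

    val ds1: Dataset[String] = sqlContext.read.text(path1)
    assert(ds1.count() == 4L)  // text-suite.txt holds four lines, per the Java test

    // The varargs overload concatenates the files into one Dataset.
    val ds2: Dataset[String] = sqlContext.read.text(path1, path2)
    assert(ds2.count() == 5L)
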
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index ee398721c044157cbc11bd6055b45732b029a146..47330f1db369e6f5ed4ef266cd0286b52b600443 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -37,7 +37,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SQLContext.read.text() API") {
-    verifyFrame(sqlContext.read.text(testFile))
+    verifyFrame(sqlContext.read.text(testFile).toDF())
   }
 
   test("SPARK-12562 verify write.text() can handle column name beyond `value`") {
@@ -46,7 +46,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
     val tempFile = Utils.createTempDir()
     tempFile.delete()
     df.write.text(tempFile.getCanonicalPath)
-    verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath))
+    verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath).toDF())
 
     Utils.deleteRecursively(tempFile)
   }
@@ -75,7 +75,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
         testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
         val compressedFiles = new File(tempDirPath).listFiles()
         assert(compressedFiles.exists(_.getName.endsWith(s".txt$extension")))
-        verifyFrame(sqlContext.read.text(tempDirPath))
+        verifyFrame(sqlContext.read.text(tempDirPath).toDF())
     }
 
     val errMsg = intercept[IllegalArgumentException] {
@@ -103,7 +103,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
         testDf.write.option("compression", "none").mode(SaveMode.Overwrite).text(tempDirPath)
         val compressedFiles = new File(tempDirPath).listFiles()
         assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
-        verifyFrame(sqlContext.read.text(tempDirPath))
+        verifyFrame(sqlContext.read.text(tempDirPath).toDF())
       } finally {
         // Hadoop 1 doesn't have `Configuration.unset`
         hadoopConfiguration.clear()
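
The added .toDF() calls are needed because verifyFrame (defined elsewhere in TextSuite and
not shown in this diff) takes a DataFrame. A hedged sketch of what the conversion preserves,
namely the single "value" column described in the new Scaladoc:

    // Sketch only: verifyFrame's actual body is outside this diff.
    val ds: Dataset[String] = sqlContext.read.text(testFile)
    val df: DataFrame = ds.toDF()

    // toDF() re-exposes the underlying schema, a lone string column named "value".
    assert(df.schema.fieldNames.toSeq == Seq("value"))
    assert(df.schema.head.dataType == org.apache.spark.sql.types.StringType)
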