From 643649dcbfabc5d6952c2ecfb98286324c887665 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Tue, 15 Mar 2016 14:57:54 -0700
Subject: [PATCH] [SPARK-13895][SQL] DataFrameReader.text should return
 Dataset[String]

## What changes were proposed in this pull request?
This patch changes DataFrameReader.text()'s return type from DataFrame to Dataset[String].

Closes #11731.

## How was this patch tested?
Updated existing integration tests to reflect the change.

Author: Reynold Xin <rxin@databricks.com>

Closes #11739 from rxin/SPARK-13895.
---
 .../scala/org/apache/spark/sql/DataFrameReader.scala | 12 ++++++++----
 .../org/apache/spark/sql/JavaDataFrameSuite.java     |  8 ++++----
 .../sql/execution/datasources/text/TextSuite.scala   |  8 ++++----
 3 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 57c978bec8..ef85f1db89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -399,8 +399,10 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
   }
 
   /**
-   * Loads a text file and returns a [[DataFrame]] with a single string column named "value".
-   * Each line in the text file is a new row in the resulting DataFrame. For example:
+   * Loads a text file and returns a [[Dataset]] of String. The underlying schema of the Dataset
+   * contains a single string column named "value".
+   *
+   * Each line in the text file is a new row in the resulting Dataset. For example:
    * {{{
    *   // Scala:
    *   sqlContext.read.text("/path/to/spark/README.md")
@@ -410,10 +412,12 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * }}}
    *
    * @param paths input path
-   * @since 1.6.0
+   * @since 2.0.0
    */
   @scala.annotation.varargs
-  def text(paths: String*): DataFrame = format("text").load(paths : _*)
+  def text(paths: String*): Dataset[String] = {
+    format("text").load(paths : _*).as[String](sqlContext.implicits.newStringEncoder)
+  }
 
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 42554720ed..7fe17e0cf7 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -316,14 +316,14 @@ public class JavaDataFrameSuite {
 
   @Test
   public void testTextLoad() {
-    Dataset<Row> df1 = context.read().text(
+    Dataset<String> ds1 = context.read().text(
       Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString());
-    Assert.assertEquals(4L, df1.count());
+    Assert.assertEquals(4L, ds1.count());
 
-    Dataset<Row> df2 = context.read().text(
+    Dataset<String> ds2 = context.read().text(
       Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString(),
       Thread.currentThread().getContextClassLoader().getResource("text-suite2.txt").toString());
-    Assert.assertEquals(5L, df2.count());
+    Assert.assertEquals(5L, ds2.count());
   }
 
   @Test
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index ee398721c0..47330f1db3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -37,7 +37,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SQLContext.read.text() API") {
-    verifyFrame(sqlContext.read.text(testFile))
+    verifyFrame(sqlContext.read.text(testFile).toDF())
   }
 
   test("SPARK-12562 verify write.text() can handle column name beyond `value`") {
@@ -46,7 +46,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
     val tempFile = Utils.createTempDir()
     tempFile.delete()
     df.write.text(tempFile.getCanonicalPath)
-    verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath))
+    verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath).toDF())
 
     Utils.deleteRecursively(tempFile)
   }
@@ -75,7 +75,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
         testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
         val compressedFiles = new File(tempDirPath).listFiles()
         assert(compressedFiles.exists(_.getName.endsWith(s".txt$extension")))
-        verifyFrame(sqlContext.read.text(tempDirPath))
+        verifyFrame(sqlContext.read.text(tempDirPath).toDF())
     }
 
     val errMsg = intercept[IllegalArgumentException] {
@@ -103,7 +103,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
         testDf.write.option("compression", "none").mode(SaveMode.Overwrite).text(tempDirPath)
         val compressedFiles = new File(tempDirPath).listFiles()
         assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
-        verifyFrame(sqlContext.read.text(tempDirPath))
+        verifyFrame(sqlContext.read.text(tempDirPath).toDF())
       } finally {
         // Hadoop 1 doesn't have `Configuration.unset`
         hadoopConfiguration.clear()
-- 
GitLab