From e2ab79d5ea00af45c083cc9a6607d2f0905f9908 Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Sun, 12 Jun 2016 21:36:41 -0700
Subject: [PATCH] [SPARK-15898][SQL] DataFrameReader.text should return
 DataFrame

## What changes were proposed in this pull request?

We want to maintain API compatibility for DataFrameReader.text, so it returns a DataFrame again, and we introduce a new API, DataFrameReader.textFile, which returns Dataset[String].
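
Below is a minimal sketch (the path is a placeholder, not part of this patch) of how the two readers differ after this change:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object TextVsTextFileSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TextVsTextFileSketch").getOrCreate()

    // text(): same shape as in 1.6, a DataFrame with a single string column "value"
    // (plus any partition columns discovered from the directory layout).
    val df: DataFrame = spark.read.text("/path/to/README.md")
    df.printSchema()

    // textFile(): the new API, a Dataset[String] holding just the lines.
    val ds: Dataset[String] = spark.read.textFile("/path/to/README.md")
    println(ds.first())

    spark.stop()
  }
}
```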

Affected PRs:
https://github.com/apache/spark/pull/11731
https://github.com/apache/spark/pull/13104
https://github.com/apache/spark/pull/13184

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13604 from cloud-fan/revert.
---
 R/pkg/R/SQLContext.R                          |  7 +++--
 .../org/apache/spark/examples/JavaHdfsLR.java |  2 +-
 .../apache/spark/examples/JavaPageRank.java   |  2 +-
 .../apache/spark/examples/JavaWordCount.java  |  2 +-
 .../spark/examples/ml/JavaALSExample.java     |  2 +-
 .../spark/examples/sql/JavaSparkSQL.java      |  2 +-
 .../apache/spark/examples/SparkHdfsLR.scala   |  2 +-
 .../apache/spark/examples/SparkKMeans.scala   |  2 +-
 .../apache/spark/examples/SparkPageRank.scala |  2 +-
 .../apache/spark/examples/ml/ALSExample.scala |  2 +-
 .../mllib/RankingMetricsExample.scala         |  2 +-
 python/pyspark/sql/readwriter.py              |  8 ++---
 .../apache/spark/sql/DataFrameReader.scala    | 31 ++++++++++++++-----
 .../apache/spark/sql/JavaDataFrameSuite.java  |  4 +--
 .../datasources/text/TextSuite.scala          | 20 ++++++------
 15 files changed, 54 insertions(+), 36 deletions(-)

diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 584bbbf0e4..e7e9e353f9 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -364,9 +364,10 @@ parquetFile <- function(x, ...) {
 
 #' Create a SparkDataFrame from a text file.
 #'
-#' Loads a text file and returns a SparkDataFrame with a single string column named "value".
-#' If the directory structure of the text files contains partitioning information, those are
-#' ignored in the resulting DataFrame.
+#' Loads text files and returns a SparkDataFrame whose schema starts with
+#' a string column named "value", followed by partitioned columns if there
+#' are any.
+#'
 #' Each line in the text file is a new row in the resulting SparkDataFrame.
 #'
 #' @param path Path of file to read. A vector of multiple paths is allowed.
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
index ded442096c..362bd4435e 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -126,7 +126,7 @@ public final class JavaHdfsLR {
       .appName("JavaHdfsLR")
       .getOrCreate();
 
-    JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
     JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
     int ITERATIONS = Integer.parseInt(args[1]);
 
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index 128b5ab17c..ed0bb87657 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -82,7 +82,7 @@ public final class JavaPageRank {
     //     URL         neighbor URL
     //     URL         neighbor URL
     //     ...
-    JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
 
     // Loads all URLs from input file and initialize their neighbors.
     JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 1caee60e34..8f18604c07 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -46,7 +46,7 @@ public final class JavaWordCount {
       .appName("JavaWordCount")
       .getOrCreate();
 
-    JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
 
     JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
index 7f568f4e0d..739558e81f 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
@@ -87,7 +87,7 @@ public class JavaALSExample {
 
     // $example on$
     JavaRDD<Rating> ratingsRDD = spark
-      .read().text("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
+      .read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
       .map(new Function<String, Rating>() {
         public Rating call(String str) {
           return Rating.parseRating(str);
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
index 55e591d0ce..e512979ac7 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -59,7 +59,7 @@ public class JavaSparkSQL {
     System.out.println("=== Data source: RDD ===");
     // Load a text file and convert each line to a Java Bean.
     String file = "examples/src/main/resources/people.txt";
-    JavaRDD<Person> people = spark.read().text(file).javaRDD().map(
+    JavaRDD<Person> people = spark.read().textFile(file).javaRDD().map(
       new Function<String, Person>() {
         @Override
         public Person call(String line) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index 84f133e011..05ac6cbcb3 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -72,7 +72,7 @@ object SparkHdfsLR {
       .getOrCreate()
 
     val inputPath = args(0)
-    val lines = spark.read.text(inputPath).rdd
+    val lines = spark.read.textFile(inputPath).rdd
 
     val points = lines.map(parsePoint).cache()
     val ITERATIONS = args(1).toInt
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
index aa93c93c44..fec3160e9f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
@@ -71,7 +71,7 @@ object SparkKMeans {
       .appName("SparkKMeans")
       .getOrCreate()
 
-    val lines = spark.read.text(args(0)).rdd
+    val lines = spark.read.textFile(args(0)).rdd
     val data = lines.map(parseVector _).cache()
     val K = args(1).toInt
     val convergeDist = args(2).toDouble
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
index b7c363c7d4..d0b874c48d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
@@ -56,7 +56,7 @@ object SparkPageRank {
       .getOrCreate()
 
     val iters = if (args.length > 1) args(1).toInt else 10
-    val lines = spark.read.text(args(0)).rdd
+    val lines = spark.read.textFile(args(0)).rdd
     val links = lines.map{ s =>
       val parts = s.split("\\s+")
       (parts(0), parts(1))
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
index da19ea9f10..bb5d163608 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
@@ -50,7 +50,7 @@ object ALSExample {
     import spark.implicits._
 
     // $example on$
-    val ratings = spark.read.text("data/mllib/als/sample_movielens_ratings.txt")
+    val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
       .map(parseRating)
       .toDF()
     val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala
index 781a934df6..d514891da7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala
@@ -33,7 +33,7 @@ object RankingMetricsExample {
     import spark.implicits._
     // $example on$
     // Read in the ratings data
-    val ratings = spark.read.text("data/mllib/sample_movielens_data.txt").rdd.map { line =>
+    val ratings = spark.read.textFile("data/mllib/sample_movielens_data.txt").rdd.map { line =>
       val fields = line.split("::")
       Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
     }.cache()
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index f3182b237e..0f50f672a2 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -291,10 +291,10 @@ class DataFrameReader(object):
     @ignore_unicode_prefix
     @since(1.6)
     def text(self, paths):
-        """Loads a text file and returns a :class:`DataFrame` with a single string column named "value".
-        If the directory structure of the text files contains partitioning information,
-        those are ignored in the resulting DataFrame. To include partitioning information as
-        columns, use ``read.format('text').load(...)``.
+        """
+        Loads text files and returns a :class:`DataFrame` whose schema starts with a
+        string column named "value", and followed by partitioned columns if there
+        are any.
 
         Each line in the text file is a new row in the resulting DataFrame.
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 078b63ee87..dfe31da3f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -449,30 +449,47 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)))
   }
 
+  /**
+   * Loads text files and returns a [[DataFrame]] whose schema starts with a string column named
+   * "value", followed by partitioned columns if there are any.
+   *
+   * Each line in the text files is a new row in the resulting DataFrame. For example:
+   * {{{
+   *   // Scala:
+   *   spark.read.text("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.read().text("/path/to/spark/README.md")
+   * }}}
+   *
+   * @param paths input path
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def text(paths: String*): DataFrame = format("text").load(paths : _*)
+
   /**
    * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset
    * contains a single string column named "value".
    *
    * If the directory structure of the text files contains partitioning information, those are
-   * ignored in the resulting Dataset. To include partitioning information as columns, use
-   * `read.format("text").load("...")`.
+   * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
    *
    * Each line in the text files is a new element in the resulting Dataset. For example:
    * {{{
    *   // Scala:
-   *   spark.read.text("/path/to/spark/README.md")
+   *   spark.read.textFile("/path/to/spark/README.md")
    *
    *   // Java:
-   *   spark.read().text("/path/to/spark/README.md")
+   *   spark.read().textFile("/path/to/spark/README.md")
    * }}}
    *
    * @param paths input path
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def text(paths: String*): Dataset[String] = {
-    format("text").load(paths : _*).select("value")
-      .as[String](sparkSession.implicits.newStringEncoder)
+  def textFile(paths: String*): Dataset[String] = {
+    text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
   }
 
   ///////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 0152f3f85a..318b53cdbb 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -338,10 +338,10 @@ public class JavaDataFrameSuite {
 
   @Test
   public void testTextLoad() {
-    Dataset<String> ds1 = spark.read().text(getResource("text-suite.txt"));
+    Dataset<String> ds1 = spark.read().textFile(getResource("text-suite.txt"));
     Assert.assertEquals(4L, ds1.count());
 
-    Dataset<String> ds2 = spark.read().text(
+    Dataset<String> ds2 = spark.read().textFile(
       getResource("text-suite.txt"),
       getResource("text-suite2.txt"));
     Assert.assertEquals(5L, ds2.count());
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 5695f6af7b..4ed517cb26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -36,7 +36,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SQLContext.read.text() API") {
-    verifyFrame(spark.read.text(testFile).toDF())
+    verifyFrame(spark.read.text(testFile))
   }
 
   test("SPARK-12562 verify write.text() can handle column name beyond `value`") {
@@ -45,7 +45,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
     val tempFile = Utils.createTempDir()
     tempFile.delete()
     df.write.text(tempFile.getCanonicalPath)
-    verifyFrame(spark.read.text(tempFile.getCanonicalPath).toDF())
+    verifyFrame(spark.read.text(tempFile.getCanonicalPath))
 
     Utils.deleteRecursively(tempFile)
   }
@@ -64,20 +64,20 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("reading partitioned data using read.text()") {
+  test("reading partitioned data using read.textFile()") {
     val partitionedData = Thread.currentThread().getContextClassLoader
       .getResource("text-partitioned").toString
-    val df = spark.read.text(partitionedData)
-    val data = df.collect()
+    val ds = spark.read.textFile(partitionedData)
+    val data = ds.collect()
 
-    assert(df.schema == new StructType().add("value", StringType))
+    assert(ds.schema == new StructType().add("value", StringType))
     assert(data.length == 2)
   }
 
-  test("support for partitioned reading") {
+  test("support for partitioned reading using read.text()") {
     val partitionedData = Thread.currentThread().getContextClassLoader
       .getResource("text-partitioned").toString
-    val df = spark.read.format("text").load(partitionedData)
+    val df = spark.read.text(partitionedData)
     val data = df.filter("year = '2015'").select("value").collect()
 
     assert(data(0) == Row("2015-test"))
@@ -94,7 +94,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
         testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
         val compressedFiles = new File(tempDirPath).listFiles()
         assert(compressedFiles.exists(_.getName.endsWith(s".txt$extension")))
-        verifyFrame(spark.read.text(tempDirPath).toDF())
+        verifyFrame(spark.read.text(tempDirPath))
     }
 
     val errMsg = intercept[IllegalArgumentException] {
@@ -121,7 +121,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
         .options(extraOptions).mode(SaveMode.Overwrite).text(tempDirPath)
       val compressedFiles = new File(tempDirPath).listFiles()
       assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
-      verifyFrame(spark.read.options(extraOptions).text(tempDirPath).toDF())
+      verifyFrame(spark.read.options(extraOptions).text(tempDirPath))
     }
   }
 
-- 
GitLab