diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 584bbbf0e4c2dd7fc8780b041d75f1293a92fc68..e7e9e353f9e82179e1f1a6a79255b8b2fdfb0e17 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -364,9 +364,10 @@ parquetFile <- function(x, ...) {
 
 #' Create a SparkDataFrame from a text file.
 #'
-#' Loads a text file and returns a SparkDataFrame with a single string column named "value".
-#' If the directory structure of the text files contains partitioning information, those are
-#' ignored in the resulting DataFrame.
+#' Loads text files and returns a SparkDataFrame whose schema starts with
+#' a string column named "value", followed by partitioned columns if there
+#' are any.
+#'
 #' Each line in the text file is a new row in the resulting SparkDataFrame.
 #'
 #' @param path Path of file to read. A vector of multiple paths is allowed.
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
index ded442096c257a91242eb518ef864e661e9c3d6f..362bd4435ecb3ea400d18bf73030c1d64f9895fc 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -126,7 +126,7 @@ public final class JavaHdfsLR {
       .appName("JavaHdfsLR")
       .getOrCreate();
 
-    JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
     JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
     int ITERATIONS = Integer.parseInt(args[1]);
 
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index 128b5ab17c8d44ccbb70dc14691bb8154af4a872..ed0bb876579ad3ed11aa9cbf52147218f50a2007 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -82,7 +82,7 @@ public final class JavaPageRank {
     //     URL         neighbor URL
     //     URL         neighbor URL
     //     ...
-    JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
 
     // Loads all URLs from input file and initialize their neighbors.
     JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 1caee60e348d21bfe2c14a0f053ce1ffc53f38c0..8f18604c0750cbe9a0498033d7b2a86b7f7c9bfd 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -46,7 +46,7 @@ public final class JavaWordCount {
       .appName("JavaWordCount")
       .getOrCreate();
 
-    JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
 
     JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
index 7f568f4e0db4e73b184b3f23a4fe00f1420cde35..739558e81ffb04b4cf10222e76bf59152cc56cad 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
@@ -87,7 +87,7 @@ public class JavaALSExample {
 
     // $example on$
     JavaRDD<Rating> ratingsRDD = spark
-      .read().text("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
+      .read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
       .map(new Function<String, Rating>() {
         public Rating call(String str) {
           return Rating.parseRating(str);
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
index 55e591d0ce1662058dd9149fc7a0add0d42a2671..e512979ac71b08037ac8e07362d5583ebfb5dec3 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -59,7 +59,7 @@ public class JavaSparkSQL {
     System.out.println("=== Data source: RDD ===");
     // Load a text file and convert each line to a Java Bean.
String file = "examples/src/main/resources/people.txt"; - JavaRDD<Person> people = spark.read().text(file).javaRDD().map( + JavaRDD<Person> people = spark.read().textFile(file).javaRDD().map( new Function<String, Person>() { @Override public Person call(String line) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 84f133e0116a2fd40b33ab284118a6fd5baba339..05ac6cbcb35bccd1d20f84332b63769c5614e6db 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -72,7 +72,7 @@ object SparkHdfsLR { .getOrCreate() val inputPath = args(0) - val lines = spark.read.text(inputPath).rdd + val lines = spark.read.textFile(inputPath).rdd val points = lines.map(parsePoint).cache() val ITERATIONS = args(1).toInt diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala index aa93c93c441f40da9d9cd3f6c4a828f5f4023ad7..fec3160e9f37bb79383b00ec07c573588bcf0200 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala @@ -71,7 +71,7 @@ object SparkKMeans { .appName("SparkKMeans") .getOrCreate() - val lines = spark.read.text(args(0)).rdd + val lines = spark.read.textFile(args(0)).rdd val data = lines.map(parseVector _).cache() val K = args(1).toInt val convergeDist = args(2).toDouble diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala index b7c363c7d4faea27de8dd5b0a4eb4c9cfc6a5215..d0b874c48d00a854bbeba4b2b999692823780e0d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala @@ -56,7 +56,7 @@ object SparkPageRank { .getOrCreate() val iters = if (args.length > 1) args(1).toInt else 10 - val lines = spark.read.text(args(0)).rdd + val lines = spark.read.textFile(args(0)).rdd val links = lines.map{ s => val parts = s.split("\\s+") (parts(0), parts(1)) diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala index da19ea9f10ec641a3ad9c6a418ca29259d6d4225..bb5d163608494d95e497a9886f33ea5dbb70dc7e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala @@ -50,7 +50,7 @@ object ALSExample { import spark.implicits._ // $example on$ - val ratings = spark.read.text("data/mllib/als/sample_movielens_ratings.txt") + val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt") .map(parseRating) .toDF() val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2)) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala index 781a934df66372ab2640e352807bbd243cf1977c..d514891da78fcd5627eee0dc1e3d287c122b8a6f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala @@ -33,7 +33,7 @@ object RankingMetricsExample { import spark.implicits._ // $example on$ // Read in the 
-    val ratings = spark.read.text("data/mllib/sample_movielens_data.txt").rdd.map { line =>
+    val ratings = spark.read.textFile("data/mllib/sample_movielens_data.txt").rdd.map { line =>
       val fields = line.split("::")
       Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
     }.cache()
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index f3182b237ec6552ffe7467ab32faa15c4aac907a..0f50f672a22d1492362acdfd4baad55845d9223b 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -291,10 +291,10 @@ class DataFrameReader(object):
     @ignore_unicode_prefix
     @since(1.6)
     def text(self, paths):
-        """Loads a text file and returns a :class:`DataFrame` with a single string column named "value".
-        If the directory structure of the text files contains partitioning information,
-        those are ignored in the resulting DataFrame. To include partitioning information as
-        columns, use ``read.format('text').load(...)``.
+        """
+        Loads text files and returns a :class:`DataFrame` whose schema starts with a
+        string column named "value", followed by partitioned columns if there are
+        any.
 
         Each line in the text file is a new row in the resulting DataFrame.
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 078b63ee87683fdc49dcbcb52bc4b057e46bcf2d..dfe31da3f310713f55f2700533f23e2897faad72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -449,30 +449,47 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)))
   }
 
+  /**
+   * Loads text files and returns a [[DataFrame]] whose schema starts with a string column named
+   * "value", followed by partitioned columns if there are any.
+   *
+   * Each line in the text files is a new row in the resulting DataFrame. For example:
+   * {{{
+   *   // Scala:
+   *   spark.read.text("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.read().text("/path/to/spark/README.md")
+   * }}}
+   *
+   * @param paths input path
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def text(paths: String*): DataFrame = format("text").load(paths : _*)
+
   /**
    * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset
    * contains a single string column named "value".
    *
    * If the directory structure of the text files contains partitioning information, those are
-   * ignored in the resulting Dataset. To include partitioning information as columns, use
-   * `read.format("text").load("...")`.
+   * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
    *
    * Each line in the text files is a new element in the resulting Dataset. For example:
    * {{{
    *   // Scala:
-   *   spark.read.text("/path/to/spark/README.md")
+   *   spark.read.textFile("/path/to/spark/README.md")
    *
    *   // Java:
-   *   spark.read().text("/path/to/spark/README.md")
+   *   spark.read().textFile("/path/to/spark/README.md")
    * }}}
    *
    * @param paths input path
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def text(paths: String*): Dataset[String] = {
-    format("text").load(paths : _*).select("value")
-      .as[String](sparkSession.implicits.newStringEncoder)
+  def textFile(paths: String*): Dataset[String] = {
+    text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
   }
 
   ///////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 0152f3f85a23018c29723b47f3e8f60a90053923..318b53cdbbaa04c6cadc250cdf37f3a8f9d7e848 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -338,10 +338,10 @@ public class JavaDataFrameSuite {
 
   @Test
   public void testTextLoad() {
-    Dataset<String> ds1 = spark.read().text(getResource("text-suite.txt"));
+    Dataset<String> ds1 = spark.read().textFile(getResource("text-suite.txt"));
     Assert.assertEquals(4L, ds1.count());
 
-    Dataset<String> ds2 = spark.read().text(
+    Dataset<String> ds2 = spark.read().textFile(
       getResource("text-suite.txt"),
       getResource("text-suite2.txt"));
     Assert.assertEquals(5L, ds2.count());
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 5695f6af7bd44ffe60d940cfab090ef99c40f733..4ed517cb26ae314ee32488a0ae8ee22106c33682 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -36,7 +36,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SQLContext.read.text() API") {
-    verifyFrame(spark.read.text(testFile).toDF())
+    verifyFrame(spark.read.text(testFile))
   }
 
   test("SPARK-12562 verify write.text() can handle column name beyond `value`") {
@@ -45,7 +45,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
     val tempFile = Utils.createTempDir()
     tempFile.delete()
     df.write.text(tempFile.getCanonicalPath)
-    verifyFrame(spark.read.text(tempFile.getCanonicalPath).toDF())
+    verifyFrame(spark.read.text(tempFile.getCanonicalPath))
 
     Utils.deleteRecursively(tempFile)
   }
@@ -64,20 +64,20 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("reading partitioned data using read.text()") {
+  test("reading partitioned data using read.textFile()") {
     val partitionedData = Thread.currentThread().getContextClassLoader
       .getResource("text-partitioned").toString
-    val df = spark.read.text(partitionedData)
-    val data = df.collect()
+    val ds = spark.read.textFile(partitionedData)
+    val data = ds.collect()
 
-    assert(df.schema == new StructType().add("value", StringType))
+    assert(ds.schema == new StructType().add("value", StringType))
     assert(data.length == 2)
   }
 
-  test("support for partitioned reading") {
+  test("support for partitioned reading using read.text()") {
     val partitionedData = Thread.currentThread().getContextClassLoader
       .getResource("text-partitioned").toString
-    val df = spark.read.format("text").load(partitionedData)
+    val df = spark.read.text(partitionedData)
     val data = df.filter("year = '2015'").select("value").collect()
 
     assert(data(0) == Row("2015-test"))
@@ -94,7 +94,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
         testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
         val compressedFiles = new File(tempDirPath).listFiles()
         assert(compressedFiles.exists(_.getName.endsWith(s".txt$extension")))
-        verifyFrame(spark.read.text(tempDirPath).toDF())
+        verifyFrame(spark.read.text(tempDirPath))
     }
 
     val errMsg = intercept[IllegalArgumentException] {
@@ -121,7 +121,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
       .options(extraOptions).mode(SaveMode.Overwrite).text(tempDirPath)
     val compressedFiles = new File(tempDirPath).listFiles()
     assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
-    verifyFrame(spark.read.options(extraOptions).text(tempDirPath).toDF())
+    verifyFrame(spark.read.options(extraOptions).text(tempDirPath))
   }
 }
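
For reference, a minimal usage sketch (not part of the patch) contrasting the two readers after this change. The path, the year partition column, and the object name are hypothetical stand-ins for a partitioned text directory such as /data/text/year=2015/part-0.txt:

{{{
import org.apache.spark.sql.SparkSession

object TextVsTextFileSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TextVsTextFileSketch").getOrCreate()
    import spark.implicits._

    // read.text keeps partition columns: a DataFrame with schema (value: string, year: string)
    val df = spark.read.text("/data/text")
    df.filter("year = '2015'").select("value").show()

    // read.textFile returns a typed Dataset[String] with only the "value" column
    val ds = spark.read.textFile("/data/text")
    ds.flatMap(_.split("\\s+")).groupByKey(identity).count().show()

    spark.stop()
  }
}
}}}

The sketch mirrors the split made in DataFrameReader above: text() remains the untyped, partition-aware loader returning a DataFrame, while textFile() is the typed convenience built on top of it.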