Commit bb1aaf28 authored by Prashant Sharma, committed by Michael Armbrust

[SPARK-16411][SQL][STREAMING] Add textFile to Structured Streaming.

## What changes were proposed in this pull request?

Adds a textFile API to DataStreamReader, mirroring the one that already exists on DataFrameReader and serving the same purpose: the input is returned as a Dataset[String] of lines rather than a DataFrame.
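As a rough usage sketch (not part of this patch; the directory path, option value, and console sink are illustrative), the streaming `textFile` mirrors its batch counterpart:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("TextFileStreamExample").getOrCreate()

// Each line of every new text file appearing under the directory becomes
// one element of a streaming Dataset[String] (single column named "value").
val lines = spark.readStream
  .option("maxFilesPerTrigger", 1) // optional: cap the number of new files per trigger
  .textFile("/path/to/input/dir")

// Ordinary Dataset transformations apply.
val keepers = lines.filter(_.contains("keep"))

// Print matching lines to the console as new files arrive.
val query = keepers.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()
```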

## How was this patch tested?

Added a corresponding test case.

Author: Prashant Sharma <prashsh1@in.ibm.com>

Closes #14087 from ScrapCodes/textFile.
parent aa3a6841
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.types.StructType
@@ -283,6 +283,37 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    */
   def text(path: String): DataFrame = format("text").load(path)
 
+  /**
+   * Loads text file(s) and returns a [[Dataset]] of String. The underlying schema of the Dataset
+   * contains a single string column named "value".
+   *
+   * If the directory structure of the text files contains partitioning information, those are
+   * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
+   *
+   * Each line in the text file is a new element in the resulting Dataset. For example:
+   * {{{
+   *   // Scala:
+   *   spark.readStream.textFile("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.readStream().textFile("/path/to/spark/README.md")
+   * }}}
+   *
+   * You can set the following text-specific options to deal with text files:
+   * <ul>
+   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
+   * considered in every trigger.</li>
+   * </ul>
+   *
+   * @param path input path
+   * @since 2.1.0
+   */
+  def textFile(path: String): Dataset[String] = {
+    if (userSpecifiedSchema.nonEmpty) {
+      throw new AnalysisException("User specified schema not supported with `textFile`")
+    }
+    text(path).select("value").as[String](sparkSession.implicits.newStringEncoder)
+  }
+
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
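One behavioral note on the new method, sketched below (paths are illustrative, not taken from the patch): since the streaming text source always exposes the fixed single string column "value", an explicitly specified schema is rejected up front with an `AnalysisException`.

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.StructType

// Assumes an active SparkSession named `spark`.
val userSchema = new StructType().add("value", "string")

try {
  // Setting a schema and then calling textFile trips the new check immediately.
  spark.readStream.schema(userSchema).textFile("/path/to/input/dir")
} catch {
  case e: AnalysisException =>
    println(s"Rejected as expected: ${e.getMessage}")
}
```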
@@ -342,6 +342,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("read from textfile") {
+    withTempDirs { case (src, tmp) =>
+      val textStream = spark.readStream.textFile(src.getCanonicalPath)
+      val filtered = textStream.filter(_.contains("keep"))
+      testStream(filtered)(
+        AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+        CheckAnswer("keep2", "keep3"),
+        StopStream,
+        AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+        StartStream(),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+        AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+      )
+    }
+  }
+
   test("SPARK-17165 should not track the list of seen files indefinitely") {
     // This test works by:
     // 1. Create a file