Commit 40da4d18 authored by Liwei Lin, committed by Shixiong Zhu

[SPARK-19715][STRUCTURED STREAMING] Option to Strip Paths in FileSource

## What changes were proposed in this pull request?

Today, we compare the whole path when deciding if a file is new in the FileSource for structured streaming. However, this causes false negatives (the same file, under a cosmetically different path, is treated as new) when the path has changed in a cosmetic way, e.g. changing `s3n` to `s3a`.

This patch adds an option `fileNameOnly` that causes the new-file check to be based only on the filename (while still storing the whole path in the log).

## Usage

```scala
spark
  .readStream
  .option("fileNameOnly", true)
  .text("s3n://bucket/dir1/dir2")
  .writeStream
  ...
```
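For example, suppose a query previously read from an `s3n://` path and is restarted against the equivalent `s3a://` path (bucket and directories below are hypothetical). With `fileNameOnly` enabled, files already recorded under the old scheme are not reprocessed, since only the filename is compared:

```scala
// Hypothetical restart: same data, new filesystem scheme.
// The log recorded files under "s3n://bucket/dir1/dir2/...";
// comparing filenames only prevents reprocessing them via s3a.
spark
  .readStream
  .option("fileNameOnly", true)
  .text("s3a://bucket/dir1/dir2")   // was "s3n://bucket/dir1/dir2"
  .writeStream
  ...
```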
## How was this patch tested?

Added a test case

Author: Liwei Lin <lwlin7@gmail.com>

Closes #17120 from lw-lin/filename-only.
parent 3232e54f
@@ -1052,10 +1052,18 @@ Here are the details of all the sinks in Spark.
<td>Append</td>
<td>
<code>path</code>: path to the output directory, must be specified.
<br/>
<code>maxFilesPerTrigger</code>: maximum number of new files to be considered in every trigger (default: no max)
<br/>
<code>latestFirst</code>: whether to process the latest new files first, useful when there is a large backlog of files (default: false)
<br/>
<code>fileNameOnly</code>: whether to check new files based on only the filename instead of on the full path (default: false). With this set to <code>true</code>, the following files would be considered the same file, because their filenames, "dataset.txt", are the same:
<br/>
· "file:///dataset.txt"<br/>
· "s3://a/dataset.txt"<br/>
· "s3n://a/b/dataset.txt"<br/>
· "s3a://a/b/c/dataset.txt"<br/>
<br/>
For file-format-specific options, see the related methods in DataFrameWriter
(<a href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>).
E.g. for "parquet" format options see <code>DataFrameWriter.parquet()</code>
@@ -61,13 +61,29 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
* Whether to scan latest files first. If it's true, when the source finds unprocessed files in a
* trigger, it will first process the latest files.
*/
val latestFirst: Boolean = withBooleanParameter("latestFirst", false)

/**
* Whether to check new files based on only the filename instead of on the full path.
*
* With this set to `true`, the following files would be considered the same file, because
* their filenames, "dataset.txt", are the same:
* - "file:///dataset.txt"
* - "s3://a/dataset.txt"
* - "s3n://a/b/dataset.txt"
* - "s3a://a/b/c/dataset.txt"
*/
val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)

private def withBooleanParameter(name: String, default: Boolean) = {
parameters.get(name).map { str =>
try {
str.toBoolean
} catch {
case _: IllegalArgumentException =>
throw new IllegalArgumentException(
s"Invalid value '$str' for option '$name', must be 'true' or 'false'")
}
}.getOrElse(default)
}
}
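For reference, `str.toBoolean` only accepts "true"/"false" (case-insensitive), so any other value surfaces the error built above. A sketch of what a caller would see (the option value is hypothetical):

```scala
// Hypothetical misconfiguration: "yes" is not a valid Boolean string.
spark.readStream
  .option("latestFirst", "yes")
  .text("/data/in")
// => java.lang.IllegalArgumentException:
//    Invalid value 'yes' for option 'latestFirst', must be 'true' or 'false'
```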
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.streaming

import java.net.URI

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.{FileStatus, Path}
@@ -79,9 +81,16 @@ class FileStreamSource(
sourceOptions.maxFileAgeMs
}

private val fileNameOnly = sourceOptions.fileNameOnly
if (fileNameOnly) {
  logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " +
    "UUID), otherwise files with the same name but under different paths will be considered " +
    "the same and cause data loss.")
}

/** A mapping from a file that we have processed to some timestamp it was last modified. */
// Visible for testing and debugging in production.
val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)

metadataLog.allFiles().foreach { entry =>
seenFiles.add(entry.path, entry.timestamp)
@@ -268,7 +277,7 @@ object FileStreamSource {
* To prevent the hash map from growing indefinitely, a purge function is available to
* remove files "maxAgeMs" older than the latest file.
*/
class SeenFilesMap(maxAgeMs: Long, fileNameOnly: Boolean) {
require(maxAgeMs >= 0)
/** Mapping from file to its timestamp. */
@@ -280,9 +289,13 @@
/** Timestamp for the last purge operation. */
private var lastPurgeTimestamp: Timestamp = 0L

@inline private def stripPathIfNecessary(path: String) = {
  if (fileNameOnly) new Path(new URI(path)).getName else path
}

/** Add a new file to the map. */
def add(path: String, timestamp: Timestamp): Unit = {
map.put(stripPathIfNecessary(path), timestamp)
if (timestamp > latestTimestamp) {
latestTimestamp = timestamp
}
@@ -295,7 +308,7 @@
def isNewFile(path: String, timestamp: Timestamp): Boolean = {
// Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
// is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
timestamp >= lastPurgeTimestamp && !map.containsKey(stripPathIfNecessary(path))
}
/** Removes aged entries and returns the number of files removed. */
@@ -314,9 +327,5 @@
}
def size: Int = map.size()
}
}
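For intuition, when `fileNameOnly` is on, the map key is just Hadoop's `Path.getName` applied to the file's URI. A minimal standalone sketch of that stripping (not part of the patch):

```scala
import java.net.URI

import org.apache.hadoop.fs.Path

// Minimal sketch: every URI below reduces to the same key, "dataset.txt",
// which is exactly why such files are deduplicated under fileNameOnly.
val paths = Seq(
  "file:///dataset.txt",
  "s3://a/dataset.txt",
  "s3n://a/b/dataset.txt",
  "s3a://a/b/c/dataset.txt")
paths.foreach(p => assert(new Path(new URI(p)).getName == "dataset.txt"))
```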
@@ -1236,7 +1236,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
test("SeenFilesMap") {
val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
map.add("a", 5)
assert(map.size == 1)
@@ -1269,8 +1269,26 @@
assert(map.isNewFile("e", 20))
}
test("SeenFilesMap with fileNameOnly = true") {
val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = true)
map.add("file:///a/b/c/d", 5)
map.add("file:///a/b/c/e", 5)
assert(map.size === 2)
assert(!map.isNewFile("d", 5))
assert(!map.isNewFile("file:///d", 5))
assert(!map.isNewFile("file:///x/d", 5))
assert(!map.isNewFile("file:///x/y/d", 5))
map.add("s3:///bucket/d", 5)
map.add("s3n:///bucket/d", 5)
map.add("s3a:///bucket/d", 5)
assert(map.size === 2)
}
test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
map.add("a", 20)
assert(map.size == 1)