From 40da4d181d648308de85fdcabc5c098ee861949a Mon Sep 17 00:00:00 2001
From: Liwei Lin <lwlin7@gmail.com>
Date: Thu, 9 Mar 2017 11:02:44 -0800
Subject: [PATCH] [SPARK-19715][STRUCTURED STREAMING] Option to Strip Paths in FileSource

## What changes were proposed in this pull request?

Today, we compare the whole path when deciding if a file is new in the FileSource for structured streaming. However, this causes false negatives when the path has changed in a cosmetic way (e.g. changing `s3n` to `s3a`). This patch adds an option `fileNameOnly` that causes the new-file check to be based only on the filename (while still storing the whole path in the log).

## Usage

```scala
spark
  .readStream
  .option("fileNameOnly", true)
  .text("s3n://bucket/dir1/dir2")
  .writeStream
  ...
```

## How was this patch tested?

Added a test case.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #17120 from lw-lin/filename-only.
---
 .../structured-streaming-programming-guide.md | 12 +++++--
 .../streaming/FileStreamOptions.scala         | 34 ++++++++++++++-----
 .../streaming/FileStreamSource.scala          | 25 +++++++++-----
 .../sql/streaming/FileStreamSourceSuite.scala | 22 ++++++++++--
 4 files changed, 72 insertions(+), 21 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 6af47b6efb..995ac77a4f 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1052,10 +1052,18 @@ Here are the details of all the sinks in Spark.
     <td>Append</td>
     <td>
         <code>path</code>: path to the output directory, must be specified.
+        <br/>
         <code>maxFilesPerTrigger</code>: maximum number of new files to be considered in every trigger (default: no max)
         <br/>
-        <code>latestFirst</code>: whether to processs the latest new files first, useful when there is a large backlog of files(default: false)
-        <br/><br/>
+        <code>latestFirst</code>: whether to process the latest new files first, useful when there is a large backlog of files (default: false)
+        <br/>
+        <code>fileNameOnly</code>: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same:
+        <br/>
+        · "file:///dataset.txt"<br/>
+        · "s3://a/dataset.txt"<br/>
+        · "s3n://a/b/dataset.txt"<br/>
+        · "s3a://a/b/c/dataset.txt"<br/>
+        <br/>
        For file-format-specific options, see the related methods in DataFrameWriter
        (<a href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>).
        E.g. for "parquet" format options see <code>DataFrameWriter.parquet()</code>
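As a concrete illustration of the equivalence listed in the doc change above: a minimal, self-contained sketch (not part of the patch; `FileNameOnlyDemo` is a hypothetical name, and Hadoop's `Path` is assumed on the classpath) using the same `new Path(new URI(path)).getName` expression this patch introduces in `FileStreamSource`:

```scala
import java.net.URI

import org.apache.hadoop.fs.Path

object FileNameOnlyDemo {
  def main(args: Array[String]): Unit = {
    // The four URIs from the docs differ only in scheme and directories.
    val paths = Seq(
      "file:///dataset.txt",
      "s3://a/dataset.txt",
      "s3n://a/b/dataset.txt",
      "s3a://a/b/c/dataset.txt")
    // Path#getName returns the final path component, i.e. the bare filename,
    // so all four collapse to "dataset.txt".
    val names = paths.map(p => new Path(new URI(p)).getName)
    assert(names.forall(_ == "dataset.txt"))
  }
}
```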
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index e7ba901945..d54ed44b43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -61,13 +61,29 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    * Whether to scan latest files first. If it's true, when the source finds unprocessed files in a
    * trigger, it will first process the latest files.
    */
-  val latestFirst: Boolean = parameters.get("latestFirst").map { str =>
-    try {
-      str.toBoolean
-    } catch {
-      case _: IllegalArgumentException =>
-        throw new IllegalArgumentException(
-          s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'")
-    }
-  }.getOrElse(false)
+  val latestFirst: Boolean = withBooleanParameter("latestFirst", false)
+
+  /**
+   * Whether to check new files based on only the filename instead of on the full path.
+   *
+   * With this set to `true`, the following files would be considered as the same file, because
+   * their filenames, "dataset.txt", are the same:
+   * - "file:///dataset.txt"
+   * - "s3://a/dataset.txt"
+   * - "s3n://a/b/dataset.txt"
+   * - "s3a://a/b/c/dataset.txt"
+   */
+  val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
+
+  private def withBooleanParameter(name: String, default: Boolean) = {
+    parameters.get(name).map { str =>
+      try {
+        str.toBoolean
+      } catch {
+        case _: IllegalArgumentException =>
+          throw new IllegalArgumentException(
+            s"Invalid value '$str' for option '$name', must be 'true' or 'false'")
+      }
+    }.getOrElse(default)
+  }
 }
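A brief note on the `withBooleanParameter` helper above: it delegates parsing to Scala's `StringOps.toBoolean`, so "true"/"false" are accepted case-insensitively and anything else raises `IllegalArgumentException`. A REPL-style sketch (not part of the patch):

```scala
Seq("true", "TRUE", "False").map(_.toBoolean)  // List(true, true, false)
"yes".toBoolean  // throws IllegalArgumentException: For input string: "yes"
// which the helper rewraps as:
// "Invalid value 'yes' for option 'fileNameOnly', must be 'true' or 'false'"
```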
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 0f09b0a0c8..411a15ffce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.net.URI
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -79,9 +81,16 @@ class FileStreamSource(
     sourceOptions.maxFileAgeMs
   }
 
+  private val fileNameOnly = sourceOptions.fileNameOnly
+  if (fileNameOnly) {
+    logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " +
+      "UUID), otherwise files with the same name but under different paths will be considered " +
+      "the same and cause data loss.")
+  }
+
   /** A mapping from a file that we have processed to some timestamp it was last modified. */
   // Visible for testing and debugging in production.
-  val seenFiles = new SeenFilesMap(maxFileAgeMs)
+  val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
   metadataLog.allFiles().foreach { entry =>
     seenFiles.add(entry.path, entry.timestamp)
@@ -268,7 +277,7 @@ object FileStreamSource {
    * To prevent the hash map from growing indefinitely, a purge function is available to
    * remove files "maxAgeMs" older than the latest file.
    */
-  class SeenFilesMap(maxAgeMs: Long) {
+  class SeenFilesMap(maxAgeMs: Long, fileNameOnly: Boolean) {
     require(maxAgeMs >= 0)
 
     /** Mapping from file to its timestamp. */
@@ -280,9 +289,13 @@ object FileStreamSource {
     /** Timestamp for the last purge operation. */
     private var lastPurgeTimestamp: Timestamp = 0L
 
+    @inline private def stripPathIfNecessary(path: String) = {
+      if (fileNameOnly) new Path(new URI(path)).getName else path
+    }
+
     /** Add a new file to the map. */
     def add(path: String, timestamp: Timestamp): Unit = {
-      map.put(path, timestamp)
+      map.put(stripPathIfNecessary(path), timestamp)
       if (timestamp > latestTimestamp) {
         latestTimestamp = timestamp
       }
@@ -295,7 +308,7 @@ object FileStreamSource {
     def isNewFile(path: String, timestamp: Timestamp): Boolean = {
       // Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
       // is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
-      timestamp >= lastPurgeTimestamp && !map.containsKey(path)
+      timestamp >= lastPurgeTimestamp && !map.containsKey(stripPathIfNecessary(path))
     }
 
     /** Removes aged entries and returns the number of files removed. */
@@ -314,9 +327,5 @@ object FileStreamSource {
     }
 
     def size: Int = map.size()
-
-    def allEntries: Seq[(String, Timestamp)] = {
-      map.asScala.toSeq
-    }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 0517b0a800..f705da3d6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1236,7 +1236,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }
 
   test("SeenFilesMap") {
-    val map = new SeenFilesMap(maxAgeMs = 10)
+    val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
 
     map.add("a", 5)
     assert(map.size == 1)
@@ -1269,8 +1269,26 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(map.isNewFile("e", 20))
   }
 
+  test("SeenFilesMap with fileNameOnly = true") {
+    val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = true)
+
+    map.add("file:///a/b/c/d", 5)
+    map.add("file:///a/b/c/e", 5)
+    assert(map.size === 2)
+
+    assert(!map.isNewFile("d", 5))
+    assert(!map.isNewFile("file:///d", 5))
+    assert(!map.isNewFile("file:///x/d", 5))
+    assert(!map.isNewFile("file:///x/y/d", 5))
+
+    map.add("s3:///bucket/d", 5)
+    map.add("s3n:///bucket/d", 5)
+    map.add("s3a:///bucket/d", 5)
+    assert(map.size === 2)
+  }
+
   test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
-    val map = new SeenFilesMap(maxAgeMs = 10)
+    val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
 
     map.add("a", 20)
     assert(map.size == 1)
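The last test above pins down purge semantics that `fileNameOnly` must not disturb: `isNewFile` compares against the last purge timestamp rather than `latestTimestamp - maxAgeMs`, so a file older than the age cutoff is still admitted until a purge actually runs. A simplified, hypothetical stand-in (not the patched class; `DemoMap` omits `fileNameOnly` and the real bookkeeping) sketching that invariant:

```scala
import scala.collection.mutable

object PurgeSemanticsDemo {
  // Simplified stand-in for SeenFilesMap, keyed by full path.
  class DemoMap(maxAgeMs: Long) {
    private val map = mutable.Map[String, Long]()
    private var latestTimestamp = 0L
    private var lastPurgeTimestamp = 0L

    def add(path: String, ts: Long): Unit = {
      map(path) = ts
      if (ts > latestTimestamp) latestTimestamp = ts
    }

    // A file is new unless it predates the last purge or was already seen.
    def isNewFile(path: String, ts: Long): Boolean =
      ts >= lastPurgeTimestamp && !map.contains(path)

    def purge(): Unit = {
      lastPurgeTimestamp = latestTimestamp - maxAgeMs
      map.filter { case (_, ts) => ts < lastPurgeTimestamp }
        .keys.foreach(map.remove)
    }
  }

  def main(args: Array[String]): Unit = {
    val m = new DemoMap(maxAgeMs = 10)
    m.add("a", 20)
    // "b" at timestamp 5 is older than latestTimestamp - maxAgeMs = 10, but no
    // purge has run yet, so it is still treated as new (nothing gets missed).
    assert(m.isNewFile("b", 5))
    m.purge() // lastPurgeTimestamp becomes 10
    assert(!m.isNewFile("b", 5)) // 5 < 10: now considered aged out
  }
}
```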