diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index 2f802d782f5ad9989604b37c9efed5c5707d05c3..e7ba901945490475e7db2bbaca3954284eec1a3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -38,7 +38,10 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
   }
 
   /**
-   * Maximum age of a file that can be found in this directory, before it is deleted.
+   * Maximum age of a file that can be found in this directory, before it is ignored. For the
+   * first batch all files will be considered valid. If `latestFirst` is set to `true` and
+   * `maxFilesPerTrigger` is set, then this parameter will be ignored, because otherwise old files
+   * that are still valid and should be processed may be skipped. See SPARK-19813 for details.
    *
    * The max age is specified with respect to the timestamp of the latest file, and not the
    * timestamp of the current system. That this means if the last file has timestamp 1000, and the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 6a7263ca45d85c9e88763c96160e2227c7cb79d7..0f09b0a0c8f2597e7c188d97312a2bbee7899442 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -66,23 +66,29 @@ class FileStreamSource(
 
   private val fileSortOrder = if (sourceOptions.latestFirst) {
       logWarning(
-        """'latestFirst' is true. New files will be processed first.
-          |It may affect the watermark value""".stripMargin)
+        """'latestFirst' is true. New files will be processed first, which may affect the watermark
+          |value. In addition, 'maxFileAge' will be ignored.""".stripMargin)
       implicitly[Ordering[Long]].reverse
     } else {
       implicitly[Ordering[Long]]
     }
 
+  private val maxFileAgeMs: Long = if (sourceOptions.latestFirst && maxFilesPerBatch.isDefined) {
+    Long.MaxValue
+  } else {
+    sourceOptions.maxFileAgeMs
+  }
+
   /** A mapping from a file that we have processed to some timestamp it was last modified. */
   // Visible for testing and debugging in production.
-  val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
+  val seenFiles = new SeenFilesMap(maxFileAgeMs)
 
   metadataLog.allFiles().foreach { entry =>
     seenFiles.add(entry.path, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = $maxFileAgeMs")
 
   /**
    * Returns the maximum offset that can be retrieved from the source.
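For reference, the option combination this change special-cases looks like the following on the user side. This is a minimal sketch, not part of the patch: it assumes a fresh SparkSession, and the app name, input path, and option values are illustrative; `latestFirst`, `maxFilesPerTrigger`, and `maxFileAge` are the documented file-source options the new logic keys off.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch (not from the patch): app name and input path are assumptions.
val spark = SparkSession.builder().appName("latest-first-sketch").getOrCreate()

val lines = spark.readStream
  .format("text")
  .option("latestFirst", "true")      // process the newest files first
  .option("maxFilesPerTrigger", "1")  // at most one file per micro-batch
  .option("maxFileAge", "7d")         // ignored after this change when the two options above are set
  .load("/tmp/file-stream-input")     // hypothetical source directory
```

With all three options set, FileStreamSource now substitutes `Long.MaxValue` for the configured age, so files that are old relative to the newest file seen cannot be purged from `seenFiles` before they are processed.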
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 1586850c77fcadd91e3a4cac669fe31ac86f22f6..0517b0a800e5319905e2b1d155fc686ac0b1f48a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1173,6 +1173,41 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     SerializedOffset(str.trim)
   }
 
+  private def runTwoBatchesAndVerifyResults(
+      src: File,
+      latestFirst: Boolean,
+      firstBatch: String,
+      secondBatch: String,
+      maxFileAge: Option[String] = None): Unit = {
+    val srcOptions = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1") ++
+      maxFileAge.map("maxFileAge" -> _)
+    val fileStream = createFileStream(
+      "text",
+      src.getCanonicalPath,
+      options = srcOptions)
+    val clock = new StreamManualClock()
+    testStream(fileStream)(
+      StartStream(trigger = ProcessingTime(10), triggerClock = clock),
+      AssertOnQuery { _ =>
+        // Block until the first batch finishes.
+        eventually(timeout(streamingTimeout)) {
+          assert(clock.isStreamWaitingAt(0))
+        }
+        true
+      },
+      CheckLastBatch(firstBatch),
+      AdvanceManualClock(10),
+      AssertOnQuery { _ =>
+        // Block until the second batch finishes.
+        eventually(timeout(streamingTimeout)) {
+          assert(clock.isStreamWaitingAt(10))
+        }
+        true
+      },
+      CheckLastBatch(secondBatch)
+    )
+  }
+
   test("FileStreamSource - latestFirst") {
     withTempDir { src =>
       // Prepare two files: 1.txt, 2.txt, and make sure they have different modified time.
@@ -1180,42 +1215,23 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       val f2 = stringToFile(new File(src, "2.txt"), "2")
       f2.setLastModified(f1.lastModified + 1000)
 
-      def runTwoBatchesAndVerifyResults(
-          latestFirst: Boolean,
-          firstBatch: String,
-          secondBatch: String): Unit = {
-        val fileStream = createFileStream(
-          "text",
-          src.getCanonicalPath,
-          options = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1"))
-        val clock = new StreamManualClock()
-        testStream(fileStream)(
-          StartStream(trigger = ProcessingTime(10), triggerClock = clock),
-          AssertOnQuery { _ =>
-            // Block until the first batch finishes.
-            eventually(timeout(streamingTimeout)) {
-              assert(clock.isStreamWaitingAt(0))
-            }
-            true
-          },
-          CheckLastBatch(firstBatch),
-          AdvanceManualClock(10),
-          AssertOnQuery { _ =>
-            // Block until the second batch finishes.
-            eventually(timeout(streamingTimeout)) {
-              assert(clock.isStreamWaitingAt(10))
-            }
-            true
-          },
-          CheckLastBatch(secondBatch)
-        )
-      }
-
       // Read oldest files first, so the first batch is "1", and the second batch is "2".
-      runTwoBatchesAndVerifyResults(latestFirst = false, firstBatch = "1", secondBatch = "2")
+      runTwoBatchesAndVerifyResults(src, latestFirst = false, firstBatch = "1", secondBatch = "2")
       // Read latest files first, so the first batch is "2", and the second batch is "1".
-      runTwoBatchesAndVerifyResults(latestFirst = true, firstBatch = "2", secondBatch = "1")
+      runTwoBatchesAndVerifyResults(src, latestFirst = true, firstBatch = "2", secondBatch = "1")
+    }
+  }
+
+  test("SPARK-19813: Ignore maxFileAge when maxFilesPerTrigger and latestFirst is used") {
+    withTempDir { src =>
+      // Prepare two files: 1.txt, 2.txt, and make sure they have different modified time.
+      val f1 = stringToFile(new File(src, "1.txt"), "1")
+      val f2 = stringToFile(new File(src, "2.txt"), "2")
+      f2.setLastModified(f1.lastModified + 3600 * 1000 /* 1 hour later */)
+
+      runTwoBatchesAndVerifyResults(src, latestFirst = true, firstBatch = "2", secondBatch = "1",
+        maxFileAge = Some("1m") /* 1 minute */)
     }
   }
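To see why the guard is needed, recall that `maxFileAge` is enforced relative to the timestamp of the latest file seen, not the wall clock. With `latestFirst`, the first micro-batch records the newest timestamp, so an older file that is still queued can fall outside the age window before it is ever read; the new test above exercises exactly that scenario with a one-minute age and files an hour apart. The sketch below models the arithmetic in isolation; `MaxFileAgeModel` and its members are hypothetical names for illustration, not Spark's actual classes.

```scala
// Hypothetical, self-contained model of the purge interaction; not Spark code.
object MaxFileAgeModel {
  // Mirrors the decision added to FileStreamSource: disable the age cutoff
  // whenever latestFirst and maxFilesPerTrigger are combined.
  def effectiveMaxFileAgeMs(
      latestFirst: Boolean,
      maxFilesPerBatch: Option[Int],
      configuredMaxFileAgeMs: Long): Long = {
    if (latestFirst && maxFilesPerBatch.isDefined) Long.MaxValue
    else configuredMaxFileAgeMs
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the new test: two files one hour apart, maxFileAge = "1m".
    val oldFileTs = 1000L                     // 1.txt
    val newFileTs = oldFileTs + 3600L * 1000  // 2.txt, one hour later
    val configuredAgeMs = 60L * 1000          // "1m"

    // Without the guard: once 2.txt has been seen, anything older than
    // (latest timestamp - maxFileAge) is treated as expired, so 1.txt would
    // be dropped before the second batch could process it.
    val naiveCutoff = newFileTs - configuredAgeMs
    assert(oldFileTs < naiveCutoff) // 1.txt falls outside the age window

    // With the guard: the cutoff lands below every real timestamp, so an
    // unprocessed file can never expire.
    val guardedCutoff = newFileTs - effectiveMaxFileAgeMs(
      latestFirst = true, maxFilesPerBatch = Some(1), configuredMaxFileAgeMs = configuredAgeMs)
    assert(oldFileTs > guardedCutoff) // 1.txt stays eligible for batch two
  }
}
```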