diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index 25ebe1797bed883360f18e59bce25de88bbb7ff1..fe64838696a6e36b08d6df6f97b4e58cba586f0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -38,7 +38,10 @@ class FileStreamOptions(parameters: CaseInsensitiveMap) extends Logging {
   }
 
   /**
-   * Maximum age of a file that can be found in this directory, before it is deleted.
+   * Maximum age of a file that can be found in this directory, before it is ignored. For the
+   * first batch all files will be considered valid. If `latestFirst` is set to `true` and
+   * `maxFilesPerTrigger` is set, then this parameter will be ignored, because old files that are
+   * valid, and should be processed, may be ignored. Please refer to SPARK-19813 for details.
    *
    * The max age is specified with respect to the timestamp of the latest file, and not the
    * timestamp of the current system. That this means if the last file has timestamp 1000, and the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 39c0b4979687b1dd615b4512d67bd6f4b3911770..0f0b6f1893583c925a873f8d0de62ccfbf2569f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -64,23 +64,29 @@ class FileStreamSource(
 
   private val fileSortOrder = if (sourceOptions.latestFirst) {
       logWarning(
-        """'latestFirst' is true. New files will be processed first.
-          |It may affect the watermark value""".stripMargin)
+        """'latestFirst' is true. New files will be processed first, which may affect the watermark
+          |value. In addition, 'maxFileAge' will be ignored.""".stripMargin)
       implicitly[Ordering[Long]].reverse
     } else {
       implicitly[Ordering[Long]]
     }
 
+  private val maxFileAgeMs: Long = if (sourceOptions.latestFirst && maxFilesPerBatch.isDefined) {
+    Long.MaxValue
+  } else {
+    sourceOptions.maxFileAgeMs
+  }
+
   /** A mapping from a file that we have processed to some timestamp it was last modified. */
   // Visible for testing and debugging in production.
-  val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
+  val seenFiles = new SeenFilesMap(maxFileAgeMs)
 
   metadataLog.allFiles().foreach { entry =>
     seenFiles.add(entry.path, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = $maxFileAgeMs")
 
   /**
    * Returns the maximum offset that can be retrieved from the source.
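Taken together, the two hunks above make the source compute an effective max file age instead of using `maxFileAge` verbatim: when both `latestFirst` and `maxFilesPerTrigger` are set, age-based purging is disabled by substituting `Long.MaxValue`, so an old-but-unprocessed file cannot age out before its turn. A minimal sketch of that rule in isolation (the `Options` case class and `effectiveMaxFileAgeMs` helper are illustrative stand-ins, not Spark's actual classes):

```scala
// Illustrative stand-in for FileStreamOptions; not the real Spark class.
case class Options(latestFirst: Boolean, maxFilesPerTrigger: Option[Int], maxFileAgeMs: Long)

object MaxFileAgeSketch {
  // Mirrors the decision in the diff: capped batches plus newest-first ordering
  // means an old file may still be waiting to be processed, so age-based
  // purging must be turned off to avoid silently skipping it.
  def effectiveMaxFileAgeMs(opts: Options): Long =
    if (opts.latestFirst && opts.maxFilesPerTrigger.isDefined) Long.MaxValue
    else opts.maxFileAgeMs

  def main(args: Array[String]): Unit = {
    val sevenDaysMs = 7L * 24 * 60 * 60 * 1000
    // Both options set: the configured age is ignored (Long.MaxValue).
    assert(effectiveMaxFileAgeMs(Options(true, Some(1), sevenDaysMs)) == Long.MaxValue)
    // Otherwise the configured age is used as-is.
    assert(effectiveMaxFileAgeMs(Options(true, None, sevenDaysMs)) == sevenDaysMs)
    assert(effectiveMaxFileAgeMs(Options(false, Some(1), sevenDaysMs)) == sevenDaysMs)
    println("effective-age rule behaves as expected")
  }
}
```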
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 8a9fa94bea60174aa1885c0ab7fbc930f43ff6bb..f14aedbba86ab8ecf5a113736cce84e795b7aa6f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1078,6 +1078,41 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     SerializedOffset(str.trim)
   }
 
+  private def runTwoBatchesAndVerifyResults(
+      src: File,
+      latestFirst: Boolean,
+      firstBatch: String,
+      secondBatch: String,
+      maxFileAge: Option[String] = None): Unit = {
+    val srcOptions = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1") ++
+      maxFileAge.map("maxFileAge" -> _)
+    val fileStream = createFileStream(
+      "text",
+      src.getCanonicalPath,
+      options = srcOptions)
+    val clock = new StreamManualClock()
+    testStream(fileStream)(
+      StartStream(trigger = ProcessingTime(10), triggerClock = clock),
+      AssertOnQuery { _ =>
+        // Block until the first batch finishes.
+        eventually(timeout(streamingTimeout)) {
+          assert(clock.isStreamWaitingAt(0))
+        }
+        true
+      },
+      CheckLastBatch(firstBatch),
+      AdvanceManualClock(10),
+      AssertOnQuery { _ =>
+        // Block until the second batch finishes.
+        eventually(timeout(streamingTimeout)) {
+          assert(clock.isStreamWaitingAt(10))
+        }
+        true
+      },
+      CheckLastBatch(secondBatch)
+    )
+  }
+
   test("FileStreamSource - latestFirst") {
     withTempDir { src =>
       // Prepare two files: 1.txt, 2.txt, and make sure they have different modified time.
@@ -1085,42 +1120,23 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       val f2 = stringToFile(new File(src, "2.txt"), "2")
       f2.setLastModified(f1.lastModified + 1000)
 
-      def runTwoBatchesAndVerifyResults(
-          latestFirst: Boolean,
-          firstBatch: String,
-          secondBatch: String): Unit = {
-        val fileStream = createFileStream(
-          "text",
-          src.getCanonicalPath,
-          options = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1"))
-        val clock = new StreamManualClock()
-        testStream(fileStream)(
-          StartStream(trigger = ProcessingTime(10), triggerClock = clock),
-          AssertOnQuery { _ =>
-            // Block until the first batch finishes.
-            eventually(timeout(streamingTimeout)) {
-              assert(clock.isStreamWaitingAt(0))
-            }
-            true
-          },
-          CheckLastBatch(firstBatch),
-          AdvanceManualClock(10),
-          AssertOnQuery { _ =>
-            // Block until the second batch finishes.
-            eventually(timeout(streamingTimeout)) {
-              assert(clock.isStreamWaitingAt(10))
-            }
-            true
-          },
-          CheckLastBatch(secondBatch)
-        )
-      }
-
       // Read oldest files first, so the first batch is "1", and the second batch is "2".
-      runTwoBatchesAndVerifyResults(latestFirst = false, firstBatch = "1", secondBatch = "2")
+      runTwoBatchesAndVerifyResults(src, latestFirst = false, firstBatch = "1", secondBatch = "2")
 
       // Read latest files first, so the first batch is "2", and the second batch is "1".
-      runTwoBatchesAndVerifyResults(latestFirst = true, firstBatch = "2", secondBatch = "1")
+      runTwoBatchesAndVerifyResults(src, latestFirst = true, firstBatch = "2", secondBatch = "1")
+    }
+  }
+
+  test("SPARK-19813: Ignore maxFileAge when maxFilesPerTrigger and latestFirst is used") {
+    withTempDir { src =>
+      // Prepare two files: 1.txt, 2.txt, and make sure they have different modified time.
+      val f1 = stringToFile(new File(src, "1.txt"), "1")
+      val f2 = stringToFile(new File(src, "2.txt"), "2")
+      f2.setLastModified(f1.lastModified + 3600 * 1000 /* 1 hour later */)
+
+      runTwoBatchesAndVerifyResults(src, latestFirst = true, firstBatch = "2", secondBatch = "1",
+        maxFileAge = Some("1m") /* 1 minute */)
     }
   }
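For context, the scenario the new test pins down corresponds to a user query along these lines (a sketch with a placeholder input path; the option names `latestFirst`, `maxFilesPerTrigger`, and `maxFileAge` are the ones exercised by the diff):

```scala
import org.apache.spark.sql.SparkSession

object LatestFirstQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("latest-first-sketch").getOrCreate()

    // With latestFirst=true and maxFilesPerTrigger set, the patched source
    // ignores maxFileAge, so a file older than 1 minute is still processed
    // in a later batch instead of being silently dropped.
    val lines = spark.readStream
      .format("text")
      .option("latestFirst", "true")
      .option("maxFilesPerTrigger", "1")
      .option("maxFileAge", "1m")
      .load("/tmp/stream-input") // placeholder directory

    val query = lines.writeStream
      .format("console")
      .start()
    query.awaitTermination()
  }
}
```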