Skip to content
Snippets Groups Projects
Commit a3648b5d authored by Burak Yavuz's avatar Burak Yavuz
Browse files

[SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in...

[SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource

## What changes were proposed in this pull request?

**The Problem**
There is a file stream source option called maxFileAge which limits how old the files can be, relative to the latest file that has been seen. This is used to limit the files that need to be remembered as "processed". Files older than the latest processed files are ignored. This value is by default 7 days.
This causes a problem when both
latestFirst = true
maxFilesPerTrigger > total files to be processed.
Here is what happens in all combinations
1) latestFirst = false - Since files are processed in order, there won't be any unprocessed file older than the latest processed file. All files will be processed.
2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is not set, then all old files get processed in the first batch, and so no file is left behind.
3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch processes the latest X files. That sets the threshold to (latest file - maxFileAge), so files older than this threshold will never be considered for processing.
The bug is with case 3.

**The Solution**

Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set.

## How was this patch tested?

Regression test in `FileStreamSourceSuite`

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #17153 from brkyvz/maxFileAge.
parent 45512902
No related branches found
No related tags found
No related merge requests found
......@@ -38,7 +38,10 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
}
/**
* Maximum age of a file that can be found in this directory, before it is deleted.
* Maximum age of a file that can be found in this directory, before it is ignored. For the
* first batch all files will be considered valid. If `latestFirst` is set to `true` and
* `maxFilesPerTrigger` is set, then this parameter will be ignored, because old files that are
* valid, and should be processed, may be ignored. Please refer to SPARK-19813 for details.
*
* The max age is specified with respect to the timestamp of the latest file, and not the
* timestamp of the current system. This means that if the last file has timestamp 1000, and the
......
......@@ -66,23 +66,29 @@ class FileStreamSource(
private val fileSortOrder = if (sourceOptions.latestFirst) {
logWarning(
"""'latestFirst' is true. New files will be processed first.
|It may affect the watermark value""".stripMargin)
"""'latestFirst' is true. New files will be processed first, which may affect the watermark
|value. In addition, 'maxFileAge' will be ignored.""".stripMargin)
implicitly[Ordering[Long]].reverse
} else {
implicitly[Ordering[Long]]
}
// SPARK-19813: When `latestFirst` is enabled together with `maxFilesPerTrigger`, the first
// batch processes only the newest files, which would advance the age threshold
// (latest timestamp - maxFileAge) past older files that are still valid and unprocessed.
// To avoid silently dropping those files, disable the age check (Long.MaxValue) in that case;
// otherwise honor the user-configured `maxFileAge`.
private val maxFileAgeMs: Long = if (sourceOptions.latestFirst && maxFilesPerBatch.isDefined) {
Long.MaxValue
} else {
sourceOptions.maxFileAgeMs
}
/** A mapping from a file that we have processed to some timestamp it was last modified. */
// Visible for testing and debugging in production.
val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
val seenFiles = new SeenFilesMap(maxFileAgeMs)
metadataLog.allFiles().foreach { entry =>
seenFiles.add(entry.path, entry.timestamp)
}
seenFiles.purge()
logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}")
logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = $maxFileAgeMs")
/**
* Returns the maximum offset that can be retrieved from the source.
......
......@@ -1173,6 +1173,41 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
SerializedOffset(str.trim)
}
/**
 * Runs a streaming query over the files in `src` with `maxFilesPerTrigger = 1`, driving it
 * through exactly two batches with a manual clock, and verifies the content of each batch.
 *
 * @param src directory containing the input text files
 * @param latestFirst whether newer files should be processed before older ones
 * @param firstBatch expected content of the first batch
 * @param secondBatch expected content of the second batch
 * @param maxFileAge optional `maxFileAge` source option (e.g. "1m"); omitted when None
 */
private def runTwoBatchesAndVerifyResults(
    src: File,
    latestFirst: Boolean,
    firstBatch: String,
    secondBatch: String,
    maxFileAge: Option[String] = None): Unit = {
  val baseOptions = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1")
  val srcOptions = maxFileAge.fold(baseOptions)(age => baseOptions + ("maxFileAge" -> age))
  val fileStream = createFileStream("text", src.getCanonicalPath, options = srcOptions)
  val clock = new StreamManualClock()
  testStream(fileStream)(
    StartStream(trigger = ProcessingTime(10), triggerClock = clock),
    AssertOnQuery { _ =>
      // Wait until the first batch has completed before checking its result.
      eventually(timeout(streamingTimeout)) {
        assert(clock.isStreamWaitingAt(0))
      }
      true
    },
    CheckLastBatch(firstBatch),
    AdvanceManualClock(10),
    AssertOnQuery { _ =>
      // Wait until the second batch has completed before checking its result.
      eventually(timeout(streamingTimeout)) {
        assert(clock.isStreamWaitingAt(10))
      }
      true
    },
    CheckLastBatch(secondBatch)
  )
}
test("FileStreamSource - latestFirst") {
withTempDir { src =>
// Prepare two files: 1.txt, 2.txt, and make sure they have different modified time.
......@@ -1180,42 +1215,23 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
val f2 = stringToFile(new File(src, "2.txt"), "2")
f2.setLastModified(f1.lastModified + 1000)
def runTwoBatchesAndVerifyResults(
latestFirst: Boolean,
firstBatch: String,
secondBatch: String): Unit = {
val fileStream = createFileStream(
"text",
src.getCanonicalPath,
options = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1"))
val clock = new StreamManualClock()
testStream(fileStream)(
StartStream(trigger = ProcessingTime(10), triggerClock = clock),
AssertOnQuery { _ =>
// Block until the first batch finishes.
eventually(timeout(streamingTimeout)) {
assert(clock.isStreamWaitingAt(0))
}
true
},
CheckLastBatch(firstBatch),
AdvanceManualClock(10),
AssertOnQuery { _ =>
// Block until the second batch finishes.
eventually(timeout(streamingTimeout)) {
assert(clock.isStreamWaitingAt(10))
}
true
},
CheckLastBatch(secondBatch)
)
}
// Read oldest files first, so the first batch is "1", and the second batch is "2".
runTwoBatchesAndVerifyResults(latestFirst = false, firstBatch = "1", secondBatch = "2")
runTwoBatchesAndVerifyResults(src, latestFirst = false, firstBatch = "1", secondBatch = "2")
// Read latest files first, so the first batch is "2", and the second batch is "1".
runTwoBatchesAndVerifyResults(latestFirst = true, firstBatch = "2", secondBatch = "1")
runTwoBatchesAndVerifyResults(src, latestFirst = true, firstBatch = "2", secondBatch = "1")
}
}
test("SPARK-19813: Ignore maxFileAge when maxFilesPerTrigger and latestFirst is used") {
  withTempDir { src =>
    // Two files whose modification times are one hour apart, so the older one falls well
    // outside the 1 minute `maxFileAge` window configured below.
    val oneHourMs = 3600 * 1000
    val older = stringToFile(new File(src, "1.txt"), "1")
    val newer = stringToFile(new File(src, "2.txt"), "2")
    newer.setLastModified(older.lastModified + oneHourMs)
    // With latestFirst + maxFilesPerTrigger, maxFileAge must be ignored: the old file ("1")
    // must still be picked up in the second batch instead of being dropped as too old.
    runTwoBatchesAndVerifyResults(src, latestFirst = true, firstBatch = "2", secondBatch = "1",
      maxFileAge = Some("1m"))
  }
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment