Skip to content
Snippets Groups Projects
Commit 900ce558 authored by Shixiong Zhu's avatar Shixiong Zhu Committed by Tathagata Das
Browse files

[SPARK-18826][SS] Add 'latestFirst' option to FileStreamSource


## What changes were proposed in this pull request?

When starting a stream over a large backlog of files with `maxFilesPerTrigger` set, users often want to process the most recent files first. This keeps latency low for recent data while historical data is slowly backfilled.

This PR adds a new option `latestFirst` to control this behavior. When it's true, `FileStreamSource` will sort the files by the modified time from latest to oldest, and take the first `maxFilesPerTrigger` files as a new batch.

## How was this patch tested?

The added test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16251 from zsxwing/newest-first.

(cherry picked from commit 68a6dc97)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
parent e430915f
No related branches found
No related tags found
No related merge requests found
......@@ -53,4 +53,18 @@ class FileStreamOptions(parameters: CaseInsensitiveMap) extends Logging {
/** The user-specified options as a case-insensitive map, with the "path" entry removed. */
val optionMapWithoutPath: Map[String, String] =
  parameters.filter { case (key, _) => key != "path" }
/**
 * Whether to scan latest files first. When true, if the source finds unprocessed files in a
 * trigger, it processes the latest files first. Defaults to false (oldest first).
 */
val latestFirst: Boolean =
  parameters.get("latestFirst") match {
    case None => false
    case Some(str) =>
      try {
        str.toBoolean
      } catch {
        // str.toBoolean throws IllegalArgumentException on anything other than
        // "true"/"false" (case-insensitive); rethrow with the option name included.
        case _: IllegalArgumentException =>
          throw new IllegalArgumentException(
            s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'")
      }
  }
}
......@@ -62,6 +62,15 @@ class FileStreamSource(
/** Maximum number of new files to be considered in each batch */
private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger
/**
 * Ordering applied to file modification timestamps when picking new files:
 * ascending (oldest first) by default, descending when 'latestFirst' is enabled.
 */
private val fileSortOrder =
  if (sourceOptions.latestFirst) {
    // Warn because processing newer files ahead of older ones may affect the watermark.
    logWarning(
      """'latestFirst' is true. New files will be processed first.
        |It may affect the watermark value""".stripMargin)
    Ordering[Long].reverse
  } else {
    Ordering[Long]
  }
/** A mapping from a file that we have processed to some timestamp it was last modified. */
// Visible for testing and debugging in production.
val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
......@@ -155,7 +164,7 @@ class FileStreamSource(
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
val files = catalog.allFiles().sortBy(_.getModificationTime)(fileSortOrder).map { status =>
(status.getPath.toUri.toString, status.getModificationTime)
}
val endTime = System.nanoTime
......
......@@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming
import java.io.File
import org.scalatest.PrivateMethodTester
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql._
......@@ -1059,6 +1060,52 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
SerializedOffset(str.trim)
}
test("FileStreamSource - latestFirst") {
  withTempDir { src =>
    // Prepare two files: 1.txt, 2.txt, and make sure they have different modified times.
    val f1 = stringToFile(new File(src, "1.txt"), "1")
    val f2 = stringToFile(new File(src, "2.txt"), "2")
    // File.setLastModified returns false on failure; assert on it so a filesystem that
    // rejects the update fails loudly instead of silently invalidating the ordering
    // assumption the rest of this test depends on.
    assert(f2.setLastModified(f1.lastModified + 1000))

    // Runs a two-batch stream with maxFilesPerTrigger=1 and verifies which file each
    // batch picked up, so the only variable under test is the 'latestFirst' option.
    def runTwoBatchesAndVerifyResults(
        latestFirst: Boolean,
        firstBatch: String,
        secondBatch: String): Unit = {
      val fileStream = createFileStream(
        "text",
        src.getCanonicalPath,
        options = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1"))
      val clock = new StreamManualClock()
      testStream(fileStream)(
        StartStream(trigger = ProcessingTime(10), triggerClock = clock),
        AssertOnQuery { _ =>
          // Block until the first batch finishes.
          eventually(timeout(streamingTimeout)) {
            assert(clock.isStreamWaitingAt(0))
          }
          true
        },
        CheckLastBatch(firstBatch),
        AdvanceManualClock(10),
        AssertOnQuery { _ =>
          // Block until the second batch finishes.
          eventually(timeout(streamingTimeout)) {
            assert(clock.isStreamWaitingAt(10))
          }
          true
        },
        CheckLastBatch(secondBatch)
      )
    }

    // Read oldest files first, so the first batch is "1", and the second batch is "2".
    runTwoBatchesAndVerifyResults(latestFirst = false, firstBatch = "1", secondBatch = "2")
    // Read latest files first, so the first batch is "2", and the second batch is "1".
    runTwoBatchesAndVerifyResults(latestFirst = true, firstBatch = "2", secondBatch = "1")
  }
}
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment