Commit e50efd53 authored by Tathagata Das

[SPARK-16430][SQL][STREAMING] Fixed bug in the maxFilesPerTrigger in FileStreamSource

## What changes were proposed in this pull request?

An incorrect list of files was being allocated to each batch, which caused the same file to be read multiple times across batches.
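
The gist of the change, as a minimal self-contained sketch (the names mirror `FileStreamSource`, but this is not the upstream code): a batch must be logged with the truncated `batchFiles` list, not with every newly seen file.

```scala
object MaxFilesPerTriggerSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative values; in FileStreamSource these come from the source
    // options and from listing the input directory.
    val maxFilesPerBatch: Option[Int] = Some(2)
    val newFiles = Seq("f1.txt", "f2.txt", "f3.txt") // all files seen since the last batch

    // Only the first maxFilesPerTrigger files belong to the current batch.
    val batchFiles = maxFilesPerBatch match {
      case Some(max) => newFiles.take(max)
      case None      => newFiles
    }

    // Before the fix the metadata log recorded `newFiles` (all three files),
    // so "f3.txt" was committed with batch 1 and then read again in batch 2.
    // After the fix only `batchFiles` is recorded for the batch.
    println(s"files in this batch: $batchFiles")                        // f1.txt, f2.txt
    println(s"left for next batch: ${newFiles.drop(batchFiles.size)}")  // f3.txt
  }
}
```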

## How was this patch tested?

Added unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #14143 from tdas/SPARK-16430-1.
parent 91a443b8
@@ -73,8 +73,8 @@ class FileStreamSource(
     logTrace(s"Number of seen files = ${seenFiles.size}")
     if (batchFiles.nonEmpty) {
       maxBatchId += 1
-      metadataLog.add(maxBatchId, newFiles)
-      logInfo(s"Max batch id increased to $maxBatchId with ${newFiles.size} new files")
+      metadataLog.add(maxBatchId, batchFiles)
+      logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
     }
     new LongOffset(maxBatchId)
@@ -138,7 +138,7 @@ class FileStreamSource(
       .map { str =>
         Try(str.toInt).toOption.filter(_ > 0).getOrElse {
           throw new IllegalArgumentException(
-            s"Invalid value '$str' for option 'maxFilesPerBatch', must be a positive integer")
+            s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
         }
       }
   }
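
For context, a minimal self-contained sketch (not the upstream code) of the positive-integer validation pattern shown in the hunk above, assuming the option value arrives as a raw string:

```scala
import scala.util.Try

object MaxFilesPerTriggerValidation {
  // Mirrors the parsing pattern above: accept only strings that parse to a
  // strictly positive Int, otherwise fail with the corrected message.
  def parseMaxFilesPerTrigger(str: String): Int =
    Try(str.toInt).toOption.filter(_ > 0).getOrElse {
      throw new IllegalArgumentException(
        s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
    }

  def main(args: Array[String]): Unit = {
    println(parseMaxFilesPerTrigger("2"))  // 2
    // Each of these throws IllegalArgumentException, as the new test expects:
    // parseMaxFilesPerTrigger("not-a-integer")
    // parseMaxFilesPerTrigger("-1")
    // parseMaxFilesPerTrigger("0")
    // parseMaxFilesPerTrigger("10.1")
  }
}
```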
@@ -627,6 +643,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         checkAnswer(df, data.map(_.toString).toDF("value"))
       }
 
+      def checkAllData(data: Seq[Int]): Unit = {
+        val schema = StructType(Seq(StructField("value", StringType)))
+        val df = spark.createDataFrame(
+          spark.sparkContext.makeRDD(memorySink.allData), schema)
+        checkAnswer(df, data.map(_.toString).toDF("value"))
+      }
+
       /** Check how many batches have executed since the last time this check was made */
       var lastBatchId = -1L
       def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
@@ -636,6 +643,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       }
 
       checkLastBatchData(3)  // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
+      checkAllData(1 to 3)
       lastBatchId = memorySink.latestBatchId.get
 
       fileSource.withBatchingLocked {
@@ -645,8 +653,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         createFile(7)   // 6 and 7 should be in the last batch
       }
       q.processAllAvailable()
-      checkLastBatchData(6, 7)
       checkNumBatchesSinceLastCheck(2)
+      checkLastBatchData(6, 7)
+      checkAllData(1 to 7)
 
       fileSource.withBatchingLocked {
         createFile(8)
@@ -656,8 +665,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         createFile(12)   // 12 should be in the last batch
       }
       q.processAllAvailable()
-      checkLastBatchData(12)
       checkNumBatchesSinceLastCheck(3)
+      checkLastBatchData(12)
+      checkAllData(1 to 12)
 
       q.stop()
     }
   }
+
+  test("max files per trigger - incorrect values") {
+    withTempDir { case src =>
+      def testMaxFilePerTriggerValue(value: String): Unit = {
+        val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
+        val e = intercept[IllegalArgumentException] {
+          testStream(df)()
+        }
+        Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
+          assert(e.getMessage.contains(s))
+        }
+      }
+
+      testMaxFilePerTriggerValue("not-a-integer")
+      testMaxFilePerTriggerValue("-1")
+      testMaxFilePerTriggerValue("0")
+      testMaxFilePerTriggerValue("10.1")
+    }
+  }
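
A hedged usage sketch of the option exercised by the tests above; the session settings and input path below are illustrative, not part of the commit:

```scala
import org.apache.spark.sql.SparkSession

object MaxFilesPerTriggerUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("maxFilesPerTrigger-usage")
      .getOrCreate()

    // Consume at most two new files per micro-batch; with the fix, each file
    // is assigned to exactly one batch instead of being re-read.
    val lines = spark.readStream
      .option("maxFilesPerTrigger", "2")
      .text("/tmp/stream-input")   // illustrative input directory

    val query = lines.writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```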