From e50efd53f073890d789a8448f850cc219cca7708 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Mon, 11 Jul 2016 18:41:36 -0700
Subject: [PATCH] [SPARK-16430][SQL][STREAMING] Fixed bug in the
 maxFilesPerTrigger in FileStreamSource

## What changes were proposed in this pull request?

Incorrect list of files were being allocated to a batch. This caused a file to read multiple times in the multiple batches.

## How was this patch tested?

Added unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #14143 from tdas/SPARK-16430-1.
---
 .../streaming/FileStreamSource.scala          |  6 ++--
 .../sql/streaming/FileStreamSourceSuite.scala | 35 +++++++++++++++++--
 2 files changed, 36 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 72b335a42e..0cfad659dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -73,8 +73,8 @@ class FileStreamSource(
     logTrace(s"Number of seen files = ${seenFiles.size}")
     if (batchFiles.nonEmpty) {
       maxBatchId += 1
-      metadataLog.add(maxBatchId, newFiles)
-      logInfo(s"Max batch id increased to $maxBatchId with ${newFiles.size} new files")
+      metadataLog.add(maxBatchId, batchFiles)
+      logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
     }
 
     new LongOffset(maxBatchId)
@@ -138,7 +138,7 @@ class FileStreamSource(
       .map { str =>
         Try(str.toInt).toOption.filter(_ > 0).getOrElse {
           throw new IllegalArgumentException(
-            s"Invalid value '$str' for option 'maxFilesPerBatch', must be a positive integer")
+            s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
         }
       }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 3d28d4f99c..47260a23c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -627,6 +627,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         checkAnswer(df, data.map(_.toString).toDF("value"))
       }
 
+      def checkAllData(data: Seq[Int]): Unit = {
+        val schema = StructType(Seq(StructField("value", StringType)))
+        val df = spark.createDataFrame(
+          spark.sparkContext.makeRDD(memorySink.allData), schema)
+        checkAnswer(df, data.map(_.toString).toDF("value"))
+      }
+
       /** Check how many batches have executed since the last time this check was made */
       var lastBatchId = -1L
       def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
@@ -636,6 +643,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       }
 
       checkLastBatchData(3)  // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
+      checkAllData(1 to 3)
       lastBatchId = memorySink.latestBatchId.get
 
       fileSource.withBatchingLocked {
@@ -645,8 +653,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         createFile(7)   // 6 and 7 should be in the last batch
       }
       q.processAllAvailable()
-      checkLastBatchData(6, 7)
       checkNumBatchesSinceLastCheck(2)
+      checkLastBatchData(6, 7)
+      checkAllData(1 to 7)
 
       fileSource.withBatchingLocked {
         createFile(8)
@@ -656,8 +665,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         createFile(12)   // 12 should be in the last batch
       }
       q.processAllAvailable()
-      checkLastBatchData(12)
       checkNumBatchesSinceLastCheck(3)
+      checkLastBatchData(12)
+      checkAllData(1 to 12)
+
+      q.stop()
+    }
+  }
+
+  test("max files per trigger - incorrect values") {
+    withTempDir { case src =>
+      def testMaxFilePerTriggerValue(value: String): Unit = {
+        val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
+        val e = intercept[IllegalArgumentException] {
+          testStream(df)()
+        }
+        Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
+          assert(e.getMessage.contains(s))
+        }
+      }
+
+      testMaxFilePerTriggerValue("not-a-integer")
+      testMaxFilePerTriggerValue("-1")
+      testMaxFilePerTriggerValue("0")
+      testMaxFilePerTriggerValue("10.1")
     }
   }
 
-- 
GitLab