diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 72b335a42ed343ab0f47b10200b9be8a80746ecf..0cfad659dc92ce3da7b4407166deb59babc13d0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -73,8 +73,8 @@ class FileStreamSource(
     logTrace(s"Number of seen files = ${seenFiles.size}")
     if (batchFiles.nonEmpty) {
       maxBatchId += 1
-      metadataLog.add(maxBatchId, newFiles)
-      logInfo(s"Max batch id increased to $maxBatchId with ${newFiles.size} new files")
+      metadataLog.add(maxBatchId, batchFiles)
+      logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
     }
 
     new LongOffset(maxBatchId)
@@ -138,7 +138,7 @@ class FileStreamSource(
       .map { str =>
         Try(str.toInt).toOption.filter(_ > 0).getOrElse {
           throw new IllegalArgumentException(
-            s"Invalid value '$str' for option 'maxFilesPerBatch', must be a positive integer")
+            s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
         }
       }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 3d28d4f99cfc148ff609405df32ce480b1ec48d2..47260a23c7ee352630ffbc1a426ff546edd22bf5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -627,6 +627,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         checkAnswer(df, data.map(_.toString).toDF("value"))
       }
 
+      def checkAllData(data: Seq[Int]): Unit = {
+        val schema = StructType(Seq(StructField("value", StringType)))
+        val df = spark.createDataFrame(
+          spark.sparkContext.makeRDD(memorySink.allData), schema)
+        checkAnswer(df, data.map(_.toString).toDF("value"))
+      }
+
       /** Check how many batches have executed since the last time this check was made */
       var lastBatchId = -1L
       def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
@@ -636,6 +643,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       }
 
       checkLastBatchData(3)  // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
+      checkAllData(1 to 3)
       lastBatchId = memorySink.latestBatchId.get
 
       fileSource.withBatchingLocked {
@@ -645,8 +653,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         createFile(7)   // 6 and 7 should be in the last batch
       }
       q.processAllAvailable()
-      checkLastBatchData(6, 7)
       checkNumBatchesSinceLastCheck(2)
+      checkLastBatchData(6, 7)
+      checkAllData(1 to 7)
 
       fileSource.withBatchingLocked {
         createFile(8)
@@ -656,8 +665,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         createFile(12)   // 12 should be in the last batch
       }
       q.processAllAvailable()
-      checkLastBatchData(12)
       checkNumBatchesSinceLastCheck(3)
+      checkLastBatchData(12)
+      checkAllData(1 to 12)
+
+      q.stop()
+    }
+  }
+
+  test("max files per trigger - incorrect values") {
+    withTempDir { case src =>
+      def testMaxFilePerTriggerValue(value: String): Unit = {
+        val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
+        val e = intercept[IllegalArgumentException] {
+          testStream(df)()
+        }
+        Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
+          assert(e.getMessage.contains(s))
+        }
+      }
+
+      testMaxFilePerTriggerValue("not-a-integer")
+      testMaxFilePerTriggerValue("-1")
+      testMaxFilePerTriggerValue("0")
+      testMaxFilePerTriggerValue("10.1")
     }
   }