diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index fdea65cb10ae034df3d3001b4292b191b944c5da..25ebe1797bed883360f18e59bce25de88bbb7ff1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -53,4 +53,18 @@ class FileStreamOptions(parameters: CaseInsensitiveMap) extends Logging {
   /** Options as specified by the user, in a case-insensitive map, without "path" set. */
   val optionMapWithoutPath: Map[String, String] =
     parameters.filterKeys(_ != "path")
+
+  /**
+   * Whether to scan the latest files first. If true, when the source finds unprocessed files in
+   * a trigger, it will process the latest files first.
+   */
+  val latestFirst: Boolean = parameters.get("latestFirst").map { str =>
+    try {
+      str.toBoolean
+    } catch {
+      case _: IllegalArgumentException =>
+        throw new IllegalArgumentException(
+          s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'")
+    }
+  }.getOrElse(false)
 }
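
A minimal usage sketch (not part of the diff) showing how a reader would enable the new option through the public DataStreamReader API; the input path is hypothetical and `spark` is assumed to be an active SparkSession:

    // Hypothetical usage: process the newest files first, one file per batch.
    // Any "latestFirst" value other than "true"/"false" throws
    // IllegalArgumentException, per FileStreamOptions above.
    val stream = spark.readStream
      .format("text")
      .option("latestFirst", "true")
      .option("maxFilesPerTrigger", "1")
      .load("/tmp/input")  // hypothetical path
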
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 20e0dcef8ffda0649be8c54399879d5c85e088f0..39c0b4979687b1dd615b4512d67bd6f4b3911770 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -62,6 +62,15 @@ class FileStreamSource(
   /** Maximum number of new files to be considered in each batch */
   private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger
 
+  private val fileSortOrder = if (sourceOptions.latestFirst) {
+      logWarning(
+        """'latestFirst' is true. New files will be processed first.
+          |This may affect the watermark value.""".stripMargin)
+      implicitly[Ordering[Long]].reverse
+    } else {
+      implicitly[Ordering[Long]]
+    }
+
   /** A mapping from a file that we have processed to some timestamp it was last modified. */
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
@@ -155,7 +164,7 @@ class FileStreamSource(
     val startTime = System.nanoTime
     val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
     val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
-    val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
+    val files = catalog.allFiles().sortBy(_.getModificationTime)(fileSortOrder).map { status =>
       (status.getPath.toUri.toString, status.getModificationTime)
     }
     val endTime = System.nanoTime
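
A standalone sketch (not part of the diff) of the ordering trick: `sortBy` takes an `Ordering` in a second parameter list, and `implicitly[Ordering[Long]].reverse` flips the natural Long order so the newest modification time sorts first. `FileEntry` is a hypothetical stand-in for Hadoop's FileStatus:

    case class FileEntry(path: String, modificationTime: Long)

    val files = Seq(
      FileEntry("1.txt", modificationTime = 1000L),
      FileEntry("2.txt", modificationTime = 2000L))

    val oldestFirst = implicitly[Ordering[Long]]  // natural ascending order
    val latestFirst = oldestFirst.reverse         // descending: newest first

    // sortBy accepts the Ordering explicitly as its second argument list.
    assert(files.sortBy(_.modificationTime)(oldestFirst).map(_.path) == Seq("1.txt", "2.txt"))
    assert(files.sortBy(_.modificationTime)(latestFirst).map(_.path) == Seq("2.txt", "1.txt"))
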
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index bcb68520407bc3ef80d6a322f772a2ed2927d9f2..b96ccb4e6cbf54c818f2f22c9dfe08b3d86d7cf4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming
 import java.io.File
 
 import org.scalatest.PrivateMethodTester
+import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql._
@@ -1059,6 +1060,52 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
     SerializedOffset(str.trim)
   }
+
+  test("FileStreamSource - latestFirst") {
+    withTempDir { src =>
+      // Prepare two files, 1.txt and 2.txt, and make sure they have different modification times.
+      val f1 = stringToFile(new File(src, "1.txt"), "1")
+      val f2 = stringToFile(new File(src, "2.txt"), "2")
+      f2.setLastModified(f1.lastModified + 1000)
+
+      def runTwoBatchesAndVerifyResults(
+          latestFirst: Boolean,
+          firstBatch: String,
+          secondBatch: String): Unit = {
+        val fileStream = createFileStream(
+          "text",
+          src.getCanonicalPath,
+          options = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1"))
+        val clock = new StreamManualClock()
+        testStream(fileStream)(
+          StartStream(trigger = ProcessingTime(10), triggerClock = clock),
+          AssertOnQuery { _ =>
+            // Block until the first batch finishes.
+            eventually(timeout(streamingTimeout)) {
+              assert(clock.isStreamWaitingAt(0))
+            }
+            true
+          },
+          CheckLastBatch(firstBatch),
+          AdvanceManualClock(10),
+          AssertOnQuery { _ =>
+            // Block until the second batch finishes.
+            eventually(timeout(streamingTimeout)) {
+              assert(clock.isStreamWaitingAt(10))
+            }
+            true
+          },
+          CheckLastBatch(secondBatch)
+        )
+      }
+
+      // Read oldest files first, so the first batch is "1", and the second batch is "2".
+      runTwoBatchesAndVerifyResults(latestFirst = false, firstBatch = "1", secondBatch = "2")
+
+      // Read latest files first, so the first batch is "2", and the second batch is "1".
+      runTwoBatchesAndVerifyResults(latestFirst = true, firstBatch = "2", secondBatch = "1")
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
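
The test above relies on ScalaTest's `eventually` (hence the new `Eventually._` import) to block until each batch finishes before checking its output. A minimal standalone sketch of that polling pattern, with an illustrative flag in place of `clock.isStreamWaitingAt(...)`:

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    object EventuallySketch {
      @volatile var batchDone = false

      def main(args: Array[String]): Unit = {
        // Flip the flag from another thread, simulating a batch completing.
        new Thread(new Runnable {
          def run(): Unit = { Thread.sleep(100); batchDone = true }
        }).start()

        // Retry the block until the assertion holds or the timeout expires.
        eventually(timeout(10.seconds)) {
          assert(batchDone)
        }
      }
    }
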