diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index d95ec7f67feb3ba9fd21d10cda39ba1aef614954..1b413528935f68e341cb27e766ca90e90cfbac83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -138,14 +138,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     }
   }
 
-  /**
-   * Write a batch to a temp file then rename it to the batch file.
-   *
-   * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
-   * valid behavior, we still need to prevent it from destroying the files.
-   */
-  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
-    // Use nextId to create a temp file
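+  /**
+   * Write the given metadata to a temp file in the metadata path, using `writer` (this log's
+   * `serialize` by default).
+   *
+   * @return the path of the temp file, or None if a temp file could not be created.
+   */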
+  def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = {
     var nextId = 0
     while (true) {
       val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
@@ -153,33 +146,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         val output = fileManager.create(tempPath)
         try {
           writer(metadata, output)
+          return Some(tempPath)
         } finally {
           IOUtils.closeQuietly(output)
         }
-        try {
-          // Try to commit the batch
-          // It will fail if there is an existing file (someone has committed the batch)
-          logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
-          fileManager.rename(tempPath, batchIdToPath(batchId))
-
-          // SPARK-17475: HDFSMetadataLog should not leak CRC files
-          // If the underlying filesystem didn't rename the CRC file, delete it.
-          val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
-          if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
-          return
-        } catch {
-          case e: IOException if isFileAlreadyExistsException(e) =>
-            // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
-            // So throw an exception to tell the user this is not a valid behavior.
-            throw new ConcurrentModificationException(
-              s"Multiple HDFSMetadataLog are using $path", e)
-          case e: FileNotFoundException =>
-            // Sometimes, "create" will succeed when multiple writers are calling it at the same
-            // time. However, only one writer can call "rename" successfully, others will get
-            // FileNotFoundException because the first writer has removed it.
-            throw new ConcurrentModificationException(
-              s"Multiple HDFSMetadataLog are using $path", e)
-        }
       } catch {
         case e: IOException if isFileAlreadyExistsException(e) =>
           // Failed to create "tempPath". There are two cases:
@@ -195,10 +165,45 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
           // metadata path. In addition, the old Streaming also have this issue, people can create
           // malicious checkpoint files to crash a Streaming application too.
           nextId += 1
-      } finally {
-        fileManager.delete(tempPath)
       }
     }
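+    // Not reached: the loop above only exits via return (or by throwing), but an Option[Path]
+    // expression is still required here to satisfy the declared return type.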
+    None
+  }
+
+  /**
+   * Write a batch to a temp file then rename it to the batch file.
+   *
+   * There may be multiple [[HDFSMetadataLog]] instances using the same metadata path. Although
+   * this is not valid behavior, we still need to prevent it from destroying the files.
+   */
+  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
+    val tempPath = writeTempBatch(metadata, writer).getOrElse(
+      throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
+    try {
+      // Try to commit the batch
+      // It will fail if there is an existing file (someone has committed the batch)
+      logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
+      fileManager.rename(tempPath, batchIdToPath(batchId))
+
+      // SPARK-17475: HDFSMetadataLog should not leak CRC files
+      // If the underlying filesystem didn't rename the CRC file, delete it.
+      val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
+      if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
+    } catch {
+      case e: IOException if isFileAlreadyExistsException(e) =>
+        // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
+        // So throw an exception to tell the user that this is not valid behavior.
+        throw new ConcurrentModificationException(
+          s"Multiple HDFSMetadataLog are using $path", e)
+      case e: FileNotFoundException =>
+        // Sometimes, "create" will succeed when multiple writers are calling it at the same
+        // time. However, only one writer can call "rename" successfully, others will get
+        // FileNotFoundException because the first writer has removed it.
+        throw new ConcurrentModificationException(
+          s"Multiple HDFSMetadataLog are using $path", e)
+    } finally {
+      fileManager.delete(tempPath)
+    }
   }
 
   private def isFileAlreadyExistsException(e: IOException): Boolean = {
@@ -208,6 +213,22 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       (e.getMessage != null && e.getMessage.startsWith("File already exists: "))
   }
 
+  /**
+   * @return the deserialized metadata in a batch file, or None if the file does not exist.
+   * @throws IllegalArgumentException when the path does not point to a batch file.
+   */
+  def get(batchFile: Path): Option[T] = {
+    if (fileManager.exists(batchFile)) {
+      if (isBatchFile(batchFile)) {
+        get(pathToBatchId(batchFile))
+      } else {
+        throw new IllegalArgumentException(s"File ${batchFile} is not a batch file!")
+      }
+    } else {
+      None
+    }
+  }
+
   override def get(batchId: Long): Option[T] = {
     val batchMetadataFile = batchIdToPath(batchId)
     if (fileManager.exists(batchMetadataFile)) {
@@ -250,6 +271,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     None
   }
 
+  /**
+   * Get an array of [[FileStatus]] referencing batch files.
+   * The array is sorted from the most recent batch file to the oldest.
+   */
+  def getOrderedBatchFiles(): Array[FileStatus] = {
+    fileManager.list(metadataPath, batchFilesFilter)
+      .sortBy(f => pathToBatchId(f.getPath))
+      .reverse
+  }
+
   /**
    * Removes all the log entry earlier than thresholdBatchId (exclusive).
    */