diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 2b6f76ca28e215887e2d976d3d6a11bd3ea17a4d..127ece9ab0e56ab2ef800d3d385d18da4eafc773 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -227,6 +227,20 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
     None
   }
 
+  /**
+   * Removes all log entries earlier than thresholdBatchId (exclusive).
+   */
+  override def purge(thresholdBatchId: Long): Unit = {
+    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      .map(f => pathToBatchId(f.getPath))
+
+    for (batchId <- batchIds if batchId < thresholdBatchId) {
+      val path = batchIdToPath(batchId)
+      fileManager.delete(path)
+      logTrace(s"Removed metadata log file: $path")
+    }
+  }
+
   private def createFileManager(): FileManager = {
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     try {
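For context, HDFSMetadataLog stores each batch's metadata as a single file named by its batch id under the metadata path, so purge reduces to listing those files and deleting the ones below the threshold. Below is a minimal, self-contained sketch of that deletion logic against plain local files; `PurgeSketch` and its local `purge` helper are illustrative stand-ins for the FileManager-backed implementation in the hunk above, not part of the patch.

```scala
import java.io.File
import java.nio.file.Files

object PurgeSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the metadata directory: one file per batch, named by its
    // batch id, mirroring HDFSMetadataLog's on-disk layout ("0", "1", "2").
    val metadataDir = Files.createTempDirectory("metadata").toFile
    Seq(0L, 1L, 2L).foreach(id => new File(metadataDir, id.toString).createNewFile())

    // Same shape as the purge() added above: list the batch files, map them
    // back to batch ids, and delete everything strictly below the threshold.
    def purge(thresholdBatchId: Long): Unit = {
      val batchIds = metadataDir.listFiles().map(_.getName.toLong)
      for (batchId <- batchIds if batchId < thresholdBatchId) {
        val file = new File(metadataDir, batchId.toString)
        file.delete()
        println(s"Removed metadata log file: $file")
      }
    }

    purge(2)
    // The threshold is exclusive, so batch 2 itself survives.
    assert(metadataDir.listFiles().map(_.getName).toSet == Set("2"))
  }
}
```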
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
index cc70e1d314d1d9c82fab3973b51bb264d30c967d..78d6be17df05af8885f5b5b1fe057c17df15b0d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
@@ -48,4 +48,10 @@ trait MetadataLog[T] {
    * Return the latest batch Id and its metadata if exist.
    */
   def getLatest(): Option[(Long, T)]
+
+  /**
+   * Removes all log entries earlier than thresholdBatchId (exclusive).
+   * This operation should be idempotent.
+   */
+  def purge(thresholdBatchId: Long): Unit
 }
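Every MetadataLog implementation now has to honor this contract. As a rough illustration of the idempotence requirement, here is a hypothetical in-memory log (not part of the patch): removing entries that are already gone is a no-op, so a second purge(n) call leaves the log unchanged.

```scala
import scala.collection.mutable

// Hypothetical in-memory metadata log, only to illustrate the purge contract.
class InMemoryMetadataLog[T] {
  private val batches = new mutable.HashMap[Long, T]

  // Returns false if the batch id is already present, mirroring add()'s contract.
  def add(batchId: Long, metadata: T): Boolean =
    if (batches.contains(batchId)) false
    else { batches(batchId) = metadata; true }

  def get(batchId: Long): Option[T] = batches.get(batchId)

  def getLatest(): Option[(Long, T)] =
    if (batches.isEmpty) None else Some(batches.maxBy(_._1))

  // Idempotent: removing a key that is already absent does nothing, so
  // purge(n) applied twice yields the same state as applying it once.
  def purge(thresholdBatchId: Long): Unit =
    batches.keys.toSeq.filter(_ < thresholdBatchId).foreach(batches.remove)
}
```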
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index ab5a2d253b94a157aa88e5a307fa1bdeec0ee2c4..4259384f0bc619c1c69e8fa22000451573ed28ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -46,14 +46,14 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
   test("FileManager: FileContextManager") {
     withTempDir { temp =>
       val path = new Path(temp.getAbsolutePath)
-      testManager(path, new FileContextManager(path, new Configuration))
+      testFileManager(path, new FileContextManager(path, new Configuration))
     }
   }
 
   test("FileManager: FileSystemManager") {
     withTempDir { temp =>
       val path = new Path(temp.getAbsolutePath)
-      testManager(path, new FileSystemManager(path, new Configuration))
+      testFileManager(path, new FileSystemManager(path, new Configuration))
     }
   }
 
@@ -103,6 +103,25 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
+  testWithUninterruptibleThread("HDFSMetadataLog: purge") {
+    withTempDir { temp =>
+      val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
+      assert(metadataLog.add(0, "batch0"))
+      assert(metadataLog.add(1, "batch1"))
+      assert(metadataLog.add(2, "batch2"))
+      assert(metadataLog.get(0).isDefined)
+      assert(metadataLog.get(1).isDefined)
+      assert(metadataLog.get(2).isDefined)
+      assert(metadataLog.getLatest().get._1 == 2)
+
+      metadataLog.purge(2)
+      assert(metadataLog.get(0).isEmpty)
+      assert(metadataLog.get(1).isEmpty)
+      assert(metadataLog.get(2).isDefined)
+      assert(metadataLog.getLatest().get._1 == 2)
+    }
+  }
+
   testWithUninterruptibleThread("HDFSMetadataLog: restart") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
@@ -155,8 +174,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-
-  def testManager(basePath: Path, fm: FileManager): Unit = {
+  /** Basic test case for [[FileManager]] implementation. */
+  private def testFileManager(basePath: Path, fm: FileManager): Unit = {
     // Mkdirs
     val dir = new Path(s"$basePath/dir/subdir/subsubdir")
     assert(!fm.exists(dir))
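Beyond the unit test above, a typical caller would derive the threshold from getLatest(). A hypothetical maintenance helper (not part of this patch) might look like the following; because the threshold is exclusive, the latest batch itself is retained, matching the test's expectation that get(2) stays defined after purge(2).

```scala
import org.apache.spark.sql.execution.streaming.MetadataLog

object MetadataLogMaintenance {
  // Hypothetical helper, not in the patch: drop every entry before the
  // latest batch. purge's threshold is exclusive, so the latest survives.
  def keepOnlyLatest[T](log: MetadataLog[T]): Unit = {
    log.getLatest().foreach { case (latestBatchId, _) =>
      log.purge(latestBatchId)
    }
  }
}
```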