diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
index 9e2604c9c069f0888ce32da2596fdc00ed8fa6ba..78d6be17df05af8885f5b5b1fe057c17df15b0d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
@@ -24,7 +24,6 @@ package org.apache.spark.sql.execution.streaming
  *  - Allow the user to query the latest batch id.
  *  - Allow the user to query the metadata object of a specified batch id.
  *  - Allow the user to query metadata objects in a range of batch ids.
- *  - Allow the user to remove obsolete metadata
  */
 trait MetadataLog[T] {
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 220f77dc24ce064daaa2220812f418adf4ba25dc..a1aae61107baf53a9571ea08066c7487086ef345 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -290,13 +290,6 @@ class StreamExecution(
       assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
         s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
       logInfo(s"Committed offsets for batch $currentBatchId.")
-
-      // Now that we have logged the new batch, no further processing will happen for
-      // the previous batch, and it is safe to discard the old metadata.
-      // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
-      // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
-      // flight at the same time), this cleanup logic will need to change.
-      offsetLog.purge(currentBatchId)
     } else {
       awaitBatchLock.lock()
       try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index d3e2cab1b8bd3596d6376b096f44b5ed7618d7e1..9d58315c200312670741023d2159ca3092554760 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -125,30 +125,6 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
     )
   }
 
-  testQuietly("StreamExecution metadata garbage collection") {
-    val inputData = MemoryStream[Int]
-    val mapped = inputData.toDS().map(6 / _)
-
-    // Run 3 batches, and then assert that only 1 metadata file is left at the end
-    // since the first 2 should have been purged.
-    testStream(mapped)(
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3),
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3, 6, 3),
-      AddData(inputData, 4, 6),
-      CheckAnswer(6, 3, 6, 3, 1, 1),
-
-      AssertOnQuery("metadata log should contain only one file") { q =>
-        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
-        val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
-        val toTest = logFileNames // Workaround for SPARK-17475
-        assert(toTest.size == 1 && toTest.head == "2")
-        true
-      }
-    )
-  }
-
   /**
    * A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`.
    *