diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 2d29940eb8dab4a5daa894f17f1cdcd9a375c3ae..ab1204a750fac138c9228a8c1f6f9ab0d64a8f31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -283,7 +283,11 @@ private[state] class HDFSBackedStateStoreProvider(
       // semantically correct because Structured Streaming requires rerunning a batch should
       // generate the same output. (SPARK-19677)
       // scalastyle:on
-      if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
+      if (fs.exists(finalDeltaFile)) {
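+        // A delta file for this version was already committed (e.g. by a rerun of
+        // this batch), so skip the rename and delete the temp file instead of leaking it.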
+        fs.delete(tempDeltaFile, true)
+      } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
         throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
       }
       loadedMaps.put(newVersion, map)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 21a0a10e6deab58ace2d0aa3aff265ff8209643b..255378cb0ea81dcbabaeef9b80bd3c018e8285c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io.{File, IOException}
 import java.net.URI
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
 
+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
@@ -293,6 +295,13 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = newStoreProvider(hadoopConf = conf)
     provider.getStore(0).commit()
     provider.getStore(0).commit()
+
+    // Verify we don't leak temp files
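+    // (the second commit() above hits the case where the final delta file already
+    // exists, so its temp file must be cleaned up rather than left behind)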
+    val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation),
+      null, true).asScala.filter(_.getName.startsWith("temp-"))
+    assert(tempFiles.isEmpty)
   }
 
   test("corrupted file handling") {