Commit 1237aaea authored by guifeng, committed by Shixiong Zhu

[SPARK-19779][SS] Delete needless tmp file after restart structured streaming job

## What changes were proposed in this pull request?

[SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779)

The PR https://github.com/apache/spark/pull/17012 fixed restarting a Structured Streaming application that uses HDFS as its file system, but a problem remains: the temporary delta file written for a batch is still kept in HDFS, and Structured Streaming never deletes this tmp file after the streaming job is restarted.
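
In essence, the fix makes the state store's commit path delete the leftover temp delta file whenever the final delta file for that version already exists, instead of leaving it behind. Below is a minimal sketch of that logic; `finalizeDelta` is a made-up helper name for illustration, not the actual Spark method (the real change in HDFSBackedStateStoreProvider is shown in the diff further down).

```scala
// Sketch only: "finalizeDelta" is a hypothetical helper, not Spark's own method.
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

def finalizeDelta(fs: FileSystem, tempDeltaFile: Path, finalDeltaFile: Path): Unit = {
  if (fs.exists(finalDeltaFile)) {
    // A previous run already committed this version: remove the leftover temp file
    // instead of leaving it behind in the checkpoint directory.
    fs.delete(tempDeltaFile, true)
  } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
    throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
  }
}
```

Skipping the rename when the final file already exists remains semantically correct because rerunning a batch must produce the same output (SPARK-19677); the only new behavior is cleaning up the temp file.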

## How was this patch tested?
 unit tests

Author: guifeng <guifengleaf@gmail.com>

Closes #17124 from gf53520/SPARK-19779.

(cherry picked from commit e24f21b5)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
parent 3a7591ad
@@ -283,7 +283,9 @@ private[state] class HDFSBackedStateStoreProvider(
       // semantically correct because Structured Streaming requires rerunning a batch should
       // generate the same output. (SPARK-19677)
       // scalastyle:on
-      if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
+      if (fs.exists(finalDeltaFile)) {
+        fs.delete(tempDeltaFile, true)
+      } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
         throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
       }
       loadedMaps.put(newVersion, map)
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io.{File, IOException}
 import java.net.URI
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
 
+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
@@ -293,6 +295,11 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMethodTester
     val provider = newStoreProvider(hadoopConf = conf)
     provider.getStore(0).commit()
     provider.getStore(0).commit()
+
+    // Verify we don't leak temp files
+    val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation),
+      null, true).asScala.filter(_.getName.startsWith("temp-"))
+    assert(tempFiles.isEmpty)
   }
 
   test("corrupted file handling") {