Skip to content
Snippets Groups Projects
Commit 947c0cd9 authored by Roberto Agostino Vitillo's avatar Roberto Agostino Vitillo Committed by Shixiong Zhu
Browse files

[SPARK-19677][SS] Committing a delta file atop an existing one should not fail on HDFS

## What changes were proposed in this pull request?

HDFSBackedStateStoreProvider fails to rename files on HDFS but not on the local filesystem. According to the [implementation notes](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) of `rename()`, the behavior of the local filesystem and HDFS varies:

> Destination exists and is a file
> Renaming a file atop an existing file is specified as failing, raising an exception.
>    - Local FileSystem : the rename succeeds; the destination file is replaced by the source file.
>    - HDFS : The rename fails, no exception is raised. Instead the method call simply returns false.

This patch ensures that `rename()` isn't called if the destination file already exists. It's still semantically correct because Structured Streaming requires that rerunning a batch should generate the same output.

## How was this patch tested?

This patch was tested by running `StateStoreSuite`.

Author: Roberto Agostino Vitillo <ra.vitillo@gmail.com>

Closes #17012 from vitillo/fix_rename.

(cherry picked from commit 9734a928)
Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
parent 4b4c3bf3
No related branches found
No related tags found
No related merge requests found
...@@ -274,7 +274,16 @@ private[state] class HDFSBackedStateStoreProvider( ...@@ -274,7 +274,16 @@ private[state] class HDFSBackedStateStoreProvider(
private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = { private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = {
synchronized { synchronized {
val finalDeltaFile = deltaFile(newVersion) val finalDeltaFile = deltaFile(newVersion)
if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
// scalastyle:off
// Renaming a file atop an existing one fails on HDFS
// (http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html).
// Hence we should either skip the rename step or delete the target file. Because deleting the
// target file will break speculation, skipping the rename step is the only choice. It's still
// semantically correct because Structured Streaming requires rerunning a batch should
// generate the same output. (SPARK-19677)
// scalastyle:on
if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile") throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
} }
loadedMaps.put(newVersion, map) loadedMaps.put(newVersion, map)
......
...@@ -210,13 +210,6 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth ...@@ -210,13 +210,6 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
assert(store1.commit() === 2) assert(store1.commit() === 2)
assert(rowsToSet(store1.iterator()) === Set("a" -> 1, "b" -> 1)) assert(rowsToSet(store1.iterator()) === Set("a" -> 1, "b" -> 1))
assert(getDataFromFiles(provider) === Set("a" -> 1, "b" -> 1)) assert(getDataFromFiles(provider) === Set("a" -> 1, "b" -> 1))
// Overwrite the version with other data
val store2 = provider.getStore(1)
put(store2, "c", 1)
assert(store2.commit() === 2)
assert(rowsToSet(store2.iterator()) === Set("a" -> 1, "c" -> 1))
assert(getDataFromFiles(provider) === Set("a" -> 1, "c" -> 1))
} }
test("snapshotting") { test("snapshotting") {
...@@ -292,6 +285,15 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth ...@@ -292,6 +285,15 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
assert(getDataFromFiles(provider, 19) === Set("a" -> 19)) assert(getDataFromFiles(provider, 19) === Set("a" -> 19))
} }
test("SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS") {
  // Wire in a fake FileSystem whose rename() returns false when the destination
  // already exists, mimicking HDFS semantics on top of the local filesystem.
  val hadoopConf = new Configuration()
  hadoopConf.set("fs.fake.impl", classOf[RenameLikeHDFSFileSystem].getName)
  hadoopConf.set("fs.default.name", "fake:///")
  val provider = newStoreProvider(hadoopConf = hadoopConf)
  // Commit version 0 twice: the second commit targets an existing delta file,
  // which must be tolerated (skip the rename) rather than fail.
  provider.getStore(0).commit()
  provider.getStore(0).commit()
}
test("corrupted file handling") { test("corrupted file handling") {
val provider = newStoreProvider(minDeltasForSnapshot = 5) val provider = newStoreProvider(minDeltasForSnapshot = 5)
...@@ -681,6 +683,21 @@ private[state] object StateStoreSuite { ...@@ -681,6 +683,21 @@ private[state] object StateStoreSuite {
} }
} }
/**
 * Fake FileSystem that simulates HDFS rename semantics, i.e. renaming a file atop an existing
 * one returns false instead of overwriting the destination.
 * See hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html
 */
class RenameLikeHDFSFileSystem extends RawLocalFileSystem {
  // HDFS signals "destination exists" by returning false from rename() without
  // raising an exception; emulate that here, otherwise defer to the local
  // filesystem's rename. (Idiomatic Scala: the boolean expression is the result;
  // no explicit `return` statements.)
  override def rename(src: Path, dst: Path): Boolean = {
    !exists(dst) && super.rename(src, dst)
  }
}
/** /**
* Fake FileSystem to test that the StateStore throws an exception while committing the * Fake FileSystem to test that the StateStore throws an exception while committing the
* delta file, when `fs.rename` returns `false`. * delta file, when `fs.rename` returns `false`.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment