Skip to content
Snippets Groups Projects
Commit 6f7ecb0f authored by Burak Yavuz's avatar Burak Yavuz Committed by Tathagata Das
Browse files

[SPARK-18342] Make rename failures fatal in HDFSBackedStateStore

## What changes were proposed in this pull request?

If the rename operation in the state store fails (`fs.rename` returns `false`), the StateStore should throw an exception and have the task retry. Currently if renames fail, nothing happens during execution immediately. However, you will observe that snapshot operations will fail, and then any attempt at recovery (executor failure / checkpoint recovery) also fails.

## How was this patch tested?

Unit test

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15804 from brkyvz/rename-state.
parent b6de0c98
No related branches found
No related tags found
No related merge requests found
...@@ -254,7 +254,9 @@ private[state] class HDFSBackedStateStoreProvider( ...@@ -254,7 +254,9 @@ private[state] class HDFSBackedStateStoreProvider(
private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = { private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = {
synchronized { synchronized {
val finalDeltaFile = deltaFile(newVersion) val finalDeltaFile = deltaFile(newVersion)
fs.rename(tempDeltaFile, finalDeltaFile) if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
}
loadedMaps.put(newVersion, map) loadedMaps.put(newVersion, map)
finalDeltaFile finalDeltaFile
} }
...@@ -525,7 +527,7 @@ private[state] class HDFSBackedStateStoreProvider( ...@@ -525,7 +527,7 @@ private[state] class HDFSBackedStateStoreProvider(
val deltaFiles = allFiles.filter { file => val deltaFiles = allFiles.filter { file =>
file.version > snapshotFile.version && file.version <= version file.version > snapshotFile.version && file.version <= version
} }.toList
verify( verify(
deltaFiles.size == version - snapshotFile.version, deltaFiles.size == version - snapshotFile.version,
s"Unexpected list of delta files for version $version for $this: $deltaFiles" s"Unexpected list of delta files for version $version for $this: $deltaFiles"
......
...@@ -17,13 +17,14 @@ ...@@ -17,13 +17,14 @@
package org.apache.spark.sql.execution.streaming.state package org.apache.spark.sql.execution.streaming.state
import java.io.File import java.io.{File, IOException}
import java.net.URI
import scala.collection.mutable import scala.collection.mutable
import scala.util.Random import scala.util.Random
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._ import org.scalatest.time.SpanSugar._
...@@ -455,6 +456,18 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth ...@@ -455,6 +456,18 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
} }
} }
test("SPARK-18342: commit fails when rename fails") {
import RenameReturnsFalseFileSystem._
val dir = scheme + "://" + Utils.createDirectory(tempDir, Random.nextString(5)).toString
val conf = new Configuration()
conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName)
val provider = newStoreProvider(dir = dir, hadoopConf = conf)
val store = provider.getStore(0)
put(store, "a", 0)
val e = intercept[IllegalStateException](store.commit())
assert(e.getCause.getMessage.contains("Failed to rename"))
}
def getDataFromFiles( def getDataFromFiles(
provider: HDFSBackedStateStoreProvider, provider: HDFSBackedStateStoreProvider,
version: Int = -1): Set[(String, Int)] = { version: Int = -1): Set[(String, Int)] = {
...@@ -524,9 +537,10 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth ...@@ -524,9 +537,10 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
def newStoreProvider( def newStoreProvider(
opId: Long = Random.nextLong, opId: Long = Random.nextLong,
partition: Int = 0, partition: Int = 0,
minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
dir: String = Utils.createDirectory(tempDir, Random.nextString(5)).toString,
hadoopConf: Configuration = new Configuration()
): HDFSBackedStateStoreProvider = { ): HDFSBackedStateStoreProvider = {
val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
val sqlConf = new SQLConf() val sqlConf = new SQLConf()
sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot) sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot)
new HDFSBackedStateStoreProvider( new HDFSBackedStateStoreProvider(
...@@ -534,7 +548,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth ...@@ -534,7 +548,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
keySchema, keySchema,
valueSchema, valueSchema,
new StateStoreConf(sqlConf), new StateStoreConf(sqlConf),
new Configuration()) hadoopConf)
} }
def remove(store: StateStore, condition: String => Boolean): Unit = { def remove(store: StateStore, condition: String => Boolean): Unit = {
...@@ -598,3 +612,20 @@ private[state] object StateStoreSuite { ...@@ -598,3 +612,20 @@ private[state] object StateStoreSuite {
}}.toSet }}.toSet
} }
} }
/**
* Fake FileSystem to test that the StateStore throws an exception while committing the
* delta file, when `fs.rename` returns `false`.
*/
class RenameReturnsFalseFileSystem extends RawLocalFileSystem {
import RenameReturnsFalseFileSystem._
override def getUri: URI = {
URI.create(s"$scheme:///")
}
override def rename(src: Path, dst: Path): Boolean = false
}
object RenameReturnsFalseFileSystem {
val scheme = s"StateStoreSuite${math.abs(Random.nextInt)}fs"
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment