diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 808713161c316f93ebbe8129d12e06aba77cf931..f07feaad5dc71031c8e02da53e8b06e2ab427da4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -87,8 +87,7 @@ private[state] class HDFSBackedStateStoreProvider( private val newVersion = version + 1 private val tempDeltaFile = new Path(baseDir, s"temp-${Random.nextLong}") - private val tempDeltaFileStream = compressStream(fs.create(tempDeltaFile, true)) - + private lazy val tempDeltaFileStream = compressStream(fs.create(tempDeltaFile, true)) private val allUpdates = new java.util.HashMap[UnsafeRow, StoreUpdate]() @volatile private var state: STATE = UPDATING @@ -101,7 +100,7 @@ private[state] class HDFSBackedStateStoreProvider( } override def put(key: UnsafeRow, value: UnsafeRow): Unit = { - verify(state == UPDATING, "Cannot remove after already committed or aborted") + verify(state == UPDATING, "Cannot put after already committed or aborted") val isNewKey = !mapToUpdate.containsKey(key) mapToUpdate.put(key, value) @@ -125,6 +124,7 @@ private[state] class HDFSBackedStateStoreProvider( /** Remove keys that match the following condition */ override def remove(condition: UnsafeRow => Boolean): Unit = { verify(state == UPDATING, "Cannot remove after already committed or aborted") + val keyIter = mapToUpdate.keySet().iterator() while (keyIter.hasNext) { val key = keyIter.next @@ -154,7 +154,7 @@ private[state] class HDFSBackedStateStoreProvider( finalizeDeltaFile(tempDeltaFileStream) finalDeltaFile = commitUpdates(newVersion, mapToUpdate, tempDeltaFile) state = COMMITTED - logInfo(s"Committed version $newVersion for $this") + logInfo(s"Committed version $newVersion for $this to file $finalDeltaFile") newVersion } catch { case NonFatal(e) => @@ -174,7 +174,7 @@ private[state] class HDFSBackedStateStoreProvider( if (tempDeltaFile != null) { fs.delete(tempDeltaFile, true) } - logInfo("Aborted") + logInfo(s"Aborted version $newVersion for $this") } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 504a26516107f673d4d3e9e3c454066d00e63efb..533cd0cd2a2ea85955e05f186772aaabcdd79d67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -468,6 +468,69 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth assert(e.getCause.getMessage.contains("Failed to rename")) } + test("SPARK-18416: do not create temp delta file until the store is updated") { + val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString + val storeId = StateStoreId(dir, 0, 0) + val storeConf = StateStoreConf.empty + val hadoopConf = new Configuration() + val deltaFileDir = new File(s"$dir/0/0/") + + def numTempFiles: Int = { + if (deltaFileDir.exists) { + deltaFileDir.listFiles.map(_.getName).count(n => n.contains("temp") && !n.startsWith(".")) + } else 0 + } + + def numDeltaFiles: Int = { + if (deltaFileDir.exists) { + deltaFileDir.listFiles.map(_.getName).count(n => n.contains(".delta") && !n.startsWith(".")) + } else 0 + } + + def shouldNotCreateTempFile[T](body: => T): T = { + val before = numTempFiles + val result = body + assert(numTempFiles === before) + result + } + + // Getting the store should not create temp file + val store0 = shouldNotCreateTempFile { + StateStore.get(storeId, keySchema, valueSchema, 0, storeConf, hadoopConf) + } + + // Put should create a temp file + put(store0, "a", 1) + assert(numTempFiles === 1) + assert(numDeltaFiles === 0) + + // Commit should remove temp file and create a delta file + store0.commit() + assert(numTempFiles === 0) + assert(numDeltaFiles === 1) + + // Remove should create a temp file + val store1 = shouldNotCreateTempFile { + StateStore.get(storeId, keySchema, valueSchema, 1, storeConf, hadoopConf) + } + remove(store1, _ == "a") + assert(numTempFiles === 1) + assert(numDeltaFiles === 1) + + // Commit should remove temp file and create a delta file + store1.commit() + assert(numTempFiles === 0) + assert(numDeltaFiles === 2) + + // Commit without any updates should create a delta file + val store2 = shouldNotCreateTempFile { + StateStore.get(storeId, keySchema, valueSchema, 2, storeConf, hadoopConf) + } + store2.commit() + assert(numTempFiles === 0) + assert(numDeltaFiles === 3) + } + def getDataFromFiles( provider: HDFSBackedStateStoreProvider, version: Int = -1): Set[(String, Int)] = {