From 3c623d226a0c495c36c86d199879b9e922d1ece2 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Mon, 14 Nov 2016 10:03:01 -0800
Subject: [PATCH] [SPARK-18416][STRUCTURED STREAMING] Fixed temp file leak in
 state store

## What changes were proposed in this pull request?

StateStore.get() creates a temporary file immediately, even if the store is never used to make updates for the new version. In those cases store.commit() is never called, so the output stream to the temp file is never closed and the file is left behind forever. This PR fixes the leak by opening the temp file lazily, only when updates are actually being made.

## How was this patch tested?

New unit test.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15859 from tdas/SPARK-18416.

(cherry picked from commit bdfe60ac921172be0fb77de2f075cc7904a3b238)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
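For context, the core of the fix visible in the diff below is replacing an eagerly evaluated `val` with a `lazy val`, so the HDFS output stream (and therefore the on-disk temp file) is created only when the first update touches it. Here is a minimal, self-contained sketch of that pattern using plain `java.io` and hypothetical names, not the patch's actual classes:

```scala
import java.io.{File, FileOutputStream, OutputStream}
import scala.util.Random

// Hypothetical stand-in for the state store's delta-file writer.
class TempFileWriter(dir: File) {
  // Constructing a java.io.File does not touch the filesystem.
  private val tempFile = new File(dir, s"temp-${Random.nextLong()}")

  // With a plain `val`, this stream (and hence the temp file on disk) would
  // be created as soon as the writer is constructed, even if never used.
  // `lazy val` defers creation to the first reference, so a store that is
  // opened but never updated leaves nothing behind.
  private lazy val tempStream: OutputStream = new FileOutputStream(tempFile)

  def write(bytes: Array[Byte]): Unit = tempStream.write(bytes) // forces init
  def commit(): Unit = tempStream.close()
}
```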
---
 .../state/HDFSBackedStateStoreProvider.scala  | 10 +--
 .../streaming/state/StateStoreSuite.scala     | 63 +++++++++++++++++++
 2 files changed, 68 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 808713161c..f07feaad5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -87,8 +87,7 @@ private[state] class HDFSBackedStateStoreProvider(
 
     private val newVersion = version + 1
     private val tempDeltaFile = new Path(baseDir, s"temp-${Random.nextLong}")
-    private val tempDeltaFileStream = compressStream(fs.create(tempDeltaFile, true))
-
+    private lazy val tempDeltaFileStream = compressStream(fs.create(tempDeltaFile, true))
     private val allUpdates = new java.util.HashMap[UnsafeRow, StoreUpdate]()
 
     @volatile private var state: STATE = UPDATING
@@ -101,7 +100,7 @@ private[state] class HDFSBackedStateStoreProvider(
     }
 
     override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
-      verify(state == UPDATING, "Cannot remove after already committed or aborted")
+      verify(state == UPDATING, "Cannot put after already committed or aborted")
       val isNewKey = !mapToUpdate.containsKey(key)
       mapToUpdate.put(key, value)
 
@@ -125,6 +124,7 @@ private[state] class HDFSBackedStateStoreProvider(
     /** Remove keys that match the following condition */
     override def remove(condition: UnsafeRow => Boolean): Unit = {
       verify(state == UPDATING, "Cannot remove after already committed or aborted")
+
       val keyIter = mapToUpdate.keySet().iterator()
       while (keyIter.hasNext) {
         val key = keyIter.next
@@ -154,7 +154,7 @@ private[state] class HDFSBackedStateStoreProvider(
         finalizeDeltaFile(tempDeltaFileStream)
         finalDeltaFile = commitUpdates(newVersion, mapToUpdate, tempDeltaFile)
         state = COMMITTED
-        logInfo(s"Committed version $newVersion for $this")
+        logInfo(s"Committed version $newVersion for $this to file $finalDeltaFile")
         newVersion
       } catch {
         case NonFatal(e) =>
@@ -174,7 +174,7 @@ private[state] class HDFSBackedStateStoreProvider(
       if (tempDeltaFile != null) {
         fs.delete(tempDeltaFile, true)
       }
-      logInfo("Aborted")
+      logInfo(s"Aborted version $newVersion for $this")
     }
 
     /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 504a265161..533cd0cd2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -468,6 +468,69 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     assert(e.getCause.getMessage.contains("Failed to rename"))
   }
 
+  test("SPARK-18416: do not create temp delta file until the store is updated") {
+    val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
+    val storeId = StateStoreId(dir, 0, 0)
+    val storeConf = StateStoreConf.empty
+    val hadoopConf = new Configuration()
+    val deltaFileDir = new File(s"$dir/0/0/")
+
+    def numTempFiles: Int = {
+      if (deltaFileDir.exists) {
+        deltaFileDir.listFiles.map(_.getName).count(n => n.contains("temp") && !n.startsWith("."))
+      } else 0
+    }
+
+    def numDeltaFiles: Int = {
+      if (deltaFileDir.exists) {
+        deltaFileDir.listFiles.map(_.getName).count(n => n.contains(".delta") && !n.startsWith("."))
+      } else 0
+    }
+
+    def shouldNotCreateTempFile[T](body: => T): T = {
+      val before = numTempFiles
+      val result = body
+      assert(numTempFiles === before)
+      result
+    }
+
+    // Getting the store should not create temp file
+    val store0 = shouldNotCreateTempFile {
+      StateStore.get(storeId, keySchema, valueSchema, 0, storeConf, hadoopConf)
+    }
+
+    // Put should create a temp file
+    put(store0, "a", 1)
+    assert(numTempFiles === 1)
+    assert(numDeltaFiles === 0)
+
+    // Commit should remove temp file and create a delta file
+    store0.commit()
+    assert(numTempFiles === 0)
+    assert(numDeltaFiles === 1)
+
+    // Remove should create a temp file
+    val store1 = shouldNotCreateTempFile {
+      StateStore.get(storeId, keySchema, valueSchema, 1, storeConf, hadoopConf)
+    }
+    remove(store1, _ == "a")
+    assert(numTempFiles === 1)
+    assert(numDeltaFiles === 1)
+
+    // Commit should remove temp file and create a delta file
+    store1.commit()
+    assert(numTempFiles === 0)
+    assert(numDeltaFiles === 2)
+
+    // Commit without any updates should create a delta file
+    val store2 = shouldNotCreateTempFile {
+      StateStore.get(storeId, keySchema, valueSchema, 2, storeConf, hadoopConf)
+    }
+    store2.commit()
+    assert(numTempFiles === 0)
+    assert(numDeltaFiles === 3)
+  }
+
   def getDataFromFiles(
       provider: HDFSBackedStateStoreProvider,
       version: Int = -1): Set[(String, Int)] = {
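As a quick illustration of the behavior the new test locks in, this is how the hypothetical `TempFileWriter` sketch above would behave (again, this is not Spark's `StateStore` API):

```scala
val dir = java.nio.file.Files.createTempDirectory("state-test").toFile
val writer = new TempFileWriter(dir)
assert(dir.listFiles().isEmpty)           // opening alone creates no file
writer.write("update".getBytes("UTF-8"))  // first write creates the temp file
assert(dir.listFiles().count(_.getName.startsWith("temp-")) == 1)
writer.commit()                           // stream is closed, nothing leaks
```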