Skip to content
Snippets Groups Projects
Commit 271175e2 authored by Tathagata Das's avatar Tathagata Das Committed by Shixiong Zhu
Browse files

[SPARK-20716][SS] StateStore.abort() should not throw exceptions

## What changes were proposed in this pull request?

StateStore.abort() should do a best effort attempt to clean up temporary resources. It should not throw errors, especially because its called in a TaskCompletionListener, because this error could hide previous real errors in the task.

## How was this patch tested?
No unit test.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17958 from tdas/SPARK-20716.
parent e1aaab1e
No related branches found
No related tags found
No related merge requests found
......@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.streaming.state
import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException}
import java.nio.channels.ClosedChannelException
import java.util.Locale
import scala.collection.JavaConverters._
......@@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(
/** Abort all the updates made on this store. This store will not be usable any more. */
override def abort(): Unit = {
verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
try {
state = ABORTED
if (tempDeltaFileStream != null) {
tempDeltaFileStream.close()
}
if (tempDeltaFile != null) {
fs.delete(tempDeltaFile, true)
}
} catch {
case c: ClosedChannelException =>
// This can happen when underlying file output stream has been closed before the
// compression stream.
logDebug(s"Error aborting version $newVersion into $this", c)
state = ABORTED
if (tempDeltaFileStream != null) {
tempDeltaFileStream.close()
}
if (tempDeltaFile != null) {
fs.delete(tempDeltaFile, true)
case e: Exception =>
logWarning(s"Error aborting version $newVersion into $this", e)
}
logInfo(s"Aborted version $newVersion for $this")
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment