Skip to content
Snippets Groups Projects
Commit ed5d1e72 authored by jerryshao's avatar jerryshao Committed by Marcelo Vanzin
Browse files

[SPARK-19306][CORE] Fix inconsistent state in DiskBlockObject when expection occurred


## What changes were proposed in this pull request?

In `DiskBlockObjectWriter`, when some errors happened during writing, it will call `revertPartialWritesAndClose`, if this method again failed due to some issues like out of disk, it will throw exception without resetting the state of this writer, also skipping the revert. So here propose to fix this issue to offer user a chance to recover from such issue.

## How was this patch tested?

Existing test.

Author: jerryshao <sshao@hortonworks.com>

Closes #16657 from jerryshao/SPARK-19306.

(cherry picked from commit e4974721)
Signed-off-by: default avatarMarcelo Vanzin <vanzin@cloudera.com>
parent 1e07a719
No related branches found
No related tags found
No related merge requests found
......@@ -128,16 +128,19 @@ private[spark] class DiskBlockObjectWriter(
*/
private def closeResources(): Unit = {
if (initialized) {
mcs.manualClose()
channel = null
mcs = null
bs = null
fos = null
ts = null
objOut = null
initialized = false
streamOpen = false
hasBeenClosed = true
Utils.tryWithSafeFinally {
mcs.manualClose()
} {
channel = null
mcs = null
bs = null
fos = null
ts = null
objOut = null
initialized = false
streamOpen = false
hasBeenClosed = true
}
}
}
......@@ -199,26 +202,29 @@ private[spark] class DiskBlockObjectWriter(
def revertPartialWritesAndClose(): File = {
// Discard current writes. We do this by flushing the outstanding writes and then
// truncating the file to its initial position.
try {
Utils.tryWithSafeFinally {
if (initialized) {
writeMetrics.decBytesWritten(reportedPosition - committedPosition)
writeMetrics.decRecordsWritten(numRecordsWritten)
streamOpen = false
closeResources()
}
val truncateStream = new FileOutputStream(file, true)
} {
var truncateStream: FileOutputStream = null
try {
truncateStream = new FileOutputStream(file, true)
truncateStream.getChannel.truncate(committedPosition)
file
} catch {
case e: Exception =>
logError("Uncaught exception while reverting partial writes to file " + file, e)
} finally {
truncateStream.close()
if (truncateStream != null) {
truncateStream.close()
truncateStream = null
}
}
} catch {
case e: Exception =>
logError("Uncaught exception while reverting partial writes to file " + file, e)
file
}
file
}
/**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment