diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 3cb12fca7dccb1bb306d1a4fd64786b233c6e106..eb3ff926372a2a57adf1f96944c797168711349d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -128,16 +128,19 @@ private[spark] class DiskBlockObjectWriter(
    */
   private def closeResources(): Unit = {
     if (initialized) {
-      mcs.manualClose()
-      channel = null
-      mcs = null
-      bs = null
-      fos = null
-      ts = null
-      objOut = null
-      initialized = false
-      streamOpen = false
-      hasBeenClosed = true
+      Utils.tryWithSafeFinally {
+        mcs.manualClose()
+      } {
+        channel = null
+        mcs = null
+        bs = null
+        fos = null
+        ts = null
+        objOut = null
+        initialized = false
+        streamOpen = false
+        hasBeenClosed = true
+      }
     }
   }
 
@@ -199,26 +202,29 @@ private[spark] class DiskBlockObjectWriter(
   def revertPartialWritesAndClose(): File = {
     // Discard current writes. We do this by flushing the outstanding writes and then
     // truncating the file to its initial position.
-    try {
+    Utils.tryWithSafeFinally {
       if (initialized) {
         writeMetrics.decBytesWritten(reportedPosition - committedPosition)
         writeMetrics.decRecordsWritten(numRecordsWritten)
         streamOpen = false
         closeResources()
       }
-
-      val truncateStream = new FileOutputStream(file, true)
+    } {
+      var truncateStream: FileOutputStream = null
       try {
+        truncateStream = new FileOutputStream(file, true)
         truncateStream.getChannel.truncate(committedPosition)
-        file
+      } catch {
+        case e: Exception =>
+          logError("Uncaught exception while reverting partial writes to file " + file, e)
       } finally {
-        truncateStream.close()
+        if (truncateStream != null) {
+          truncateStream.close()
+          truncateStream = null
+        }
       }
-    } catch {
-      case e: Exception =>
-        logError("Uncaught exception while reverting partial writes to file " + file, e)
-        file
     }
+    file
   }
 
   /**
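
Both methods now route cleanup through org.apache.spark.util.Utils.tryWithSafeFinally: the second block always runs, and an exception it throws does not mask an exception from the first block; it is attached via Throwable.addSuppressed instead. A minimal self-contained sketch of those semantics follows (the SafeFinally wrapper object is illustrative, not Spark's actual source, which additionally logs the suppressed exception):

    object SafeFinally {
      // Runs `block`, then always runs `finallyBlock`. If both throw, the
      // exception from `finallyBlock` is recorded as suppressed on the one
      // from `block`, so the original failure is what propagates.
      def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
        var originalThrowable: Throwable = null
        try {
          block
        } catch {
          case t: Throwable =>
            originalThrowable = t
            throw t
        } finally {
          try {
            finallyBlock
          } catch {
            case t: Throwable if originalThrowable != null && originalThrowable != t =>
              originalThrowable.addSuppressed(t)
              throw originalThrowable
          }
        }
      }
    }

For closeResources() this guarantees the stream fields are nulled and hasBeenClosed is set even when mcs.manualClose() throws, so a later close cannot touch stale streams; for revertPartialWritesAndClose() it guarantees the truncation is attempted and the method still returns file however the revert fails.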