diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 942dc7d7eac8730b09d5ef3d744d0c9bd6f30d48..4cd4f4f96fd16668ae4581ecf8a8c294494820d5 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -163,18 +163,23 @@ private[broadcast] object HttpBroadcast extends Logging { private def write(id: Long, value: Any) { val file = getFile(id) - val out: OutputStream = { - if (compress) { - compressionCodec.compressedOutputStream(new FileOutputStream(file)) - } else { - new BufferedOutputStream(new FileOutputStream(file), bufferSize) + val fileOutputStream = new FileOutputStream(file) + try { + val out: OutputStream = { + if (compress) { + compressionCodec.compressedOutputStream(fileOutputStream) + } else { + new BufferedOutputStream(fileOutputStream, bufferSize) + } } + val ser = SparkEnv.get.serializer.newInstance() + val serOut = ser.serializeStream(out) + serOut.writeObject(value) + serOut.close() + files += file + } finally { + fileOutputStream.close() } - val ser = SparkEnv.get.serializer.newInstance() - val serOut = ser.serializeStream(out) - serOut.writeObject(value) - serOut.close() - files += file } private def read[T: ClassTag](id: Long): T = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala index aa85aa060d9c101168eec1518df4a38b92cdaed5..08a99bbe6857892ab6d4ddf01ca1ff9b636d90d5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala @@ -83,15 +83,21 @@ private[spark] class FileSystemPersistenceEngine( val serialized = serializer.toBinary(value) val out = new FileOutputStream(file) - out.write(serialized) - out.close() + try { + out.write(serialized) + } finally { + out.close() + } } def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = { val fileData = new Array[Byte](file.length().asInstanceOf[Int]) val dis = new DataInputStream(new FileInputStream(file)) - dis.readFully(fileData) - dis.close() + try { + dis.readFully(fileData) + } finally { + dis.close() + } val clazz = m.runtimeClass.asInstanceOf[Class[T]] val serializer = serialization.serializerFor(clazz) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index c83261dd91b36cb758ec183562b1f1ee4d6a2b59..295c70670857b2f64f3149c33d6243b9390fdfaa 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -73,7 +73,21 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc val startTime = System.currentTimeMillis val file = diskManager.getFile(blockId) val outputStream = new FileOutputStream(file) - blockManager.dataSerializeStream(blockId, outputStream, values) + try { + try { + blockManager.dataSerializeStream(blockId, outputStream, values) + } finally { + // Close outputStream here because it should be closed before file is deleted. + outputStream.close() + } + } catch { + case e: Throwable => + if (file.exists()) { + file.delete() + } + throw e + } + val length = file.length val timeTaken = System.currentTimeMillis - startTime