Skip to content
Snippets Groups Projects
Commit c068d908 authored by zsxwing's avatar zsxwing Committed by Andrew Or
Browse files

SPARK-1656: Fix potential resource leaks

JIRA: https://issues.apache.org/jira/browse/SPARK-1656



Author: zsxwing <zsxwing@gmail.com>

Closes #577 from zsxwing/SPARK-1656 and squashes the following commits:

c431095 [zsxwing] Add a comment and fix the code style
2de96e5 [zsxwing] Make sure file will be deleted if exception happens
28b90dc [zsxwing] Update to follow the code style
4521d6e [zsxwing] Merge branch 'master' into SPARK-1656
afc3383 [zsxwing] Update to follow the code style
071fdd1 [zsxwing] SPARK-1656: Fix potential resource leaks

(cherry picked from commit a7c73130)
Signed-off-by: default avatarAndrew Or <andrewor14@gmail.com>
parent d9cf4d08
No related branches found
No related tags found
No related merge requests found
...@@ -163,18 +163,23 @@ private[broadcast] object HttpBroadcast extends Logging { ...@@ -163,18 +163,23 @@ private[broadcast] object HttpBroadcast extends Logging {
private def write(id: Long, value: Any) { private def write(id: Long, value: Any) {
val file = getFile(id) val file = getFile(id)
val out: OutputStream = { val fileOutputStream = new FileOutputStream(file)
if (compress) { try {
compressionCodec.compressedOutputStream(new FileOutputStream(file)) val out: OutputStream = {
} else { if (compress) {
new BufferedOutputStream(new FileOutputStream(file), bufferSize) compressionCodec.compressedOutputStream(fileOutputStream)
} else {
new BufferedOutputStream(fileOutputStream, bufferSize)
}
} }
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject(value)
serOut.close()
files += file
} finally {
fileOutputStream.close()
} }
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject(value)
serOut.close()
files += file
} }
private def read[T: ClassTag](id: Long): T = { private def read[T: ClassTag](id: Long): T = {
......
...@@ -83,15 +83,21 @@ private[spark] class FileSystemPersistenceEngine( ...@@ -83,15 +83,21 @@ private[spark] class FileSystemPersistenceEngine(
val serialized = serializer.toBinary(value) val serialized = serializer.toBinary(value)
val out = new FileOutputStream(file) val out = new FileOutputStream(file)
out.write(serialized) try {
out.close() out.write(serialized)
} finally {
out.close()
}
} }
def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = { def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
val fileData = new Array[Byte](file.length().asInstanceOf[Int]) val fileData = new Array[Byte](file.length().asInstanceOf[Int])
val dis = new DataInputStream(new FileInputStream(file)) val dis = new DataInputStream(new FileInputStream(file))
dis.readFully(fileData) try {
dis.close() dis.readFully(fileData)
} finally {
dis.close()
}
val clazz = m.runtimeClass.asInstanceOf[Class[T]] val clazz = m.runtimeClass.asInstanceOf[Class[T]]
val serializer = serialization.serializerFor(clazz) val serializer = serialization.serializerFor(clazz)
......
...@@ -73,7 +73,21 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc ...@@ -73,7 +73,21 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
val startTime = System.currentTimeMillis val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId) val file = diskManager.getFile(blockId)
val outputStream = new FileOutputStream(file) val outputStream = new FileOutputStream(file)
blockManager.dataSerializeStream(blockId, outputStream, values) try {
try {
blockManager.dataSerializeStream(blockId, outputStream, values)
} finally {
// Close outputStream here because it should be closed before file is deleted.
outputStream.close()
}
} catch {
case e: Throwable =>
if (file.exists()) {
file.delete()
}
throw e
}
val length = file.length val length = file.length
val timeTaken = System.currentTimeMillis - startTime val timeTaken = System.currentTimeMillis - startTime
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment