diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 28703ef8129b3863c79148748dafb2ddfd2915dd..0a504851185884222a6751729a99cfb9aaae9112 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkException, SparkConf, Logging}
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.util.{MetadataCleaner, Utils}
 import org.apache.spark.streaming.scheduler.JobGenerator
 
 
@@ -139,8 +139,11 @@ class CheckpointWriter(
           // Write checkpoint to temp file
           fs.delete(tempFile, true)   // just in case it exists
           val fos = fs.create(tempFile)
-          fos.write(bytes)
-          fos.close()
+          Utils.tryWithSafeFinally {
+            fos.write(bytes)
+          } {
+            fos.close()
+          }
 
           // If the checkpoint file exists, back it up
           // If the backup exists as well, just delete it, otherwise rename will fail
@@ -187,9 +190,11 @@ class CheckpointWriter(
     val bos = new ByteArrayOutputStream()
     val zos = compressionCodec.compressedOutputStream(bos)
     val oos = new ObjectOutputStream(zos)
-    oos.writeObject(checkpoint)
-    oos.close()
-    bos.close()
+    Utils.tryWithSafeFinally {
+      oos.writeObject(checkpoint)
+    } {
+      oos.close()
+    }
     try {
       executor.execute(new CheckpointWriteHandler(
         checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater))
@@ -248,18 +253,24 @@ object CheckpointReader extends Logging {
     checkpointFiles.foreach(file => {
       logInfo("Attempting to load checkpoint from file " + file)
       try {
-        val fis = fs.open(file)
-        // ObjectInputStream uses the last defined user-defined class loader in the stack
-        // to find classes, which maybe the wrong class loader. Hence, a inherited version
-        // of ObjectInputStream is used to explicitly use the current thread's default class
-        // loader to find and load classes. This is a well know Java issue and has popped up
-        // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
-        val zis = compressionCodec.compressedInputStream(fis)
-        val ois = new ObjectInputStreamWithLoader(zis,
-          Thread.currentThread().getContextClassLoader)
-        val cp = ois.readObject.asInstanceOf[Checkpoint]
-        ois.close()
-        fs.close()
+        var ois: ObjectInputStreamWithLoader = null
+        var cp: Checkpoint = null
+        Utils.tryWithSafeFinally {
+          val fis = fs.open(file)
+          // ObjectInputStream uses the last defined user-defined class loader in the stack
+          // to find classes, which may be the wrong class loader. Hence, an inherited version
+          // of ObjectInputStream is used to explicitly use the current thread's default class
+          // loader to find and load classes. This is a well known Java issue and has popped up
+          // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
+          val zis = compressionCodec.compressedInputStream(fis)
+          ois = new ObjectInputStreamWithLoader(zis,
+            Thread.currentThread().getContextClassLoader)
+          cp = ois.readObject.asInstanceOf[Checkpoint]
+        } {
+          if (ois != null) {
+            ois.close()
+          }
+        }
         cp.validate()
         logInfo("Checkpoint successfully loaded from file " + file)
         logInfo("Checkpoint was generated at time " + cp.checkpointTime)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index a7850812bd612874a4137278e76a6b9edb16f039..ca2f319f174a2f9807db92cdaf4202fbc4d5fa76 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -72,7 +72,8 @@ object RawTextSender extends Logging {
       } catch {
         case e: IOException =>
           logError("Client disconnected")
-          socket.close()
+      } finally {
+        socket.close()
       }
     }
   }