diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index c9426c5de23a24adbfa58572d3930a20bc543cbf..5718951451afcc6f8fa0d3000ade7b5bd9e148cd 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -356,11 +356,14 @@ private[spark] object MapOutputTracker extends Logging {
   def serializeMapStatuses(statuses: Array[MapStatus]): Array[Byte] = {
     val out = new ByteArrayOutputStream
     val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
-    // Since statuses can be modified in parallel, sync on it
-    statuses.synchronized {
-      objOut.writeObject(statuses)
+    Utils.tryWithSafeFinally {
+      // Since statuses can be modified in parallel, sync on it
+      statuses.synchronized {
+        objOut.writeObject(statuses)
+      }
+    } {
+      objOut.close()
     }
-    objOut.close()
     out.toByteArray
   }
 
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 36cf2af0857dd371ef41ee8bb344da2cf325be30..b1ffba4c546bf7c47e6e63404e5cad92b55a7d97 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -614,9 +614,9 @@ private[spark] object PythonRDD extends Logging {
         try {
           val sock = serverSocket.accept()
           val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
-          try {
+          Utils.tryWithSafeFinally {
             writeIteratorToStream(items, out)
-          } finally {
+          } {
             out.close()
           }
         } catch {
@@ -862,9 +862,9 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
     val file = File.createTempFile("broadcast", "", dir)
     path = file.getAbsolutePath
     val out = new FileOutputStream(file)
-    try {
+    Utils.tryWithSafeFinally {
       Utils.copyStream(in, out)
-    } finally {
+    } {
       out.close()
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 74ccfa6d3c9a3b72bd921be87a7926cb80a96739..4457c75e8b0fc0db01c61f0720dce9bd938b531a 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -165,7 +165,7 @@ private[broadcast] object HttpBroadcast extends Logging {
   private def write(id: Long, value: Any) {
     val file = getFile(id)
     val fileOutputStream = new FileOutputStream(file)
-    try {
+    Utils.tryWithSafeFinally {
       val out: OutputStream = {
         if (compress) {
           compressionCodec.compressedOutputStream(fileOutputStream)
@@ -175,10 +175,13 @@ private[broadcast] object HttpBroadcast extends Logging {
       }
       val ser = SparkEnv.get.serializer.newInstance()
       val serOut = ser.serializeStream(out)
-      serOut.writeObject(value)
-      serOut.close()
+      Utils.tryWithSafeFinally {
+        serOut.writeObject(value)
+      } {
+        serOut.close()
+      }
       files += file
-    } finally {
+    } {
       fileOutputStream.close()
     }
   }
@@ -212,9 +215,11 @@ private[broadcast] object HttpBroadcast extends Logging {
     }
     val ser = SparkEnv.get.serializer.newInstance()
     val serIn = ser.deserializeStream(in)
-    val obj = serIn.readObject[T]()
-    serIn.close()
-    obj
+    Utils.tryWithSafeFinally {
+      serIn.readObject[T]()
+    } {
+      serIn.close()
+    }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 32499b3a784a1ad37d3591c2e681465f1a6eb6ba..f459ed5b3a1a114cba95905e6cacec38630a663a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 import akka.serialization.Serialization
 
 import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 
 
 /**
@@ -59,9 +60,9 @@ private[master] class FileSystemPersistenceEngine(
     val serializer = serialization.findSerializerFor(value)
     val serialized = serializer.toBinary(value)
     val out = new FileOutputStream(file)
-    try {
+    Utils.tryWithSafeFinally {
       out.write(serialized)
-    } finally {
+    } {
       out.close()
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
index 420442f7564ccf60dcc073ff8c16fd6fef68f1e6..a3539e44bd2f9db63b07df9934b6bf8dbe080d00 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonProcessingException
 import com.google.common.base.Charsets
 
 import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
+import org.apache.spark.util.Utils
 
 /**
  * A client that submits applications to the standalone Master using a REST protocol.
@@ -148,8 +149,11 @@ private[deploy] class StandaloneRestClient extends Logging {
     conn.setRequestProperty("charset", "utf-8")
     conn.setDoOutput(true)
     val out = new DataOutputStream(conn.getOutputStream)
-    out.write(json.getBytes(Charsets.UTF_8))
-    out.close()
+    Utils.tryWithSafeFinally {
+      out.write(json.getBytes(Charsets.UTF_8))
+    } {
+      out.close()
+    }
     readResponse(conn)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 1c13e2c37284528dc4704cebe48109a8f4e73549..760c0fa3ac96a15cf3cf6341b0c92719ccb89709 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils
 
 private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
 
@@ -112,8 +113,11 @@ private[spark] object CheckpointRDD extends Logging {
     }
     val serializer = env.serializer.newInstance()
     val serializeStream = serializer.serializeStream(fileOutputStream)
-    serializeStream.writeAll(iterator)
-    serializeStream.close()
+    Utils.tryWithSafeFinally {
+      serializeStream.writeAll(iterator)
+    } {
+      serializeStream.close()
+    }
 
     if (!fs.rename(tempOutputPath, finalOutputPath)) {
       if (!fs.exists(finalOutputPath)) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 6b4f097ea9ae572d78fca7d87a6e62ad6c6eeeb3..bf1303d39592d83053edb20649ed2379fe932747 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -995,7 +995,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
       require(writer != null, "Unable to obtain RecordWriter")
       var recordsWritten = 0L
-      try {
+      Utils.tryWithSafeFinally {
         while (iter.hasNext) {
           val pair = iter.next()
           writer.write(pair._1, pair._2)
@@ -1004,7 +1004,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
           recordsWritten += 1
         }
-      } finally {
+      } {
         writer.close(hadoopContext)
       }
       committer.commitTask(hadoopContext)
@@ -1068,7 +1068,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       writer.setup(context.stageId, context.partitionId, taskAttemptId)
       writer.open()
       var recordsWritten = 0L
-      try {
+
+      Utils.tryWithSafeFinally {
         while (iter.hasNext) {
           val record = iter.next()
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
@@ -1077,7 +1078,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
           recordsWritten += 1
         }
-      } finally {
+      } {
         writer.close()
       }
       writer.commit()
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index 50edb5a34e33392a2339a0ec03f0dad3fff29cf7..a1741e2875c16c96f51dac519d26375871e5e23f 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.storage._
+import org.apache.spark.util.Utils
 
 import IndexShuffleBlockManager.NOOP_REDUCE_ID
 
@@ -78,16 +79,15 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
   def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
     val indexFile = getIndexFile(shuffleId, mapId)
     val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
-    try {
+    Utils.tryWithSafeFinally {
       // We take in lengths of each block, need to convert it to offsets.
       var offset = 0L
       out.writeLong(offset)
-
       for (length <- lengths) {
         offset += length
         out.writeLong(offset)
       }
-    } finally {
+    } {
       out.close()
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index f703e50b6b0ace95cc1a914e62f783f549aa1962..0dfc91dfaff853da6ef5d19cc69b0169758fc2a5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -23,6 +23,7 @@ import java.nio.channels.FileChannel
 import org.apache.spark.Logging
 import org.apache.spark.serializer.{SerializationStream, Serializer}
 import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.util.Utils
 
 /**
  * An interface for writing JVM objects to some underlying storage. This interface allows
@@ -140,14 +141,17 @@ private[spark] class DiskBlockObjectWriter(
 
   override def close() {
     if (initialized) {
-      if (syncWrites) {
-        // Force outstanding writes to disk and track how long it takes
-        objOut.flush()
-        callWithTiming {
-          fos.getFD.sync()
+      Utils.tryWithSafeFinally {
+        if (syncWrites) {
+          // Force outstanding writes to disk and track how long it takes
+          objOut.flush()
+          callWithTiming {
+            fos.getFD.sync()
+          }
         }
+      } {
+        objOut.close()
       }
-      objOut.close()
 
       channel = null
       bs = null
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 61ef5ff168791ed615c67aeb88ea79c100b84548..4b232ae7d31806b8c15b8311a88a3bcabd9b8f60 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -46,10 +46,13 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
     val channel = new FileOutputStream(file).getChannel
-    while (bytes.remaining > 0) {
-      channel.write(bytes)
+    Utils.tryWithSafeFinally {
+      while (bytes.remaining > 0) {
+        channel.write(bytes)
+      }
+    } {
+      channel.close()
     }
-    channel.close()
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
       file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
@@ -75,9 +78,9 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val file = diskManager.getFile(blockId)
     val outputStream = new FileOutputStream(file)
     try {
-      try {
+      Utils.tryWithSafeFinally {
         blockManager.dataSerializeStream(blockId, outputStream, values)
-      } finally {
+      } {
         // Close outputStream here because it should be closed before file is deleted.
         outputStream.close()
       }
@@ -106,8 +109,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
 
   private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
     val channel = new RandomAccessFile(file, "r").getChannel
-
-    try {
+    Utils.tryWithSafeFinally {
       // For small files, directly read rather than memory map
       if (length < minMemoryMapBytes) {
         val buf = ByteBuffer.allocate(length.toInt)
@@ -123,7 +125,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
       } else {
         Some(channel.map(MapMode.READ_ONLY, offset, length))
       }
-    } finally {
+    } {
       channel.close()
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index bb8bd1015668af90c6cebb43aa664140a3a93352..7c85e28679f1d70898775cdf297a69557c67cea7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -313,7 +313,7 @@ private[spark] object Utils extends Logging {
                  transferToEnabled: Boolean = false): Long =
   {
     var count = 0L
-    try {
+    tryWithSafeFinally {
       if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
         && transferToEnabled) {
         // When both streams are File stream, use transferTo to improve copy performance.
@@ -353,7 +353,7 @@ private[spark] object Utils extends Logging {
         }
       }
       count
-    } finally {
+    } {
       if (closeStreams) {
         try {
           in.close()
@@ -1214,6 +1214,44 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Execute a block of code, then a finally block, but if exceptions happen in
+   * the finally block, do not suppress the original exception.
+   *
+   * This is primarily an issue with `finally { out.close() }` blocks, where
+   * close needs to be called to clean up `out`, but if an exception occurred
+   * while writing to `out`, the stream may be corrupted and `out.close` will
+   * fail as well. That failure would then suppress the original, and likely
+   * more meaningful, exception from the `out.write` call.
+   */
+  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
+    // It would be nice to find a method on Try that did this
+    var originalThrowable: Throwable = null
+    try {
+      block
+    } catch {
+      case t: Throwable =>
+        // Purposefully not using NonFatal: we don't want even fatal
+        // exceptions to be suppressed by finallyBlock.
+        originalThrowable = t
+        throw originalThrowable
+    } finally {
+      try {
+        finallyBlock
+      } catch {
+        case t: Throwable =>
+          if (originalThrowable != null) {
+            // We could do originalThrowable.addSuppressed(t), but it's
+            // not available in JDK 1.6.
+            logWarning(s"Suppressing exception in finally: " + t.getMessage, t)
+            throw originalThrowable
+          } else {
+            throw t
+          }
+      }
+    }
+  }
+
   /** Default filtering function for finding call sites using `getCallSite`. */
   private def coreExclusionFunction(className: String): Boolean = {
     // A regular expression to match classes of the "core" Spark API that we want to skip when
@@ -2074,7 +2112,7 @@ private[spark] class RedirectThread(
   override def run() {
     scala.util.control.Exception.ignoring(classOf[IOException]) {
       // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
-      try {
+      Utils.tryWithSafeFinally {
         val buf = new Array[Byte](1024)
         var len = in.read(buf)
         while (len != -1) {
@@ -2082,7 +2120,7 @@ private[spark] class RedirectThread(
           out.flush()
           len = in.read(buf)
         }
-      } finally {
+      } {
         if (propagateEof) {
           out.close()
         }
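
For reference, a minimal sketch of how callers are expected to use the new
Utils.tryWithSafeFinally helper added above (not part of the patch; the package,
object, and temp-file names below are hypothetical). If the write throws and
close() then throws as well, the close() failure is only logged and the
exception from the write is the one that propagates.

    package org.apache.spark.demo  // hypothetical; Utils is private[spark]

    import java.io.{File, FileOutputStream}

    import org.apache.spark.util.Utils

    object TryWithSafeFinallyExample {
      def main(args: Array[String]): Unit = {
        val out = new FileOutputStream(File.createTempFile("example", ".bin"))
        Utils.tryWithSafeFinally {
          out.write(Array[Byte](1, 2, 3))
        } {
          // Runs whether or not the write throws; if close() fails while an
          // exception from the write is pending, the close() failure is
          // logged and the original exception is rethrown.
          out.close()
        }
      }
    }
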
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 7bd3c7852a6b26bcb635dbe754a3e22e2823b26f..035f3767ff5547d55b5865ce1f0c727d34feb63d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -728,25 +728,19 @@ private[spark] class ExternalSorter[K, V, C](
       // this simple we spill out the current in-memory collection so that everything is in files.
       spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
       partitionWriters.foreach(_.commitAndClose())
-      var out: FileOutputStream = null
-      var in: FileInputStream = null
+      val out = new FileOutputStream(outputFile, true)
       val writeStartTime = System.nanoTime
-      try {
-        out = new FileOutputStream(outputFile, true)
+      util.Utils.tryWithSafeFinally {
         for (i <- 0 until numPartitions) {
-          in = new FileInputStream(partitionWriters(i).fileSegment().file)
-          val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
-          in.close()
-          in = null
-          lengths(i) = size
-        }
-      } finally {
-        if (out != null) {
-          out.close()
-        }
-        if (in != null) {
-          in.close()
+          val in = new FileInputStream(partitionWriters(i).fileSegment().file)
+          util.Utils.tryWithSafeFinally {
+            lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
+          } {
+            in.close()
+          }
         }
+      } {
+        out.close()
         context.taskMetrics.shuffleWriteMetrics.foreach(
           _.incShuffleWriteTime(System.nanoTime - writeStartTime))
       }