diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 12f2fe031cb1d4b16eca911801897eb739c94642..2301caafb07ff70b7f146d40d06a4ee689f65cab 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.Map
 import scala.reflect.ClassTag
 
 import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.Utils
 
 /**
  * A data type that can be accumulated, i.e. has a commutative and associative "add" operation,
@@ -126,7 +127,7 @@ class Accumulable[R, T] (
   }
 
   // Called by Java when deserializing an object
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     value_ = zero
     deserialized = true
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 37053bb6f37adfcb52c1b39e116d44ac65693fb3..e53a78ead2c0ed811fe5a6899d7156c67fe586be 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -204,7 +204,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   }
 
   @throws(classOf[IOException])
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     val sfactory = SparkEnv.get.serializer
     sfactory match {
       case js: JavaSerializer => out.defaultWriteObject()
@@ -222,7 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   }
 
   @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     val sfactory = SparkEnv.get.serializer
     sfactory match {
       case js: JavaSerializer => in.defaultReadObject()
diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
index e50b9ac2291f94dc2c4247e3097cac313ce61b3a..55cb25946c2addb35c81e286ee8713fe86590034 100644
--- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala
+++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -24,18 +24,19 @@ import org.apache.hadoop.io.ObjectWritable
 import org.apache.hadoop.io.Writable
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
 
 @DeveloperApi
 class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
   def value = t
   override def toString = t.toString
 
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     out.defaultWriteObject()
     new ObjectWritable(t).write(out)
   }
 
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     val ow = new ObjectWritable()
     ow.setConf(new Configuration())
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 4cd4f4f96fd16668ae4581ecf8a8c294494820d5..7dade04273b084fe131332788fe813ea8085b3d6 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -72,13 +72,13 @@ private[spark] class HttpBroadcast[T: ClassTag](
   }
 
   /** Used by the JVM when serializing this object. */
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     assertValid()
     out.defaultWriteObject()
   }
 
   /** Used by the JVM when deserializing this object. */
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     HttpBroadcast.synchronized {
       SparkEnv.get.blockManager.getSingle(blockId) match {
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 99af2e9608ea764b974783d14d247e2501285be2..75e64c1bf401eb0c2934b7ceb06f6a38e0f41af2 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
-import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.ByteArrayChunkOutputStream
 
 /**
@@ -152,13 +152,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   }
 
   /** Used by the JVM when serializing this object. */
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     assertValid()
     out.defaultWriteObject()
   }
 
   /** Used by the JVM when deserializing this object. */
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     TorrentBroadcast.synchronized {
       setConf(SparkEnv.get.conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index c3ca43f8d07343ef20474777d08102f758cd14bf..6ba395be1cc2c130b24e0a8b8208f234e05a1218 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
 import akka.actor.ActorRef
 
 import org.apache.spark.deploy.ApplicationDescription
+import org.apache.spark.util.Utils
 
 private[spark] class ApplicationInfo(
     val startTime: Long,
@@ -46,7 +47,7 @@ private[spark] class ApplicationInfo(
 
   init()
 
-  private def readObject(in: java.io.ObjectInputStream): Unit = {
+  private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     init()
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
index 80b570a44af18a5371ad1a1e0e685c9209aa81b6..2ac21186881fa3a516e44fb32a6e80ee50bdac22 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
 import java.util.Date
 
 import org.apache.spark.deploy.DriverDescription
+import org.apache.spark.util.Utils
 
 private[spark] class DriverInfo(
     val startTime: Long,
@@ -36,7 +37,7 @@ private[spark] class DriverInfo(
 
   init()
 
-  private def readObject(in: java.io.ObjectInputStream): Unit = {
+  private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     init()
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index c5fa9cf7d7c2d549aa4dbc421942af1f493020bb..d221b0f6cc86b81b01ffab7970608d54d553a388 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -50,7 +50,7 @@ private[spark] class WorkerInfo(
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
 
-  private def readObject(in: java.io.ObjectInputStream) : Unit = {
+  private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     init()
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 4908711d17db768d8663190330755537c348fedc..1cbd684224b7c98cbbd4e2b398dba660607c1d4d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
 import scala.reflect.ClassTag
 
 import org.apache.spark._
+import org.apache.spark.util.Utils
 
 private[spark]
 class CartesianPartition(
@@ -36,7 +37,7 @@ class CartesianPartition(
   override val index: Int = idx
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     // Update the reference to parent split at the time of task serialization
     s1 = rdd1.partitions(s1Index)
     s2 = rdd2.partitions(s2Index)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index fabb882cdd4b3211a4eea41b50ad3cab79bbbd2f..ffc0a8a6d67ebfd211310b8783a3ca0f26c40b9a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -27,6 +27,7 @@ import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
+import org.apache.spark.util.Utils
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleHandle
 
@@ -39,7 +40,7 @@ private[spark] case class NarrowCoGroupSplitDep(
   ) extends CoGroupSplitDep {
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     // Update the reference to parent split at the time of task serialization
     split = rdd.partitions(splitIndex)
     oos.defaultWriteObject()
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 11ebafbf6d457641e25c49cae97d43a481da1078..9fab1d78abb0406611d81ead684ffd8716eb8d17 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -25,6 +25,7 @@ import scala.language.existentials
 import scala.reflect.ClassTag
 
 import org.apache.spark._
+import org.apache.spark.util.Utils
 
 /**
  * Class that captures a coalesced RDD by essentially keeping track of parent partitions
@@ -42,7 +43,7 @@ private[spark] case class CoalescedRDDPartition(
   var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     // Update the reference to parent partition at the time of task serialization
     parents = parentsIndices.map(rdd.partitions(_))
     oos.defaultWriteObject()
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 66c71bf7e8bb5196116e5a2d38fffc06092fc125..87b22de6ae697224347df7533f187ce9ccafeddf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -48,7 +48,7 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
   override def index: Int = slice
 
   @throws(classOf[IOException])
-  private def writeObject(out: ObjectOutputStream): Unit = {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
 
     val sfactory = SparkEnv.get.serializer
 
@@ -67,7 +67,7 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
   }
 
   @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
 
     val sfactory = SparkEnv.get.serializer
     sfactory match {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 0c2cd7a24783b843fdcbcf6dbdcbb5c58fdc9801..92b0641d0fb6e8912fa4e8a4d21e6ceda540e048 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
 import scala.reflect.ClassTag
 
 import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.util.Utils
 
 /**
  * Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of
@@ -38,7 +39,7 @@ class PartitionerAwareUnionRDDPartition(
   override def hashCode(): Int = idx
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     // Update the reference to parent partition at the time of task serialization
     parents = rdds.map(_.partitions(index)).toArray
     oos.defaultWriteObject()
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 0c97eb0aaa51fbc0f41cb3316a963903b8cc0fe5..aece683ff31996964a416b8c752a431f2fba8120 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
 
 /**
  * Partition for UnionRDD.
@@ -48,7 +49,7 @@ private[spark] class UnionPartition[T: ClassTag](
   override val index: Int = idx
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     // Update the reference to parent split at the time of task serialization
     parentPartition = rdd.partitions(parentRddPartitionIndex)
     oos.defaultWriteObject()
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index f3d30f6c9b32f998b9dc1fcacadb65c29d38092d..996f2cd3f34a3343029ca41d74dd1238fed50013 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
 import scala.reflect.ClassTag
 
 import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.util.Utils
 
 private[spark] class ZippedPartitionsPartition(
     idx: Int,
@@ -34,7 +35,7 @@ private[spark] class ZippedPartitionsPartition(
   def partitions = partitionValues
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     // Update the reference to parent split at the time of task serialization
     partitionValues = rdds.map(rdd => rdd.partitions(idx))
     oos.defaultWriteObject()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 2ab5d9637b593217a8174d2152408e4d27d7776a..01d5943d777f36c067aed64b39491abbc772b11f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -22,6 +22,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.Utils
 
 /**
  * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
@@ -105,13 +106,13 @@ private[spark] class CompressedMapStatus(
     MapStatus.decompressSize(compressedSizes(reduceId))
   }
 
-  override def writeExternal(out: ObjectOutput): Unit = {
+  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
     out.writeInt(compressedSizes.length)
     out.write(compressedSizes)
   }
 
-  override def readExternal(in: ObjectInput): Unit = {
+  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
     val len = in.readInt()
     compressedSizes = new Array[Byte](len)
@@ -152,13 +153,13 @@ private[spark] class HighlyCompressedMapStatus private (
     }
   }
 
-  override def writeExternal(out: ObjectOutput): Unit = {
+  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
     emptyBlocks.writeExternal(out)
     out.writeLong(avgSize)
   }
 
-  override def readExternal(in: ObjectInput): Unit = {
+  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
     emptyBlocks = new RoaringBitmap()
     emptyBlocks.readExternal(in)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index d49d8fb88700782017d724f79691d3ce926e03b9..11c19eeb6e42c4568723bfe8478e689a6647ccaa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -42,7 +42,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
 
   def this() = this(null.asInstanceOf[ByteBuffer], null, null)
 
-  override def writeExternal(out: ObjectOutput) {
+  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
 
     out.writeInt(valueBytes.remaining)
     Utils.writeByteBuffer(valueBytes, out)
@@ -55,7 +55,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
     out.writeObject(metrics)
   }
 
-  override def readExternal(in: ObjectInput) {
+  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
 
     val blen = in.readInt()
     val byteVal = new Array[Byte](blen)
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 554a33ce7f1a660a8d3f63e4b12281abf8ab539f..662a7b91248aa8c82720a31d436947e8320484fb 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -117,11 +117,11 @@ class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
     new JavaSerializerInstance(counterReset, classLoader)
   }
 
-  override def writeExternal(out: ObjectOutput) {
+  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     out.writeInt(counterReset)
   }
 
-  override def readExternal(in: ObjectInput) {
+  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     counterReset = in.readInt()
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 142285094342c006276ea8d01983ab708ba99fb9..259f423c73e6b2732de42ffdaacdfd2af850cc60 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -61,13 +61,13 @@ class BlockManagerId private (
 
   def isDriver: Boolean = (executorId == "<driver>")
 
-  override def writeExternal(out: ObjectOutput) {
+  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     out.writeUTF(executorId_)
     out.writeUTF(host_)
     out.writeInt(port_)
   }
 
-  override def readExternal(in: ObjectInput) {
+  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     executorId_ = in.readUTF()
     host_ = in.readUTF()
     port_ = in.readInt()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 3db5dd9774ae857e8819fe7957b468473e2365d5..291ddfcc113ac0dbe2a9b9e5815271d6b0b1fde3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -21,6 +21,8 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
 import akka.actor.ActorRef
 
+import org.apache.spark.util.Utils
+
 private[spark] object BlockManagerMessages {
   //////////////////////////////////////////////////////////////////////////////////
   // Messages from the master to slaves.
@@ -65,7 +67,7 @@ private[spark] object BlockManagerMessages {
 
     def this() = this(null, null, null, 0, 0, 0)  // For deserialization only
 
-    override def writeExternal(out: ObjectOutput) {
+    override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
       blockManagerId.writeExternal(out)
       out.writeUTF(blockId.name)
       storageLevel.writeExternal(out)
@@ -74,7 +76,7 @@ private[spark] object BlockManagerMessages {
       out.writeLong(tachyonSize)
     }
 
-    override def readExternal(in: ObjectInput) {
+    override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
       blockManagerId = BlockManagerId(in)
       blockId = BlockId(in.readUTF())
       storageLevel = StorageLevel(in)
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 1e35abaab53534318b3911dc0eace91043af3916..56edc4fe2e4adea35dc1d78e0f0ef9505970a380 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -97,12 +98,12 @@ class StorageLevel private(
     ret
   }
 
-  override def writeExternal(out: ObjectOutput) {
+  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     out.writeByte(toInt)
     out.writeByte(_replication)
   }
 
-  override def readExternal(in: ObjectInput) {
+  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     val flags = in.readByte()
     _useDisk = (flags & 8) != 0
     _useMemory = (flags & 4) != 0
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
index 2b452ad33b021c5818ba041152572ae55a1f3ca9..770ff9d5ad6ae863214911e964c498fbf3b62ac1 100644
--- a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
@@ -29,7 +29,7 @@ private[spark]
 class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
   def value = buffer
 
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     val length = in.readInt()
     buffer = ByteBuffer.allocate(length)
     var amountRead = 0
@@ -44,7 +44,7 @@ class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable
     buffer.rewind() // Allow us to read it later
   }
 
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     out.writeInt(buffer.limit())
     if (Channels.newChannel(out).write(buffer) != buffer.limit()) {
       throw new IOException("Could not fully write buffer to output stream")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 65bdbaae65463c607cf78fdf310a39ec98d5bdd9..e1dc49238733c2a58c5afe9f26217f2fcbdbfb2d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -969,6 +969,21 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Execute a block of code that evaluates to Unit, re-throwing any non-fatal uncaught
+   * exceptions as IOException. This is used when implementing Externalizable and Serializable's
+   * read and write methods, since Java's serialization machinery expects only IOException from
+   * them and may otherwise mask the original failure; see SPARK-4080 for more context.
+   */
+  def tryOrIOException(block: => Unit): Unit = {
+    try {
+      block
+    } catch {
+      case e: IOException => throw e
+      case NonFatal(t) => throw new IOException(t)
+    }
+  }
+
   /** Default filtering function for finding call sites using `getCallSite`. */
   private def coreExclusionFunction(className: String): Boolean = {
     // A regular expression to match classes of the "core" Spark API that we want to skip when
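
As a usage sketch (not part of the patch; the class name and its field are illustrative), every
Serializable class above with custom serialization hooks now follows this pattern:

    package org.apache.spark.util  // Utils is private[spark], so callers live inside the spark tree

    import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

    class CachedView(@transient var cache: Map[String, Int]) extends Serializable {

      @throws(classOf[IOException])
      private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
        out.defaultWriteObject()  // write the non-transient fields as usual
      }

      @throws(classOf[IOException])
      private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
        in.defaultReadObject()
        cache = Map.empty  // rebuild transient state; a non-fatal failure in this block now
      }                    // reaches the caller as an IOException with the real cause attached
    }
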
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 4b2ea45fb81d0d7571396c8ee466d014e7358b2e..2de2a7926bfd1ae03404411e0e631ed14e422dff 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -66,7 +66,7 @@ class SparkFlumeEvent() extends Externalizable {
   var event : AvroFlumeEvent = new AvroFlumeEvent()
 
   /* De-serialize from bytes. */
-  def readExternal(in: ObjectInput) {
+  def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     val bodyLength = in.readInt()
     val bodyBuff = new Array[Byte](bodyLength)
     in.readFully(bodyBuff)
@@ -93,7 +93,7 @@ class SparkFlumeEvent() extends Externalizable {
   }
 
   /* Serialize to bytes. */
-  def writeExternal(out: ObjectOutput) {
+  def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     val body = event.getBody.array()
     out.writeInt(body.length)
     out.write(body)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index b4adf0e9651a82aad844fed8c927355ab788cfdf..e59c24adb84af8a8c183ef93a6bd56c5430fc51d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -22,6 +22,7 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
 import org.apache.spark.Logging
 import org.apache.spark.streaming.scheduler.Job
 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream, InputDStream}
+import org.apache.spark.util.Utils
 
 final private[streaming] class DStreamGraph extends Serializable with Logging {
 
@@ -160,7 +161,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   }
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     logDebug("DStreamGraph.writeObject used")
     this.synchronized {
       checkpointInProgress = true
@@ -172,7 +173,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   }
 
   @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
+  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     logDebug("DStreamGraph.readObject used")
     this.synchronized {
       checkpointInProgress = true
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 213dff6a76354964fdeea7d2154686bd761cdc87..7053f47ec69a220d8b62f0446e49718bc977227b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -33,6 +33,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Interval, Duration, Time}
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.api.java._
+import org.apache.spark.util.Utils
 
 
 /**
@@ -73,13 +74,13 @@ private[python] class TransformFunction(@transient var pfunc: PythonTransformFun
     pfunc.call(time.milliseconds, rdds)
   }
 
-  private def writeObject(out: ObjectOutputStream): Unit = {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     val bytes = PythonTransformFunctionSerializer.serialize(pfunc)
     out.writeInt(bytes.length)
     out.write(bytes)
   }
 
-  private def readObject(in: ObjectInputStream): Unit = {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     val length = in.readInt()
     val bytes = new Array[Byte](length)
     in.readFully(bytes)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 65f7ccd3186847201f68d71f56b80b3ed17013a6..eabd61d713e0ce99e619722ec958ca4de7ce5b71 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.util.{CallSite, MetadataCleaner}
+import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -400,7 +400,7 @@ abstract class DStream[T: ClassTag] (
   }
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     logDebug(this.getClass().getSimpleName + ".writeObject used")
     if (graph != null) {
       graph.synchronized {
@@ -423,7 +423,7 @@ abstract class DStream[T: ClassTag] (
   }
 
   @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
+  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     generatedRDDs = new HashMap[Time, RDD[T]] ()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index f33c0ceafdf4209bcd19ec5b3444b7750d07316c..0dc72790fbdbd7a29a6c7199b396f7f039e0b64a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.Logging
 import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
 
 private[streaming]
 class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
@@ -119,7 +120,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
   }
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     logDebug(this.getClass().getSimpleName + ".writeObject used")
     if (dstream.context.graph != null) {
       dstream.context.graph.synchronized {
@@ -142,7 +143,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
   }
 
   @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
+  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     timeToOldestCheckpointFileTime = new HashMap[Time, Time]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 9eecbfaef363fac773a65c2be3bd600dcb5a8326..8152b7542ac57397066eade2161f3fc4f485f4e0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.UnionRDD
 import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.util.TimeStampedHashMap
+import org.apache.spark.util.{TimeStampedHashMap, Utils}
 
 
 private[streaming]
@@ -151,7 +151,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   }
 
   @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
+  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     generatedRDDs = new HashMap[Time, RDD[(K,V)]] ()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 9327ff4822699ec980cd0d226289b6a0bf85d496..2154c24abda3a53d676adbd382c532cdee97fd8c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -73,7 +73,7 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
 
   // This is to clear the output buffer every time it is read from a checkpoint
   @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
+  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     ois.defaultReadObject()
     output.clear()
   }
@@ -95,7 +95,7 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
 
   // This is to clear the output buffer every time it is read from a checkpoint
   @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream) {
+  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
     ois.defaultReadObject()
     output.clear()
   }
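
The Externalizable implementations changed above (MapStatus, BlockManagerId, StorageLevel, and
others) follow the same pattern; a minimal sketch with an illustrative class:

    package org.apache.spark.util  // again inside the spark tree, since Utils is private[spark]

    import java.io.{Externalizable, ObjectInput, ObjectOutput}

    class Coordinate(var host: String, var port: Int) extends Externalizable {
      def this() = this(null, 0)  // no-arg constructor required by Externalizable

      override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
        out.writeUTF(host)
        out.writeInt(port)
      }

      override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
        host = in.readUTF()
        port = in.readInt()
      }
    }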