diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 54e08d7866f758438c5b8790fb1b3caf00fc50ed..e2d2250982daa113145ad8e0a2099b3552be4067 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
 
 /**
  * Classes that represent cleaning tasks.
@@ -110,7 +111,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Keep cleaning RDD, shuffle, and broadcast state. */
-  private def keepCleaning() {
+  private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
     while (!stopped) {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
@@ -128,7 +129,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
           }
         }
       } catch {
-        case t: Throwable => logError("Error in cleaning thread", t)
+        case e: Exception => logError("Error in cleaning thread", e)
       }
     }
   }
@@ -141,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       listeners.foreach(_.rddCleaned(rddId))
       logInfo("Cleaned RDD " + rddId)
     } catch {
-      case t: Throwable => logError("Error cleaning RDD " + rddId, t)
+      case e: Exception => logError("Error cleaning RDD " + rddId, e)
     }
   }
 
@@ -154,7 +155,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       listeners.foreach(_.shuffleCleaned(shuffleId))
       logInfo("Cleaned shuffle " + shuffleId)
     } catch {
-      case t: Throwable => logError("Error cleaning shuffle " + shuffleId, t)
+      case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
     }
   }
 
@@ -166,7 +167,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       listeners.foreach(_.broadcastCleaned(broadcastId))
       logInfo("Cleaned broadcast " + broadcastId)
     } catch {
-      case t: Throwable => logError("Error cleaning broadcast " + broadcastId, t)
+      case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 71bab295442fc14d15512977055ba531e1e62b31..e6121a705497cfef37a4a13d3936905e76758f0d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1494,8 +1494,8 @@ object SparkContext extends Logging {
         } catch {
           // TODO: Enumerate the exact reasons why it can fail
           // But irrespective of it, it means we cannot proceed !
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
+          case e: Exception => {
+            throw new SparkException("YARN mode not available ?", e)
           }
         }
         val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
@@ -1510,8 +1510,8 @@ object SparkContext extends Logging {
           cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
 
         } catch {
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
+          case e: Exception => {
+            throw new SparkException("YARN mode not available ?", e)
           }
         }
 
@@ -1521,8 +1521,8 @@ object SparkContext extends Logging {
           val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
           cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
         } catch {
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
+          case e: Exception => {
+            throw new SparkException("YARN mode not available ?", e)
           }
         }
 
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 2971c277aa863732d38e7710943c24610aa1b736..57b28b9972366d47dee404539e46a5412cf42822 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -171,7 +171,7 @@ private[spark] class PythonRDD[T: ClassTag](
       this.interrupt()
     }
 
-    override def run() {
+    override def run(): Unit = Utils.logUncaughtExceptions {
       try {
         SparkEnv.set(env)
         val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
@@ -282,7 +282,6 @@ private[spark] object PythonRDD {
       }
     } catch {
       case eof: EOFException => {}
-      case e: Throwable => throw e
     }
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 002f2acd94dee3fa4d681cd71fc52c7eddebde1d..759cbe2c46c5245fb2457d145004644f1c89b5b3 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -71,7 +71,6 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
           stopDaemon()
           startDaemon()
           new Socket(daemonHost, daemonPort)
-        case e: Throwable => throw e
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 7ead1171525d282bd1fb3ff474a73e450facc482..aeb159adc31d9e73eb554a8db0eca150b9d2b1a0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -157,7 +157,7 @@ object Client {
     // TODO: See if we can initialize akka so return messages are sent back using the same TCP
     //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
     val (actorSystem, _) = AkkaUtils.createActorSystem(
-      "driverClient", Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
+      "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
     actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e2df1b8954124bec328225bbae982c0f921861b7..148115d3ed351006ea2675678f5580324ceb1194 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -103,7 +103,7 @@ object SparkHadoopUtil {
           .newInstance()
           .asInstanceOf[SparkHadoopUtil]
       } catch {
-       case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+       case e: Exception => throw new SparkException("Unable to load YARN support", e)
       }
     } else {
       new SparkHadoopUtil
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 1238bbf9da2fd5bc69cc0d3f6c391499b1aad8c2..a9c11dca5678e8a9836231502f45a089f3e2872e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -70,7 +70,7 @@ class HistoryServer(
    * TODO: Add a mechanism to update manually.
    */
   private val logCheckingThread = new Thread {
-    override def run() {
+    override def run(): Unit = Utils.logUncaughtExceptions {
       while (!stopped) {
         val now = System.currentTimeMillis
         if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
@@ -154,7 +154,7 @@ class HistoryServer(
         numCompletedApplications = logInfos.size
 
       } catch {
-        case t: Throwable => logError("Exception in checking for event log updates", t)
+        case e: Exception => logError("Exception in checking for event log updates", e)
       }
     } else {
       logWarning("Attempted to check for event log updates before binding the server.")
@@ -231,8 +231,8 @@ class HistoryServer(
         dir.getModificationTime
       }
     } catch {
-      case t: Throwable =>
-        logError("Exception in accessing modification time of %s".format(dir.getPath), t)
+      case e: Exception =>
+        logError("Exception in accessing modification time of %s".format(dir.getPath), e)
         -1L
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f254f5585ba254d7f25896f3a538e2d52ee14a34..c6dec305bffcb6acdd1f7086e8732a8ea5d1fc71 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -684,8 +684,8 @@ private[spark] class Master(
         webUi.attachSparkUI(ui)
         return true
       } catch {
-        case t: Throwable =>
-          logError("Exception in replaying log for application %s (%s)".format(appName, app.id), t)
+        case e: Exception =>
+          logError("Exception in replaying log for application %s (%s)".format(appName, app.id), e)
       }
     } else {
       logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir))
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index be15138f6240612af7725a6358c719b65d09d6a0..05e242e6df7021e0f67a727a2649e513ebe034a7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -31,7 +31,7 @@ object DriverWrapper {
       case workerUrl :: mainClass :: extraArgs =>
         val conf = new SparkConf()
         val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
-          Utils.localHostName(), 0, false, conf, new SecurityManager(conf))
+          Utils.localHostName(), 0, conf, new SecurityManager(conf))
         actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
 
         // Delegate to supplied main class
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index e912ae8a5d3c56b8dd1bf87f88c7232869c14a2b..84aec65b7765d6e52b73f62e0d37b3a1f680eb23 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -105,7 +105,7 @@ private[spark] object CoarseGrainedExecutorBackend {
         // Create a new ActorSystem to run the backend, because we can't create a
         // SparkEnv / Executor before getting started with all our system properties, etc
         val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-          indestructible = true, conf = conf, new SecurityManager(conf))
+          conf, new SecurityManager(conf))
         // set it
         val sparkHostPort = hostname + ":" + boundPort
         actorSystem.actorOf(
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 98e7e0be813be72a99d76bd2aabc676bd80ef252..baee7a216a7c3765c6769b8c3aa48ff7e0fde29d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -74,28 +74,7 @@ private[spark] class Executor(
     // Setup an uncaught exception handler for non-local mode.
     // Make any thread terminations due to uncaught exceptions kill the entire
     // executor process to avoid surprising stalls.
-    Thread.setDefaultUncaughtExceptionHandler(
-      new Thread.UncaughtExceptionHandler {
-        override def uncaughtException(thread: Thread, exception: Throwable) {
-          try {
-            logError("Uncaught exception in thread " + thread, exception)
-
-            // We may have been called from a shutdown hook. If so, we must not call System.exit().
-            // (If we do, we will deadlock.)
-            if (!Utils.inShutdown()) {
-              if (exception.isInstanceOf[OutOfMemoryError]) {
-                System.exit(ExecutorExitCode.OOM)
-              } else {
-                System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-              }
-            }
-          } catch {
-            case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-            case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
-          }
-        }
-      }
-    )
+    Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
   }
 
   val executorSource = new ExecutorSource(this, executorId)
@@ -259,6 +238,11 @@ private[spark] class Executor(
         }
 
         case t: Throwable => {
+          // Attempt to exit cleanly by informing the driver of our failure.
+          // If anything goes wrong (or this was a fatal exception), we will delegate to
+          // the default uncaught exception handler, which will terminate the Executor.
+          logError("Exception in task ID " + taskId, t)
+
           val serviceTime = System.currentTimeMillis() - taskStart
           val metrics = attemptedTask.flatMap(t => t.metrics)
           for (m <- metrics) {
@@ -268,10 +252,11 @@ private[spark] class Executor(
           val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
-          // TODO: Should we exit the whole executor here? On the one hand, the failed task may
-          // have left some weird state around depending on when the exception was thrown, but on
-          // the other hand, maybe we could detect that when future tasks fail and exit then.
-          logError("Exception in task ID " + taskId, t)
+          // Don't forcibly exit unless the exception was inherently fatal, to avoid
+          // stopping other tasks unnecessarily.
+          if (Utils.isFatalError(t)) {
+            ExecutorUncaughtExceptionHandler.uncaughtException(t)
+          }
         }
       } finally {
         // TODO: Unregister shuffle memory only for ResultTask
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b0e984c03964cdc09ea520b3813a2b6ac5ca2abf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * The default uncaught exception handler for Executors terminates the whole process, to avoid
+ * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
+ * to fail fast when things go wrong.
+ */
+private[spark] object ExecutorUncaughtExceptionHandler
+  extends Thread.UncaughtExceptionHandler with Logging {
+
+  override def uncaughtException(thread: Thread, exception: Throwable) {
+    try {
+      logError("Uncaught exception in thread " + thread, exception)
+
+      // We may have been called from a shutdown hook. If so, we must not call System.exit().
+      // (If we do, we will deadlock.)
+      if (!Utils.inShutdown()) {
+        if (exception.isInstanceOf[OutOfMemoryError]) {
+          System.exit(ExecutorExitCode.OOM)
+        } else {
+          System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+        }
+      }
+    } catch {
+      case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+      case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+    }
+  }
+
+  def uncaughtException(exception: Throwable) {
+    uncaughtException(Thread.currentThread(), exception)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 7968a0691db10b704df462ecc06a109aa5c17edc..a90b0d475c04ec769b3b6e17c0730cc3079ce9a2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -206,8 +206,8 @@ private[spark] object EventLoggingListener extends Logging {
         applicationComplete = filePaths.exists { path => isApplicationCompleteFile(path.getName) }
       )
     } catch {
-      case t: Throwable =>
-        logError("Exception in parsing logging info from directory %s".format(logDir), t)
+      case e: Exception =>
+        logError("Exception in parsing logging info from directory %s".format(logDir), e)
       EventLoggingInfo.empty
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index c9ad2b151daf0da147d635a7503048e5b5b78dd4..99d305b36a959c7bbb469a416d99d538bf895cac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -43,7 +43,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
   def enqueueSuccessfulTask(
     taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
     getTaskResultExecutor.execute(new Runnable {
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
         try {
           val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
             case directResult: DirectTaskResult[_] => directResult
@@ -70,7 +70,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
           case cnf: ClassNotFoundException =>
             val loader = Thread.currentThread.getContextClassLoader
             taskSetManager.abort("ClassNotFound with classloader: " + loader)
-          case ex: Throwable =>
+          case ex: Exception =>
             taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
         }
       }
@@ -81,7 +81,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
     serializedData: ByteBuffer) {
     var reason : TaskEndReason = UnknownReason
     getTaskResultExecutor.execute(new Runnable {
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
         try {
           if (serializedData != null && serializedData.limit() > 0) {
             reason = serializer.get().deserialize[TaskEndReason](
@@ -94,7 +94,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
             val loader = Utils.getContextOrSparkClassLoader
             logError(
               "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
-          case ex: Throwable => {}
+          case ex: Exception => {}
         }
         scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
       }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index cf6ef0029a861e42dbf0ef4cc622234b4a10cee5..3a7243a1ba19ce6fec360e67ce6d62998c8a80fa 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -148,7 +148,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   private def addShutdownHook() {
     localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
         logDebug("Shutdown hook called")
         DiskBlockManager.this.stop()
       }
@@ -162,8 +162,8 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
         try {
           if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
         } catch {
-          case t: Throwable =>
-            logError("Exception while deleting local spark dir: " + localDir, t)
+          case e: Exception =>
+            logError("Exception while deleting local spark dir: " + localDir, e)
         }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index b0b967485656860d31749b027eca10786b291a20..a6cbe3aa440ffee661221fe4a3150cc356bd53a9 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -25,7 +25,6 @@ import tachyon.client.TachyonFile
 
 import org.apache.spark.Logging
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.network.netty.ShuffleSender
 import org.apache.spark.util.Utils
 
 
@@ -137,7 +136,7 @@ private[spark] class TachyonBlockManager(
   private def addShutdownHook() {
     tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") {
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
         logDebug("Shutdown hook called")
         tachyonDirs.foreach { tachyonDir =>
           try {
@@ -145,8 +144,8 @@ private[spark] class TachyonBlockManager(
               Utils.deleteRecursively(tachyonDir, client)
             }
           } catch {
-            case t: Throwable =>
-              logError("Exception while deleting tachyon spark dir: " + tachyonDir, t)
+            case e: Exception =>
+              logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
           }
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 8afe09a117ebcb4d1bae7d5d87a5f14ff784fb52..a8d12bb2a01650802df61a3e0601a3a72209703a 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
-import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
+import akka.actor.{ActorSystem, ExtendedActorSystem}
 import com.typesafe.config.ConfigFactory
 import org.apache.log4j.{Level, Logger}
 
@@ -41,7 +41,7 @@ private[spark] object AkkaUtils extends Logging {
    * If indestructible is set to true, the Actor System will continue running in the event
    * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
    */
-  def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
+  def createActorSystem(name: String, host: String, port: Int,
     conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = {
 
     val akkaThreads   = conf.getInt("spark.akka.threads", 4)
@@ -101,12 +101,7 @@ private[spark] object AkkaUtils extends Logging {
       |akka.log-dead-letters-during-shutdown = $lifecycleEvents
       """.stripMargin))
 
-    val actorSystem = if (indestructible) {
-      IndestructibleActorSystem(name, akkaConf)
-    } else {
-      ActorSystem(name, akkaConf)
-    }
-
+    val actorSystem = ActorSystem(name, akkaConf)
     val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
     val boundPort = provider.getDefaultAddress.port.get
     (actorSystem, boundPort)
diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
deleted file mode 100644
index 4188a869c13da5aabf4649d6dbb35d66465c1233..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Must be in akka.actor package as ActorSystemImpl is protected[akka].
-package akka.actor
-
-import scala.util.control.{ControlThrowable, NonFatal}
-
-import com.typesafe.config.Config
-
-/**
- * An akka.actor.ActorSystem which refuses to shut down in the event of a fatal exception
- * This is necessary as Spark Executors are allowed to recover from fatal exceptions
- * (see org.apache.spark.executor.Executor)
- */
-object IndestructibleActorSystem {
-  def apply(name: String, config: Config): ActorSystem =
-    apply(name, config, ActorSystem.findClassLoader())
-
-  def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
-    new IndestructibleActorSystemImpl(name, config, classLoader).start()
-}
-
-private[akka] class IndestructibleActorSystemImpl(
-    override val name: String,
-    applicationConfig: Config,
-    classLoader: ClassLoader)
-  extends ActorSystemImpl(name, applicationConfig, classLoader) {
-
-  protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
-    val fallbackHandler = super.uncaughtExceptionHandler
-
-    new Thread.UncaughtExceptionHandler() {
-      def uncaughtException(thread: Thread, cause: Throwable): Unit = {
-        if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
-          log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
-            "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
-          // shutdown()                 //TODO make it configurable
-        } else {
-          fallbackHandler.uncaughtException(thread, cause)
-        }
-      }
-    }
-  }
-
-  def isFatalError(e: Throwable): Boolean = {
-    e match {
-      case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
-        false
-      case _ =>
-        true
-    }
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 95777fbf57d8b9fa09e77ff01be32c8bd07b3494..8f7594ada2ba1cb47555623ca8ec6ea48bc074ef 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -29,6 +29,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag
 import scala.util.Try
+import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -41,7 +42,6 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
-
 /**
  * Various utility methods used by Spark.
  */
@@ -1125,4 +1125,28 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /** 
+   * Executes the given block, printing and re-throwing any uncaught exceptions.
+   * This is particularly useful for wrapping code that runs in a thread, to ensure
+   * that exceptions are printed, and to avoid having to catch Throwable.
+   */
+  def logUncaughtExceptions[T](f: => T): T = {
+    try {
+      f
+    } catch {
+      case t: Throwable =>
+        logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+        throw t
+    }
+  }
+
+  /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */
+  def isFatalError(e: Throwable): Boolean = {
+    e match {
+      case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
+        false
+      case _ =>
+        true
+    }
+  }
 }