diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0dee25fb2ebe22a488bd41a71147c40cb789bfcd..4cc580eb75ca21616e3d69c782623b1b51fdfb43 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -36,7 +36,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.serializer.{JavaSerializer, Serializer}
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
 
 private[deploy] class Master(
     override val rpcEnv: RpcEnv,
@@ -1045,6 +1045,8 @@ private[deploy] object Master extends Logging {
   val ENDPOINT_NAME = "Master"
 
   def main(argStrings: Array[String]) {
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
+      exitOnUncaughtException = false))
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new MasterArguments(argStrings, conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index bed47455680dd2a9a22b3912dc77210889ffff9d..f6d3876e3bbfac6245f12c29dbda5df24e037d98 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -38,7 +38,7 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
 
 private[deploy] class Worker(
     override val rpcEnv: RpcEnv,
@@ -737,6 +737,8 @@ private[deploy] object Worker extends Logging {
   val ENDPOINT_NAME = "Worker"
 
   def main(argStrings: Array[String]) {
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
+      exitOnUncaughtException = false))
     Utils.initDaemon(log)
     val conf = new SparkConf
     val args = new WorkerArguments(argStrings, conf)
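The Master and Worker changes above install the handler as the JVM-wide default, so it also catches throwables from helper threads (dispatchers, timers) that have no handler of their own, and with `exitOnUncaughtException = false` those no longer kill the daemon. A minimal standalone sketch of that pattern (the `DemoHandler` name is illustrative, not part of this patch):

```scala
// Sketch: a process-wide default handler, as wired up in Master.main and
// Worker.main above. With exit disabled, a stray exception in a background
// thread is logged instead of taking the whole daemon down.
object DefaultHandlerDemo {
  class DemoHandler extends Thread.UncaughtExceptionHandler {
    override def uncaughtException(t: Thread, e: Throwable): Unit =
      System.err.println(s"Uncaught exception in thread ${t.getName}: $e")
  }

  def main(args: Array[String]): Unit = {
    Thread.setDefaultUncaughtExceptionHandler(new DemoHandler)
    val bg = new Thread(new Runnable {
      override def run(): Unit = throw new RuntimeException("boom")
    })
    bg.start()
    bg.join()
    println("process still alive after background failure")
  }
}
```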
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 19e7eb086f413b6a2c474f3fbbfcc3e8ea3e7852..21f0db1039188301eee15fad2141764c1e4db8ea 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -56,7 +56,7 @@ private[spark] class Executor(
     env: SparkEnv,
     userClassPath: Seq[URL] = Nil,
     isLocal: Boolean = false,
-    uncaughtExceptionHandler: UncaughtExceptionHandler = SparkUncaughtExceptionHandler)
+    uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler)
   extends Logging {
 
   logInfo(s"Starting executor ID $executorId on host $executorHostname")
diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index 95bf3f58bc77f5ee64cff3e9253970259ef91f65..e0f5af5250e7f83db685e27047521b986a49d693 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -20,11 +20,12 @@ package org.apache.spark.util
 import org.apache.spark.internal.Logging
 
 /**
- * The default uncaught exception handler for Executors terminates the whole process, to avoid
- * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
- * to fail fast when things go wrong.
+ * The default uncaught exception handler for Spark daemons. It always terminates the process
+ * on OutOfMemoryError, and on any other uncaught Throwable when exitOnUncaughtException is true.
+ *
+ * @param exitOnUncaughtException Whether to exit the process on an uncaught exception.
  */
-private[spark] object SparkUncaughtExceptionHandler
+private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true)
   extends Thread.UncaughtExceptionHandler with Logging {
 
   override def uncaughtException(thread: Thread, exception: Throwable) {
@@ -40,7 +41,7 @@ private[spark] object SparkUncaughtExceptionHandler
       if (!ShutdownHookManager.inShutdown()) {
         if (exception.isInstanceOf[OutOfMemoryError]) {
           System.exit(SparkExitCode.OOM)
-        } else {
+        } else if (exitOnUncaughtException) {
           System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
         }
       }
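The resulting control flow, concretely: outside of shutdown, an OutOfMemoryError always exits with SparkExitCode.OOM, while any other uncaught throwable exits with SparkExitCode.UNCAUGHT_EXCEPTION only when `exitOnUncaughtException` is true; otherwise the handler just logs and the process stays up. A standalone sketch of that decision logic, with exit codes inlined for illustration (assuming the usual SparkExitCode values):

```scala
// Sketch of the patched decision logic, ignoring the shutdown-hook check.
class SketchHandler(exitOnUncaughtException: Boolean)
  extends Thread.UncaughtExceptionHandler {

  override def uncaughtException(thread: Thread, exception: Throwable): Unit = {
    System.err.println(s"Uncaught exception in thread $thread: $exception")
    if (exception.isInstanceOf[OutOfMemoryError]) {
      System.exit(52)   // always fail fast on OOM (SparkExitCode.OOM)
    } else if (exitOnUncaughtException) {
      System.exit(50)   // SparkExitCode.UNCAUGHT_EXCEPTION, opt-in per daemon
    }
    // else: logged only; Master and Worker rely on this branch
  }
}
```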
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b4caf68f0afaa8574cc32c681c29d4e769ff9143..584337a71cb43df981cc644e90f8acc3881a800c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -76,6 +76,8 @@ private[spark] object CallSite {
 private[spark] object Utils extends Logging {
   val random = new Random()
 
+  private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
+
   /**
    * Define a default value for driver memory here since this value is referenced across the code
    * base and nearly all files already use Utils.scala
@@ -1274,7 +1276,7 @@ private[spark] object Utils extends Logging {
       block
     } catch {
       case e: ControlThrowable => throw e
-      case t: Throwable => SparkUncaughtExceptionHandler.uncaughtException(t)
+      case t: Throwable => sparkUncaughtExceptionHandler.uncaughtException(t)
     }
   }
 
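`Utils.tryOrExit` keeps its fail-fast contract: the private `sparkUncaughtExceptionHandler` above is constructed with the default flag (true), so non-control-flow throwables still terminate the process. A hedged sketch of the same pattern with the handler made explicit (`tryOrExitSketch` is illustrative, not the Spark API):

```scala
import scala.util.control.ControlThrowable

// Sketch: run a block, rethrow Scala control-flow throwables, and route
// everything else to an uncaught-exception handler (which may exit).
def tryOrExitSketch(handler: Thread.UncaughtExceptionHandler)(block: => Unit): Unit = {
  try {
    block
  } catch {
    case e: ControlThrowable => throw e
    case t: Throwable => handler.uncaughtException(Thread.currentThread(), t)
  }
}
```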
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
index 38b082ac01197f26ecc9274a4056a69aa33c4971..aa378c9d340f1f99f0e18de772c1a19ef3773e4e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -97,7 +97,7 @@ private[mesos] object MesosClusterDispatcher
   with CommandLineUtils {
 
   override def main(args: Array[String]) {
-    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
+    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler)
     Utils.initDaemon(log)
     val conf = new SparkConf
     val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
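Note that `new SparkUncaughtExceptionHandler` with no arguments leaves `exitOnUncaughtException` at its default of true, so the Mesos dispatcher (like the Executor default above) retains the pre-patch fail-fast behavior; only Master and Worker opt out.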