diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 28d643abca8f42686f7174cff3aad0d6dd338285..81daacf958b5a03d3135f4587f2d6e411b0c6a3c 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -454,4 +454,25 @@ private object Utils extends Logging {
   def clone[T](value: T, serializer: SerializerInstance): T = {
     serializer.deserialize[T](serializer.serialize(value))
   }
+
+  /**
+   * Detect whether this thread might be executing a shutdown hook. Will always return true if
+   * the current thread is running a shutdown hook, but may spuriously return true otherwise
+   * (e.g. if System.exit was just called by a concurrent thread).
+   *
+   * Currently, this detects whether the JVM is shutting down by checking whether
+   * Runtime#addShutdownHook throws an IllegalStateException.
+   */
+  def inShutdown(): Boolean = {
+    try {
+      val hook = new Thread {
+        override def run() {}
+      }
+      Runtime.getRuntime.addShutdownHook(hook)
+      Runtime.getRuntime.removeShutdownHook(hook)
+      false
+    } catch {
+      case _: IllegalStateException => true
+    }
+  }
 }
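
For context, the detection trick above can be exercised standalone. Below is a hypothetical demo (the object name `ShutdownProbe` is illustrative, not part of this patch) showing that the probe returns false on the main thread and true inside a shutdown hook, because `Runtime#addShutdownHook` throws `IllegalStateException` once shutdown has begun:

```scala
// Hypothetical standalone demo of the detection trick used by Utils.inShutdown;
// the object name is illustrative and not part of this patch.
object ShutdownProbe {
  def inShutdown(): Boolean = {
    try {
      val hook = new Thread {
        override def run() {}
      }
      // addShutdownHook throws IllegalStateException once shutdown has begun,
      // so a successful add/remove round trip means we are not shutting down.
      Runtime.getRuntime.addShutdownHook(hook)
      Runtime.getRuntime.removeShutdownHook(hook)
      false
    } catch {
      case _: IllegalStateException => true
    }
  }

  def main(args: Array[String]) {
    println("in shutdown (main thread): " + inShutdown())  // prints false
    Runtime.getRuntime.addShutdownHook(new Thread {
      override def run() {
        println("in shutdown (hook): " + inShutdown())     // prints true
      }
    })
  }
}
```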
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index bd21ba719a77cf2eaca6b2f70002f191daca4653..5de09030aa1b3b4318c1eb6051eed3e1a3fb23ef 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -50,14 +50,19 @@ private[spark] class Executor extends Logging {
         override def uncaughtException(thread: Thread, exception: Throwable) {
           try {
             logError("Uncaught exception in thread " + thread, exception)
-            if (exception.isInstanceOf[OutOfMemoryError]) {
-              System.exit(ExecutorExitCode.OOM)
-            } else {
-              System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+
+            // We may have been called from a shutdown hook. If so, we must not call System.exit():
+            // exit() blocks until all shutdown hooks finish, so calling it from a hook deadlocks.
+            if (!Utils.inShutdown()) {
+              if (exception.isInstanceOf[OutOfMemoryError]) {
+                System.exit(ExecutorExitCode.OOM)
+              } else {
+                System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+              }
             }
           } catch {
-            case oom: OutOfMemoryError => System.exit(ExecutorExitCode.OOM)
-            case t: Throwable => System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+            case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+            case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
           }
         }
       }
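
The reason for both the guard and the switch to `Runtime.getRuntime.halt`: `System.exit` starts the shutdown sequence and blocks until every registered hook has finished, so a hook (or an uncaught-exception handler invoked on a hook's thread) that calls `System.exit` ends up waiting on itself forever. `halt` terminates the VM immediately without running hooks, so it is safe in the fallback path. A minimal sketch of the situation (hypothetical object name, not part of the patch):

```scala
// Hypothetical demo: System.exit called from inside a shutdown hook never
// returns, while Runtime.getRuntime.halt terminates the VM immediately.
object ShutdownDeadlockDemo {
  def main(args: Array[String]) {
    Runtime.getRuntime.addShutdownHook(new Thread {
      override def run() {
        println("shutdown hook running")
        // System.exit(1)  // would deadlock: exit() waits for all hooks,
        //                 // including this one, to finish
        Runtime.getRuntime.halt(1)  // halt() skips hooks and exits right away
      }
    })
    System.exit(0)  // triggers the hook above; the process exits with code 1
  }
}
```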
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 7e5b820cbbdc6ca145c2eb7c6787bd2c137c80d0..ddbf8821ad15aa172cb248ef3aed45edb3e38f33 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -178,7 +178,11 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
       override def run() {
         logDebug("Shutdown hook called")
-        localDirs.foreach(localDir => Utils.deleteRecursively(localDir))
+        try {
+          localDirs.foreach(localDir => Utils.deleteRecursively(localDir))
+        } catch {
+          case t: Throwable => logError("Exception while deleting local spark dirs", t)
+        }
       }
     })
   }
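
One detail worth noting: each shutdown hook runs on its own thread, so a `Throwable` escaping this hook would land in an uncaught-exception handler like the one patched above, which is why the hook now swallows and logs everything itself. `Utils.deleteRecursively` is defined elsewhere in Utils.scala; as a reference point only, here is a hedged sketch of what such a helper typically looks like (an assumption about its shape, not the actual implementation):

```scala
import java.io.{File, IOException}

// Hedged sketch of a recursive delete helper in the spirit of
// Utils.deleteRecursively; the real implementation lives elsewhere in
// Utils.scala and may differ.
object DeleteSketch {
  def deleteRecursively(file: File) {
    if (file.isDirectory) {
      // listFiles returns null if the directory disappears concurrently
      Option(file.listFiles).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    if (!file.delete() && file.exists()) {
      throw new IOException("Failed to delete: " + file.getAbsolutePath)
    }
  }
}
```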