diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 207a0c1bffeb374f9bbedd46e96a707855a77da5..2e01a9a18c784242f9e2db8d631a78b52b015f49 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -563,7 +563,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Make sure the context is stopped if the user forgets about it. This avoids leaving
     // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
     // is killed, though.
-    _shutdownHookRef = Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
+    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
+      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
       logInfo("Invoking stop() from shutdown hook")
       stop()
     }
@@ -1671,7 +1672,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       return
     }
     if (_shutdownHookRef != null) {
-      Utils.removeShutdownHook(_shutdownHookRef)
+      ShutdownHookManager.removeShutdownHook(_shutdownHookRef)
     }
 
     Utils.tryLogNonFatalError {
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index a076a9c3f984d85a138215ba62f6aa9b3593e1a7..d4f327cc588fe34c2a43ab11803f4f34fd68baba 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -30,7 +30,7 @@ import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, Applica
   UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.{ShutdownHookManager, SignalLogger, Utils}
 
 /**
  * A web server that renders SparkUIs of completed applications.
@@ -238,7 +238,7 @@ object HistoryServer extends Logging {
     val server = new HistoryServer(conf, provider, securityManager, port)
     server.bind()
 
-    Utils.addShutdownHook { () => server.stop() }
+    ShutdownHookManager.addShutdownHook { () => server.stop() }
 
     // Wait until the end of the world... or if the HistoryServer process is manually stopped
     while(true) { Thread.sleep(Int.MaxValue) }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 29a5042285578b7cff205858a057081122881e26..ab3fea475c2a58ead624d637e74b21f2551e08f2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -28,7 +28,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.{SecurityManager, SparkConf, Logging}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 import org.apache.spark.util.logging.FileAppender
 
 /**
@@ -70,7 +70,8 @@ private[deploy] class ExecutorRunner(
     }
     workerThread.start()
     // Shutdown hook that kills actors on shutdown.
-    shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker shutting down")) }
+    shutdownHook = ShutdownHookManager.addShutdownHook { () =>
+      killProcess(Some("Worker shutting down")) }
   }
 
   /**
@@ -102,7 +103,7 @@ private[deploy] class ExecutorRunner(
       workerThread = null
       state = ExecutorState.KILLED
       try {
-        Utils.removeShutdownHook(shutdownHook)
+        ShutdownHookManager.removeShutdownHook(shutdownHook)
       } catch {
         case e: IllegalStateException => None
       }
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index f1c17369cb48cc99b55e00bf407c98b9228747a0..e1f8719eead02f990c415806a518842d4351e247 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{SerializableConfiguration, NextIterator, Utils}
+import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, NextIterator, Utils}
 import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
 import org.apache.spark.storage.StorageLevel
 
@@ -274,7 +274,7 @@ class HadoopRDD[K, V](
           }
         } catch {
           case e: Exception => {
-            if (!Utils.inShutdown()) {
+            if (!ShutdownHookManager.inShutdown()) {
               logWarning("Exception in RecordReader.close()", e)
             }
           }
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index f83a051f5da11f89b071eb775994694e228cc156..6a9c004d65cff6d71120ec04093ce578a6de33c9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Utils}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.storage.StorageLevel
 
@@ -186,7 +186,7 @@ class NewHadoopRDD[K, V](
           }
         } catch {
           case e: Exception => {
-            if (!Utils.inShutdown()) {
+            if (!ShutdownHookManager.inShutdown()) {
               logWarning("Exception in RecordReader.close()", e)
             }
           }
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index 6a95e44c57feca060cf88398d2e45269d00a1152..fa3fecc80cb63b49c6085ec54904f8e8f05bf637 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -33,7 +33,7 @@ import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.{Partition => SparkPartition, _}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Utils}
 
 
 private[spark] class SqlNewHadoopPartition(
@@ -212,7 +212,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
           }
         } catch {
           case e: Exception =>
-            if (!Utils.inShutdown()) {
+            if (!ShutdownHookManager.inShutdown()) {
               logWarning("Exception in RecordReader.close()", e)
             }
         }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 56a33d5ca7d60b174199a41e14b18080ec59d22a..3f8d26e1d4cab9525b86cf1a56d023154c44162f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -22,7 +22,7 @@ import java.io.{IOException, File}
 
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
  * Creates and maintains the logical mapping between logical blocks and physical on-disk
@@ -144,7 +144,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   }
 
   private def addShutdownHook(): AnyRef = {
-    Utils.addShutdownHook(Utils.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
+    ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
       logInfo("Shutdown hook called")
       DiskBlockManager.this.doStop()
     }
@@ -154,7 +154,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   private[spark] def stop() {
     // Remove the shutdown hook.  It causes memory leaks if we leave it around.
     try {
-      Utils.removeShutdownHook(shutdownHook)
+      ShutdownHookManager.removeShutdownHook(shutdownHook)
     } catch {
       case e: Exception =>
         logError(s"Exception while removing shutdown hook.", e)
@@ -168,7 +168,9 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
       localDirs.foreach { localDir =>
         if (localDir.isDirectory() && localDir.exists()) {
           try {
-            if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
+            if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
+              Utils.deleteRecursively(localDir)
+            }
           } catch {
             case e: Exception =>
               logError(s"Exception while deleting local spark dir: $localDir", e)
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index ebad5bc5ab28d174e9f0896c2105733d099e04f7..22878783fca674e11a3b4fa7655ad63a1833f308 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -32,7 +32,7 @@ import tachyon.TachyonURI
 
 import org.apache.spark.Logging
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 
 /**
@@ -80,7 +80,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
     // in order to avoid having really large inodes at the top level in Tachyon.
     tachyonDirs = createTachyonDirs()
     subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
-    tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
+    tachyonDirs.foreach(tachyonDir => ShutdownHookManager.registerShutdownDeleteDir(tachyonDir))
   }
 
   override def toString: String = {"ExternalBlockStore-Tachyon"}
@@ -240,7 +240,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
     logDebug("Shutdown hook called")
     tachyonDirs.foreach { tachyonDir =>
       try {
-        if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
+        if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(tachyonDir)) {
           Utils.deleteRecursively(tachyonDir, client)
         }
       } catch {
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
new file mode 100644
index 0000000000000000000000000000000000000000..61ff9b89ec1c128e8bceb5ef45860c830409a319
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.File
+import java.util.PriorityQueue
+
+import scala.util.{Failure, Success, Try}
+import tachyon.client.TachyonFile
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.spark.Logging
+
+/**
+ * Various utility methods used by Spark.
+ */
+private[spark] object ShutdownHookManager extends Logging {
+  val DEFAULT_SHUTDOWN_PRIORITY = 100
+
+  /**
+   * The shutdown priority of the SparkContext instance. This is lower than the default
+   * priority, so that by default hooks are run before the context is shut down.
+   */
+  val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
+
+  /**
+   * The shutdown priority of temp directory must be lower than the SparkContext shutdown
+   * priority. Otherwise cleaning the temp directories while Spark jobs are running can
+   * throw undesirable errors at the time of shutdown.
+   */
+  val TEMP_DIR_SHUTDOWN_PRIORITY = 25
+
+  private lazy val shutdownHooks = {
+    val manager = new SparkShutdownHookManager()
+    manager.install()
+    manager
+  }
+
+  private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
+  private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
+
+  // Add a shutdown hook to delete the temp dirs when the JVM exits
+  addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
+    logInfo("Shutdown hook called")
+    shutdownDeletePaths.foreach { dirPath =>
+      try {
+        logInfo("Deleting directory " + dirPath)
+        Utils.deleteRecursively(new File(dirPath))
+      } catch {
+        case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
+      }
+    }
+  }
+
+  // Register the path to be deleted via shutdown hook
+  def registerShutdownDeleteDir(file: File) {
+    val absolutePath = file.getAbsolutePath()
+    shutdownDeletePaths.synchronized {
+      shutdownDeletePaths += absolutePath
+    }
+  }
+
+  // Register the tachyon path to be deleted via shutdown hook
+  def registerShutdownDeleteDir(tachyonfile: TachyonFile) {
+    val absolutePath = tachyonfile.getPath()
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths += absolutePath
+    }
+  }
+
+  // Remove the path to be deleted via shutdown hook
+  def removeShutdownDeleteDir(file: File) {
+    val absolutePath = file.getAbsolutePath()
+    shutdownDeletePaths.synchronized {
+      shutdownDeletePaths.remove(absolutePath)
+    }
+  }
+
+  // Remove the tachyon path to be deleted via shutdown hook
+  def removeShutdownDeleteDir(tachyonfile: TachyonFile) {
+    val absolutePath = tachyonfile.getPath()
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths.remove(absolutePath)
+    }
+  }
+
+  // Is the path already registered to be deleted via a shutdown hook ?
+  def hasShutdownDeleteDir(file: File): Boolean = {
+    val absolutePath = file.getAbsolutePath()
+    shutdownDeletePaths.synchronized {
+      shutdownDeletePaths.contains(absolutePath)
+    }
+  }
+
+  // Is the path already registered to be deleted via a shutdown hook ?
+  def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
+    val absolutePath = file.getPath()
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths.contains(absolutePath)
+    }
+  }
+
+  // Note: if file is child of some registered path, while not equal to it, then return true;
+  // else false. This is to ensure that two shutdown hooks do not try to delete each others
+  // paths - resulting in IOException and incomplete cleanup.
+  def hasRootAsShutdownDeleteDir(file: File): Boolean = {
+    val absolutePath = file.getAbsolutePath()
+    val retval = shutdownDeletePaths.synchronized {
+      shutdownDeletePaths.exists { path =>
+        !absolutePath.equals(path) && absolutePath.startsWith(path)
+      }
+    }
+    if (retval) {
+      logInfo("path = " + file + ", already present as root for deletion.")
+    }
+    retval
+  }
+
+  // Note: if file is child of some registered path, while not equal to it, then return true;
+  // else false. This is to ensure that two shutdown hooks do not try to delete each others
+  // paths - resulting in Exception and incomplete cleanup.
+  def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
+    val absolutePath = file.getPath()
+    val retval = shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths.exists { path =>
+        !absolutePath.equals(path) && absolutePath.startsWith(path)
+      }
+    }
+    if (retval) {
+      logInfo("path = " + file + ", already present as root for deletion.")
+    }
+    retval
+  }
+
+  /**
+   * Detect whether this thread might be executing a shutdown hook. Will always return true if
+   * the current thread is a running a shutdown hook but may spuriously return true otherwise (e.g.
+   * if System.exit was just called by a concurrent thread).
+   *
+   * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
+   * an IllegalStateException.
+   */
+  def inShutdown(): Boolean = {
+    try {
+      val hook = new Thread {
+        override def run() {}
+      }
+      Runtime.getRuntime.addShutdownHook(hook)
+      Runtime.getRuntime.removeShutdownHook(hook)
+    } catch {
+      case ise: IllegalStateException => return true
+    }
+    false
+  }
+
+  /**
+   * Adds a shutdown hook with default priority.
+   *
+   * @param hook The code to run during shutdown.
+   * @return A handle that can be used to unregister the shutdown hook.
+   */
+  def addShutdownHook(hook: () => Unit): AnyRef = {
+    addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
+  }
+
+  /**
+   * Adds a shutdown hook with the given priority. Hooks with lower priority values run
+   * first.
+   *
+   * @param hook The code to run during shutdown.
+   * @return A handle that can be used to unregister the shutdown hook.
+   */
+  def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
+    shutdownHooks.add(priority, hook)
+  }
+
+  /**
+   * Remove a previously installed shutdown hook.
+   *
+   * @param ref A handle returned by `addShutdownHook`.
+   * @return Whether the hook was removed.
+   */
+  def removeShutdownHook(ref: AnyRef): Boolean = {
+    shutdownHooks.remove(ref)
+  }
+
+}
+
+private [util] class SparkShutdownHookManager {
+
+  private val hooks = new PriorityQueue[SparkShutdownHook]()
+  private var shuttingDown = false
+
+  /**
+   * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
+   * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
+   * the best.
+   */
+  def install(): Unit = {
+    val hookTask = new Runnable() {
+      override def run(): Unit = runAll()
+    }
+    Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
+      case Success(shmClass) =>
+        val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
+          .asInstanceOf[Int]
+        val shm = shmClass.getMethod("get").invoke(null)
+        shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
+          .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
+
+      case Failure(_) =>
+        Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
+    }
+  }
+
+  def runAll(): Unit = synchronized {
+    shuttingDown = true
+    while (!hooks.isEmpty()) {
+      Try(Utils.logUncaughtExceptions(hooks.poll().run()))
+    }
+  }
+
+  def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
+    checkState()
+    val hookRef = new SparkShutdownHook(priority, hook)
+    hooks.add(hookRef)
+    hookRef
+  }
+
+  def remove(ref: AnyRef): Boolean = synchronized {
+    hooks.remove(ref)
+  }
+
+  private def checkState(): Unit = {
+    if (shuttingDown) {
+      throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
+    }
+  }
+
+}
+
+private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
+  extends Comparable[SparkShutdownHook] {
+
+  override def compareTo(other: SparkShutdownHook): Int = {
+    other.priority - priority
+  }
+
+  def run(): Unit = hook()
+
+}
diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index ad3db1fbb57ed4decca8f0da13bea7a970524075..72481872473308d6cf37f8cde63129d4697ffdf2 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -33,7 +33,7 @@ private[spark] object SparkUncaughtExceptionHandler
 
       // We may have been called from a shutdown hook. If so, we must not call System.exit().
       // (If we do, we will deadlock.)
-      if (!Utils.inShutdown()) {
+      if (!ShutdownHookManager.inShutdown()) {
         if (exception.isInstanceOf[OutOfMemoryError]) {
           System.exit(SparkExitCode.OOM)
         } else {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a90d8541366f4cad3da6b8e36157e8f5c3ca7f5e..f2abf227dc1297ced80e16f419b1c5f9efaf2657 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
-import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
+import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent._
 import javax.net.ssl.HttpsURLConnection
 
@@ -65,21 +65,6 @@ private[spark] object CallSite {
 private[spark] object Utils extends Logging {
   val random = new Random()
 
-  val DEFAULT_SHUTDOWN_PRIORITY = 100
-
-  /**
-   * The shutdown priority of the SparkContext instance. This is lower than the default
-   * priority, so that by default hooks are run before the context is shut down.
-   */
-  val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
-
-  /**
-   * The shutdown priority of temp directory must be lower than the SparkContext shutdown
-   * priority. Otherwise cleaning the temp directories while Spark jobs are running can
-   * throw undesirable errors at the time of shutdown.
-   */
-  val TEMP_DIR_SHUTDOWN_PRIORITY = 25
-
   /**
    * Define a default value for driver memory here since this value is referenced across the code
    * base and nearly all files already use Utils.scala
@@ -90,9 +75,6 @@ private[spark] object Utils extends Logging {
   @volatile private var localRootDirs: Array[String] = null
 
 
-  private val shutdownHooks = new SparkShutdownHookManager()
-  shutdownHooks.install()
-
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -205,86 +187,6 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
-  private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
-
-  // Add a shutdown hook to delete the temp dirs when the JVM exits
-  addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
-    logInfo("Shutdown hook called")
-    shutdownDeletePaths.foreach { dirPath =>
-      try {
-        logInfo("Deleting directory " + dirPath)
-        Utils.deleteRecursively(new File(dirPath))
-      } catch {
-        case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
-      }
-    }
-  }
-
-  // Register the path to be deleted via shutdown hook
-  def registerShutdownDeleteDir(file: File) {
-    val absolutePath = file.getAbsolutePath()
-    shutdownDeletePaths.synchronized {
-      shutdownDeletePaths += absolutePath
-    }
-  }
-
-  // Register the tachyon path to be deleted via shutdown hook
-  def registerShutdownDeleteDir(tachyonfile: TachyonFile) {
-    val absolutePath = tachyonfile.getPath()
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths += absolutePath
-    }
-  }
-
-  // Is the path already registered to be deleted via a shutdown hook ?
-  def hasShutdownDeleteDir(file: File): Boolean = {
-    val absolutePath = file.getAbsolutePath()
-    shutdownDeletePaths.synchronized {
-      shutdownDeletePaths.contains(absolutePath)
-    }
-  }
-
-  // Is the path already registered to be deleted via a shutdown hook ?
-  def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
-    val absolutePath = file.getPath()
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths.contains(absolutePath)
-    }
-  }
-
-  // Note: if file is child of some registered path, while not equal to it, then return true;
-  // else false. This is to ensure that two shutdown hooks do not try to delete each others
-  // paths - resulting in IOException and incomplete cleanup.
-  def hasRootAsShutdownDeleteDir(file: File): Boolean = {
-    val absolutePath = file.getAbsolutePath()
-    val retval = shutdownDeletePaths.synchronized {
-      shutdownDeletePaths.exists { path =>
-        !absolutePath.equals(path) && absolutePath.startsWith(path)
-      }
-    }
-    if (retval) {
-      logInfo("path = " + file + ", already present as root for deletion.")
-    }
-    retval
-  }
-
-  // Note: if file is child of some registered path, while not equal to it, then return true;
-  // else false. This is to ensure that two shutdown hooks do not try to delete each others
-  // paths - resulting in Exception and incomplete cleanup.
-  def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
-    val absolutePath = file.getPath()
-    val retval = shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths.exists { path =>
-        !absolutePath.equals(path) && absolutePath.startsWith(path)
-      }
-    }
-    if (retval) {
-      logInfo("path = " + file + ", already present as root for deletion.")
-    }
-    retval
-  }
-
   /**
    * JDK equivalent of `chmod 700 file`.
    *
@@ -333,7 +235,7 @@ private[spark] object Utils extends Logging {
       root: String = System.getProperty("java.io.tmpdir"),
       namePrefix: String = "spark"): File = {
     val dir = createDirectory(root, namePrefix)
-    registerShutdownDeleteDir(dir)
+    ShutdownHookManager.registerShutdownDeleteDir(dir)
     dir
   }
 
@@ -973,9 +875,7 @@ private[spark] object Utils extends Logging {
           if (savedIOException != null) {
             throw savedIOException
           }
-          shutdownDeletePaths.synchronized {
-            shutdownDeletePaths.remove(file.getAbsolutePath)
-          }
+          ShutdownHookManager.removeShutdownDeleteDir(file)
         }
       } finally {
         if (!file.delete()) {
@@ -1478,27 +1378,6 @@ private[spark] object Utils extends Logging {
     serializer.deserialize[T](serializer.serialize(value))
   }
 
-  /**
-   * Detect whether this thread might be executing a shutdown hook. Will always return true if
-   * the current thread is a running a shutdown hook but may spuriously return true otherwise (e.g.
-   * if System.exit was just called by a concurrent thread).
-   *
-   * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
-   * an IllegalStateException.
-   */
-  def inShutdown(): Boolean = {
-    try {
-      val hook = new Thread {
-        override def run() {}
-      }
-      Runtime.getRuntime.addShutdownHook(hook)
-      Runtime.getRuntime.removeShutdownHook(hook)
-    } catch {
-      case ise: IllegalStateException => return true
-    }
-    false
-  }
-
   private def isSpace(c: Char): Boolean = {
     " \t\r\n".indexOf(c) != -1
   }
@@ -2221,37 +2100,6 @@ private[spark] object Utils extends Logging {
     msg.startsWith(BACKUP_STANDALONE_MASTER_PREFIX)
   }
 
-  /**
-   * Adds a shutdown hook with default priority.
-   *
-   * @param hook The code to run during shutdown.
-   * @return A handle that can be used to unregister the shutdown hook.
-   */
-  def addShutdownHook(hook: () => Unit): AnyRef = {
-    addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
-  }
-
-  /**
-   * Adds a shutdown hook with the given priority. Hooks with lower priority values run
-   * first.
-   *
-   * @param hook The code to run during shutdown.
-   * @return A handle that can be used to unregister the shutdown hook.
-   */
-  def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
-    shutdownHooks.add(priority, hook)
-  }
-
-  /**
-   * Remove a previously installed shutdown hook.
-   *
-   * @param ref A handle returned by `addShutdownHook`.
-   * @return Whether the hook was removed.
-   */
-  def removeShutdownHook(ref: AnyRef): Boolean = {
-    shutdownHooks.remove(ref)
-  }
-
   /**
    * To avoid calling `Utils.getCallSite` for every single RDD we create in the body,
    * set a dummy call site that RDDs use instead. This is for performance optimization.
@@ -2299,70 +2147,6 @@ private[spark] object Utils extends Logging {
 
 }
 
-private [util] class SparkShutdownHookManager {
-
-  private val hooks = new PriorityQueue[SparkShutdownHook]()
-  private var shuttingDown = false
-
-  /**
-   * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
-   * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
-   * the best.
-   */
-  def install(): Unit = {
-    val hookTask = new Runnable() {
-      override def run(): Unit = runAll()
-    }
-    Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
-      case Success(shmClass) =>
-        val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
-          .asInstanceOf[Int]
-        val shm = shmClass.getMethod("get").invoke(null)
-        shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
-          .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
-
-      case Failure(_) =>
-        Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
-    }
-  }
-
-  def runAll(): Unit = synchronized {
-    shuttingDown = true
-    while (!hooks.isEmpty()) {
-      Try(Utils.logUncaughtExceptions(hooks.poll().run()))
-    }
-  }
-
-  def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
-    checkState()
-    val hookRef = new SparkShutdownHook(priority, hook)
-    hooks.add(hookRef)
-    hookRef
-  }
-
-  def remove(ref: AnyRef): Boolean = synchronized {
-    hooks.remove(ref)
-  }
-
-  private def checkState(): Unit = {
-    if (shuttingDown) {
-      throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
-    }
-  }
-
-}
-
-private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
-  extends Comparable[SparkShutdownHook] {
-
-  override def compareTo(other: SparkShutdownHook): Int = {
-    other.priority - priority
-  }
-
-  def run(): Unit = hook()
-
-}
-
 /**
  * A utility class to redirect the child process's stdout or stderr.
  */
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 9c047347cb58d8958abf53862e982192daaf0a9a..2c9fa595b2dad2229b820b324dc5fe6b5741d5ff 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 import org.apache.spark.{Logging, SparkContext}
 
 
@@ -76,7 +76,7 @@ object HiveThriftServer2 extends Logging {
     logInfo("Starting SparkContext")
     SparkSQLEnv.init()
 
-    Utils.addShutdownHook { () =>
+    ShutdownHookManager.addShutdownHook { () =>
       SparkSQLEnv.stop()
       uiTab.foreach(_.detach())
     }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index d3886142b388d446791682bec29fa8b8b7841fa3..7799704c819d9eaaf95a40ee456e1fccd6481f16 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -39,7 +39,7 @@ import org.apache.thrift.transport.TSocket
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
  * This code doesn't support remote connections in Hive 1.2+, as the underlying CliDriver
@@ -114,7 +114,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
     SessionState.start(sessionState)
 
     // Clean up after we exit
-    Utils.addShutdownHook { () => SparkSQLEnv.stop() }
+    ShutdownHookManager.addShutdownHook { () => SparkSQLEnv.stop() }
 
     val remoteMode = isRemoteMode(sessionState)
     // "-h" option has been passed, so connect to Hive thrift server.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 296cc5c5e0b04f5804eeed3a9a5fe6560ac74039..4eae699ac3b517f39681cc052081a943efffe6f6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.CacheTableCommand
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 import org.apache.spark.{SparkConf, SparkContext}
 
 /* Implicit conversions */
@@ -154,7 +154,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   val hiveFilesTemp = File.createTempFile("catalystHiveFiles", "")
   hiveFilesTemp.delete()
   hiveFilesTemp.mkdir()
-  Utils.registerShutdownDeleteDir(hiveFilesTemp)
+  ShutdownHookManager.registerShutdownDeleteDir(hiveFilesTemp)
 
   val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) {
     new File("src" + File.separator + "test" + File.separator + "resources" + File.separator)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 177e710ace54b956cc62271d522458b8c95eb556..b496d1f341a0b8581eee26a12116f40f336e277e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.{CallSite, Utils}
+import org.apache.spark.util.{CallSite, ShutdownHookManager, Utils}
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -604,7 +604,7 @@ class StreamingContext private[streaming] (
           }
           StreamingContext.setActiveContext(this)
         }
-        shutdownHookRef = Utils.addShutdownHook(
+        shutdownHookRef = ShutdownHookManager.addShutdownHook(
           StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
         // Registering Streaming Metrics at the start of the StreamingContext
         assert(env.metricsSystem != null)
@@ -691,7 +691,7 @@ class StreamingContext private[streaming] (
           StreamingContext.setActiveContext(null)
           waiter.notifyStop()
           if (shutdownHookRef != null) {
-            Utils.removeShutdownHook(shutdownHookRef)
+            ShutdownHookManager.removeShutdownHook(shutdownHookRef)
           }
           logInfo("StreamingContext stopped successfully")
       }
@@ -725,7 +725,7 @@ object StreamingContext extends Logging {
    */
   private val ACTIVATION_LOCK = new Object()
 
-  private val SHUTDOWN_HOOK_PRIORITY = Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1
+  private val SHUTDOWN_HOOK_PRIORITY = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1
 
   private val activeContext = new AtomicReference[StreamingContext](null)
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e19940d8d66425e1f4823766c879711dcb21d075..6a8ddb37b29e887460c10639b21bd7a20d7d2bb9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -112,7 +112,8 @@ private[spark] class ApplicationMaster(
       val fs = FileSystem.get(yarnConf)
 
       // This shutdown hook should run *after* the SparkContext is shut down.
-      Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1) { () =>
+      val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
+      ShutdownHookManager.addShutdownHook(priority) { () =>
         val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
         val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
 
@@ -199,7 +200,7 @@ private[spark] class ApplicationMaster(
   final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit = {
     synchronized {
       if (!finished) {
-        val inShutdown = Utils.inShutdown()
+        val inShutdown = ShutdownHookManager.inShutdown()
         logInfo(s"Final app status: $status, exitCode: $code" +
           Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
         exitCode = code