diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 2a4447705fa65ac4dd064115f2d58ddc8ca92ef6..d441a4d31b95424358bfe3dad77035e00367c7a2 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -139,8 +139,8 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   }
 
   private def addShutdownHook(): AnyRef = {
-    Utils.addShutdownHook { () =>
-      logDebug("Shutdown hook called")
+    Utils.addShutdownHook(Utils.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
+      logInfo("Shutdown hook called")
       DiskBlockManager.this.doStop()
     }
   }
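
For context, this change relies on Utils.addShutdownHook running higher-priority hooks first, so registering the DiskBlockManager hook at TEMP_DIR_SHUTDOWN_PRIORITY + 1 guarantees doStop() runs before the temp directories are deleted. The following is a minimal illustrative sketch of that contract, not Spark's actual ShutdownHookManager; all names here are hypothetical:

import scala.collection.mutable

// Illustrative sketch only -- not Spark's actual implementation. It mimics
// the contract this patch relies on: hooks registered with a higher priority
// run before hooks registered with a lower one.
object PriorityShutdownHooks {
  private val hooks = mutable.ArrayBuffer.empty[(Int, () => Unit)]

  // A single real JVM shutdown hook drains the registry in descending
  // priority order, isolating failures so one hook cannot skip the rest.
  Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
    override def run(): Unit = {
      val ordered = hooks.synchronized(hooks.sortBy(-_._1).toList)
      ordered.foreach { case (_, body) =>
        try body() catch { case e: Throwable => e.printStackTrace() }
      }
    }
  }))

  def add(priority: Int)(body: => Unit): Unit =
    hooks.synchronized { hooks += ((priority, () => body)) }
}
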
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 763d4db6901873806985d58380f5249b40f841a9..693e1a0a3d5f097f20e524e1f3f65f8a26729977 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -73,6 +73,13 @@ private[spark] object Utils extends Logging {
    */
   val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
 
+  /**
+   * The shutdown priority of the temp directory must be lower than the SparkContext
+   * shutdown priority (higher-priority hooks run first). Otherwise the temp directories
+   * may be deleted while Spark jobs are still running, throwing undesirable errors.
+   */
+  val TEMP_DIR_SHUTDOWN_PRIORITY = 25
+
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
@@ -189,10 +196,11 @@ private[spark] object Utils extends Logging {
   private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
 
   // Add a shutdown hook to delete the temp dirs when the JVM exits
-  addShutdownHook { () =>
-    logDebug("Shutdown hook called")
+  addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
+    logInfo("Shutdown hook called")
     shutdownDeletePaths.foreach { dirPath =>
       try {
+        logInfo("Deleting directory " + dirPath)
         Utils.deleteRecursively(new File(dirPath))
       } catch {
         case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
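
With the patch applied, the hooks touched here fire in the order: SparkContext (priority 50), then DiskBlockManager.doStop (TEMP_DIR_SHUTDOWN_PRIORITY + 1 = 26), then the temp-dir deletion above (25). Using the hypothetical PriorityShutdownHooks sketch from earlier, the same ordering can be exercised like this:

object ShutdownOrderDemo extends App {
  // Hypothetical demo against the sketch above; priorities mirror the patch.
  PriorityShutdownHooks.add(50) { println("1. stop SparkContext") }
  PriorityShutdownHooks.add(26) { println("2. DiskBlockManager.doStop()") }
  PriorityShutdownHooks.add(25) { println("3. delete registered temp dirs") }
  // On JVM exit the hooks fire in descending priority: 1, then 2, then 3.
}
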