diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 8ca731038e5286cbc1f3ea33f5446266471962ad..e72826dc25f410b06cb764cd28e4868d4b084d90 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
 import com.google.common.io.Files
 
+import org.apache.spark.util.Utils
+
 /**
  * Utilities for tests. Included in main codebase since it's used by multiple
  * projects.
@@ -42,8 +44,7 @@ private[spark] object TestUtils {
    * in order to avoid interference between tests.
    */
   def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
-    val tempDir = Files.createTempDir()
-    tempDir.deleteOnExit()
+    val tempDir = Utils.createTempDir()
     val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
     val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
     createJar(files, jarFile)
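
[Not part of the patch] A minimal sketch of how a test might use TestUtils.createJarWithClasses after this change: the backing temp directory is now cleaned up by the shutdown hook that Utils.createTempDir registers, rather than by File.deleteOnExit. The class names and the CreateJarExample wrapper below are illustrative only.

    import java.net.URLClassLoader

    import org.apache.spark.TestUtils

    object CreateJarExample {
      def main(args: Array[String]): Unit = {
        // Build a jar containing two synthetic classes; the temp dir holding the
        // compiled classes and the jar is removed by the JVM shutdown hook.
        val jarUrl = TestUtils.createJarWithClasses(Seq("FakeClassA", "FakeClassB"), value = "hello")
        val loader = new URLClassLoader(Array(jarUrl), getClass.getClassLoader)
        assert(loader.loadClass("FakeClassA") != null)
      }
    }
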
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3d307b3c16d3ef91af4df764aeae12e9604fa961..07477dd460a4b36462cac6cd4fc289b03d5a7e55 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -168,6 +168,20 @@ private[spark] object Utils extends Logging {
   private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
   private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
 
+  // Add a shutdown hook to delete the temp dirs when the JVM exits
+  Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") {
+    override def run(): Unit = Utils.logUncaughtExceptions {
+      logDebug("Shutdown hook called")
+      shutdownDeletePaths.foreach { dirPath =>
+        try {
+          Utils.deleteRecursively(new File(dirPath))
+        } catch {
+          case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
+        }
+      }
+    }
+  })
+
   // Register the path to be deleted via shutdown hook
   def registerShutdownDeleteDir(file: File) {
     val absolutePath = file.getAbsolutePath()
@@ -252,14 +266,6 @@ private[spark] object Utils extends Logging {
     }
 
     registerShutdownDeleteDir(dir)
-
-    // Add a shutdown hook to delete the temp dir when the JVM exits
-    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
-      override def run() {
-        // Attempt to delete if some patch which is parent of this is not already registered.
-        if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
-      }
-    })
     dir
   }
 
@@ -666,15 +672,30 @@ private[spark] object Utils extends Logging {
    */
   def deleteRecursively(file: File) {
     if (file != null) {
-      if (file.isDirectory() && !isSymlink(file)) {
-        for (child <- listFilesSafely(file)) {
-          deleteRecursively(child)
+      try {
+        if (file.isDirectory && !isSymlink(file)) {
+          var savedIOException: IOException = null
+          for (child <- listFilesSafely(file)) {
+            try {
+              deleteRecursively(child)
+            } catch {
+              // In case of multiple exceptions, only the last one will be thrown
+              case ioe: IOException => savedIOException = ioe
+            }
+          }
+          if (savedIOException != null) {
+            throw savedIOException
+          }
+          shutdownDeletePaths.synchronized {
+            shutdownDeletePaths.remove(file.getAbsolutePath)
+          }
         }
-      }
-      if (!file.delete()) {
-        // Delete can also fail if the file simply did not exist
-        if (file.exists()) {
-          throw new IOException("Failed to delete: " + file.getAbsolutePath)
+      } finally {
+        if (!file.delete()) {
+          // Delete can also fail if the file simply did not exist
+          if (file.exists()) {
+            throw new IOException("Failed to delete: " + file.getAbsolutePath)
+          }
         }
       }
     }
@@ -713,7 +734,7 @@ private[spark] object Utils extends Logging {
    */
   def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
     if (!dir.isDirectory) {
-      throw new IllegalArgumentException("$dir is not a directory!")
+      throw new IllegalArgumentException(s"$dir is not a directory!")
     }
     val filesAndDirs = dir.listFiles()
     val cutoffTimeInMillis = System.currentTimeMillis - (cutoff * 1000)
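
[Not part of the patch] Sketched from the hunks above: every directory returned by Utils.createTempDir is recorded in shutdownDeletePaths and removed by the single JVM shutdown hook added here, and an early deleteRecursively call also drops the path from that set so the hook does not try to delete it again. A hedged usage sketch (the TempDirLifecycleExample wrapper is illustrative):

    import org.apache.spark.util.Utils

    object TempDirLifecycleExample {
      def main(args: Array[String]): Unit = {
        // Registered for deletion at creation time, so the directory is removed on
        // JVM exit even if the caller forgets to clean up.
        val dir = Utils.createTempDir()
        try {
          // ... write test fixtures under `dir` ...
        } finally {
          // Eager cleanup; this also unregisters the path from shutdownDeletePaths.
          Utils.deleteRecursively(dir)
        }
      }
    }
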
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 7e18f45de7b5b55569426daf31357a08b5212b93..a8867020e457dff7a21cb6941b6539416c43a885 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark
 import java.io._
 import java.util.jar.{JarEntry, JarOutputStream}
 
-import com.google.common.io.Files
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
@@ -41,8 +40,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   override def beforeAll() {
     super.beforeAll()
 
-    tmpDir = Files.createTempDir()
-    tmpDir.deleteOnExit()
+    tmpDir = Utils.createTempDir()
     val testTempDir = new File(tmpDir, "test")
     testTempDir.mkdir()
 
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 4a53d25012ad9cbb03c76088df8c705afa172ed2..a2b74c4419d46610da8ffd86a50d7c90c1b48737 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -21,7 +21,6 @@ import java.io.{File, FileWriter}
 
 import scala.io.Source
 
-import com.google.common.io.Files
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
 import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, FileSplit, TextInputFormat, TextOutputFormat}
@@ -39,8 +38,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   override def beforeEach() {
     super.beforeEach()
-    tempDir = Files.createTempDir()
-    tempDir.deleteOnExit()
+    tempDir = Utils.createTempDir()
   }
 
   override def afterEach() {
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 4cba90e8f2afe7faa303b8021e13370e3e637b30..1cdf50d5c08c7c4b66084fbb3516c418d081dff3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.deploy.SparkSubmit._
 import org.apache.spark.util.Utils
 import org.scalatest.FunSuite
 import org.scalatest.Matchers
-import com.google.common.io.Files
 
 class SparkSubmitSuite extends FunSuite with Matchers {
   def beforeAll() {
@@ -332,7 +331,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
   }
 
   def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
-    val tmpDir = Files.createTempDir()
+    val tmpDir = Utils.createTempDir()
 
     val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
     val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf))
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index d5ebfb3f3fae14a4216525ec3fcf90807a49e9e8..12d1c7b2faba628d8bb330eb6c417a7e1df3d187 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -23,8 +23,6 @@ import java.io.FileOutputStream
 
 import scala.collection.immutable.IndexedSeq
 
-import com.google.common.io.Files
-
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 
@@ -66,9 +64,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
   *   3) Whether the contents are the same.
    */
   test("Correctness of WholeTextFileRecordReader.") {
-
-    val dir = Files.createTempDir()
-    dir.deleteOnExit()
+    val dir = Utils.createTempDir()
     println(s"Local disk address is ${dir.toString}.")
 
     WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 75b01191901b8b524395f4c4c17a9ddcceeea41c..3620e251cc139790ebcf69bcdc1fd9ca32a85e23 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -24,13 +24,14 @@ import org.apache.hadoop.util.Progressable
 import scala.collection.mutable.{ArrayBuffer, HashSet}
 import scala.util.Random
 
-import com.google.common.io.Files
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.mapreduce.{JobContext => NewJobContext, OutputCommitter => NewOutputCommitter,
 OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter,
 TaskAttemptContext => NewTaskAttempContext}
 import org.apache.spark.{Partitioner, SharedSparkContext}
 import org.apache.spark.SparkContext._
+import org.apache.spark.util.Utils
+
 import org.scalatest.FunSuite
 
 class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
@@ -381,14 +382,16 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   }
 
   test("zero-partition RDD") {
-    val emptyDir = Files.createTempDir()
-    emptyDir.deleteOnExit()
-    val file = sc.textFile(emptyDir.getAbsolutePath)
-    assert(file.partitions.size == 0)
-    assert(file.collect().toList === Nil)
-    // Test that a shuffle on the file works, because this used to be a bug
-    assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
-    emptyDir.delete()
+    val emptyDir = Utils.createTempDir()
+    try {
+      val file = sc.textFile(emptyDir.getAbsolutePath)
+      assert(file.partitions.isEmpty)
+      assert(file.collect().toList === Nil)
+      // Test that a shuffle on the file works, because this used to be a bug
+      assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+    } finally {
+      Utils.deleteRecursively(emptyDir)
+    }
   }
 
   test("keys and values") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 3efa85431876b2ed373ec025f1d331274a903ed6..abc300fcffaf9701d2b58582c3d7ada4d7a7204e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler
 import scala.collection.mutable
 import scala.io.Source
 
-import com.google.common.io.Files
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -51,8 +50,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   private var logDirPath: Path = _
 
   before {
-    testDir = Files.createTempDir()
-    testDir.deleteOnExit()
+    testDir = Utils.createTempDir()
     logDirPath = Utils.getFilePath(testDir, "spark-events")
   }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 48114feee6233f47d6d6b038ff5d218338660f39..e05f373392d4af6a38379bbf2c76b04b985a5974 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.scheduler
 
 import java.io.{File, PrintWriter}
 
-import com.google.common.io.Files
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
@@ -39,8 +38,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
   private var testDir: File = _
 
   before {
-    testDir = Files.createTempDir()
-    testDir.deleteOnExit()
+    testDir = Utils.createTempDir()
   }
 
   after {
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index e4522e00a622de44fccd1c4ba7af69685b94a2d0..bc5c74c126b74c949cf57a41dbd6cc16b27f48d8 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -19,22 +19,13 @@ package org.apache.spark.storage
 
 import java.io.{File, FileWriter}
 
-import org.apache.spark.network.nio.NioBlockTransferService
-import org.apache.spark.shuffle.hash.HashShuffleManager
-
-import scala.collection.mutable
 import scala.language.reflectiveCalls
 
-import akka.actor.Props
-import com.google.common.io.Files
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.util.{AkkaUtils, Utils}
-import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.util.Utils
 
 class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
   private val testConf = new SparkConf(false)
@@ -48,10 +39,8 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
 
   override def beforeAll() {
     super.beforeAll()
-    rootDir0 = Files.createTempDir()
-    rootDir0.deleteOnExit()
-    rootDir1 = Files.createTempDir()
-    rootDir1.deleteOnExit()
+    rootDir0 = Utils.createTempDir()
+    rootDir1 = Utils.createTempDir()
     rootDirs = rootDir0.getAbsolutePath + "," + rootDir1.getAbsolutePath
   }
 
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
index c3dd156b405142852b398f36d4d07fbedf9433a9..dc2a05631d83d00dfb7c30acf944825edde10f8e 100644
--- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -21,7 +21,6 @@ import java.io.{File, IOException}
 
 import scala.io.Source
 
-import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
@@ -44,7 +43,7 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter {
   private var logDirPathString: String = _
 
   before {
-    testDir = Files.createTempDir()
+    testDir = Utils.createTempDir()
     logDirPath = Utils.getFilePath(testDir, "test-file-logger")
     logDirPathString = logDirPath.toString
   }
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index e63d9d085e385495c8936d817de786e59565d3f7..0344da60dae66e2f32ffefd9a2c6666f67ef3d2f 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -112,7 +112,7 @@ class UtilsSuite extends FunSuite {
   }
 
   test("reading offset bytes of a file") {
-    val tmpDir2 = Files.createTempDir()
+    val tmpDir2 = Utils.createTempDir()
     tmpDir2.deleteOnExit()
     val f1Path = tmpDir2 + "/f1"
     val f1 = new FileOutputStream(f1Path)
@@ -141,7 +141,7 @@ class UtilsSuite extends FunSuite {
   }
 
   test("reading offset bytes across multiple files") {
-    val tmpDir = Files.createTempDir()
+    val tmpDir = Utils.createTempDir()
     tmpDir.deleteOnExit()
     val files = (1 to 3).map(i => new File(tmpDir, i.toString))
     Files.write("0123456789", files(0), Charsets.UTF_8)
@@ -308,4 +308,28 @@ class UtilsSuite extends FunSuite {
     }
   }
 
+  test("deleteRecursively") {
+    val tempDir1 = Utils.createTempDir()
+    assert(tempDir1.exists())
+    Utils.deleteRecursively(tempDir1)
+    assert(!tempDir1.exists())
+
+    val tempDir2 = Utils.createTempDir()
+    val tempFile1 = new File(tempDir2, "foo.txt")
+    Files.touch(tempFile1)
+    assert(tempFile1.exists())
+    Utils.deleteRecursively(tempFile1)
+    assert(!tempFile1.exists())
+
+    val tempDir3 = new File(tempDir2, "subdir")
+    assert(tempDir3.mkdir())
+    val tempFile2 = new File(tempDir3, "bar.txt")
+    Files.touch(tempFile2)
+    assert(tempFile2.exists())
+    Utils.deleteRecursively(tempDir2)
+    assert(!tempDir2.exists())
+    assert(!tempDir3.exists())
+    assert(!tempFile2.exists())
+  }
+
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 8ef2bb1bf6a780b8ab0152accc8a6de622582881..0dbe766b4d917152647221b6d840fb564c8fed2b 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -67,8 +67,7 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
         |0
         |0 2:4.0 4:5.0 6:6.0
       """.stripMargin
-    val tempDir = Files.createTempDir()
-    tempDir.deleteOnExit()
+    val tempDir = Utils.createTempDir()
     val file = new File(tempDir.getPath, "part-00000")
     Files.write(lines, file, Charsets.US_ASCII)
     val path = tempDir.toURI.toString
@@ -100,7 +99,7 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
       LabeledPoint(1.1, Vectors.sparse(3, Seq((0, 1.23), (2, 4.56)))),
       LabeledPoint(0.0, Vectors.dense(1.01, 2.02, 3.03))
     ), 2)
-    val tempDir = Files.createTempDir()
+    val tempDir = Utils.createTempDir()
     val outputDir = new File(tempDir, "output")
     MLUtils.saveAsLibSVMFile(examples, outputDir.toURI.toString)
     val lines = outputDir.listFiles()
@@ -166,7 +165,7 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
       Vectors.sparse(2, Array(1), Array(-1.0)),
       Vectors.dense(0.0, 1.0)
     ), 2)
-    val tempDir = Files.createTempDir()
+    val tempDir = Utils.createTempDir()
     val outputDir = new File(tempDir, "vectors")
     val path = outputDir.toURI.toString
     vectors.saveAsTextFile(path)
@@ -181,7 +180,7 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
       LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0))),
       LabeledPoint(1.0, Vectors.dense(0.0, 1.0))
     ), 2)
-    val tempDir = Files.createTempDir()
+    val tempDir = Utils.createTempDir()
     val outputDir = new File(tempDir, "points")
     val path = outputDir.toURI.toString
     points.saveAsTextFile(path)
diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index 3e2ee7541f40d6bae1c58b7cb85e90e1b4b9475f..6a79e76a34db84f45b4a252d1d29d84fe5cebfd3 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -23,8 +23,6 @@ import java.net.{URL, URLClassLoader}
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 
-import com.google.common.io.Files
-
 import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.util.Utils
 
@@ -39,10 +37,8 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
 
   override def beforeAll() {
     super.beforeAll()
-    tempDir1 = Files.createTempDir()
-    tempDir1.deleteOnExit()
-    tempDir2 = Files.createTempDir()
-    tempDir2.deleteOnExit()
+    tempDir1 = Utils.createTempDir()
+    tempDir2 = Utils.createTempDir()
     url1 = "file://" + tempDir1
     urls2 = List(tempDir2.toURI.toURL).toArray
     childClassNames.foreach(TestUtils.createCompiledClass(_, tempDir1, "1"))
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index c8763eb2770523b10cdbe25bca9a646c43c9aac0..91c9c52c3c98a095429efbba272f2bca2a77dc26 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -22,7 +22,6 @@ import java.net.URLClassLoader
 
 import scala.collection.mutable.ArrayBuffer
 
-import com.google.common.io.Files
 import org.scalatest.FunSuite
 import org.apache.spark.SparkContext
 import org.apache.commons.lang3.StringEscapeUtils
@@ -190,8 +189,7 @@ class ReplSuite extends FunSuite {
   }
 
   test("interacting with files") {
-    val tempDir = Files.createTempDir()
-    tempDir.deleteOnExit()
+    val tempDir = Utils.createTempDir()
     val out = new FileWriter(tempDir + "/input")
     out.write("Hello world!\n")
     out.write("What's up?\n")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 8511390cb1ad56d8180b9a9bb6cad246c8dd67bb..e5592e52b0d2d0d161a6932019c3901ee9a2b5c8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -231,8 +231,7 @@ class CheckpointSuite extends TestSuiteBase {
   // failure, are re-processed or not.
   test("recovery with file input stream") {
     // Set up the streaming context and input streams
-    val testDir = Files.createTempDir()
-    testDir.deleteOnExit()
+    val testDir = Utils.createTempDir()
     var ssc = new StreamingContext(master, framework, Seconds(1))
     ssc.checkpoint(checkpointDir)
     val fileStream = ssc.textFileStream(testDir.toString)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 952a74fd5f6dedea6d6bf9f22516e9c2458f88ae..a44a45a3e9bd6d48de4c8d1cb195b4e5a0f4407d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -98,8 +98,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
 
     // Set up the streaming context and input streams
-    val testDir = Files.createTempDir()
-    testDir.deleteOnExit()
+    val testDir = Utils.createTempDir()
     val ssc = new StreamingContext(conf, batchDuration)
     val fileStream = ssc.textFileStream(testDir.toString)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index c53c01706083a77d0b449b5afdf068820ec1e231..5dbb7232009eb8e09afb96b5775b9340b5150d41 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -352,8 +352,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
   extends Thread with Logging {
 
   override def run() {
-    val localTestDir = Files.createTempDir()
-    localTestDir.deleteOnExit()
+    val localTestDir = Utils.createTempDir()
     var fs = testDir.getFileSystem(new Configuration())
     val maxTries = 3
     try {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 759baacaa43084a3267e0356b85479c0abca8887..9327ff4822699ec980cd0d226289b6a0bf85d496 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -24,12 +24,12 @@ import scala.collection.mutable.SynchronizedBuffer
 import scala.reflect.ClassTag
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
-import com.google.common.io.Files
 
 import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
 
 /**
  * This is a input stream just for the testsuites. This is equivalent to a checkpointable,
@@ -120,9 +120,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
 
   // Directory where the checkpoint data will be saved
   lazy val checkpointDir = {
-    val dir = Files.createTempDir()
+    val dir = Utils.createTempDir()
     logDebug(s"checkpointDir: $dir")
-    dir.deleteOnExit()
     dir.toString
   }
 
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
index 9bd916100dd2c89e8d27819df6f908e1c5bea850..17b79ae1d82c4ee1871de3910ff08bdc7f0f20b4 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
@@ -20,13 +20,10 @@ package org.apache.spark.deploy.yarn
 import java.io.File
 import java.net.URI
 
-import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.MRJobConfig
-import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.mockito.Matchers._
@@ -117,7 +114,7 @@ class ClientBaseSuite extends FunSuite with Matchers {
     doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
       any(classOf[Path]), anyShort(), anyBoolean())
 
-    val tempDir = Files.createTempDir()
+    val tempDir = Utils.createTempDir()
     try {
       client.prepareLocalResources(tempDir.getAbsolutePath())
       sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER))