diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index eac901d10067cf02bdbcfa6cb948dff76aef7c3c..9f800e3a0953cd8ad73ed4b299b94440a7622f76 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -239,12 +239,18 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
       val fileInputStream = fs.open(partitionerFilePath, bufferSize)
       val serializer = SparkEnv.get.serializer.newInstance()
-      val deserializeStream = serializer.deserializeStream(fileInputStream)
-      val partitioner = Utils.tryWithSafeFinally[Partitioner] {
-        deserializeStream.readObject[Partitioner]
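+      // Close both the deserialization stream and the underlying file stream, even on failure.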
+      val partitioner = Utils.tryWithSafeFinally {
+        val deserializeStream = serializer.deserializeStream(fileInputStream)
+        Utils.tryWithSafeFinally {
+          deserializeStream.readObject[Partitioner]
+        } {
+          deserializeStream.close()
+        }
       } {
-        deserializeStream.close()
+        fileInputStream.close()
       }
+
       logDebug(s"Read partitioner from $partitionerFilePath")
       Some(partitioner)
     } catch {
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index cc52bb1d23cd5d4ebcc8f4f03820a729a44a3a31..89f0b1cb5b56a7b84e8b573b4ed8bd01d882224d 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -58,10 +58,15 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     nums.saveAsTextFile(outputDir)
     // Read the plain text file and check it's OK
     val outputFile = new File(outputDir, "part-00000")
-    val content = Source.fromFile(outputFile).mkString
-    assert(content === "1\n2\n3\n4\n")
-    // Also try reading it in as a text file RDD
-    assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
+    val bufferSrc = Source.fromFile(outputFile)
+    Utils.tryWithSafeFinally {
+      val content = bufferSrc.mkString
+      assert(content === "1\n2\n3\n4\n")
+      // Also try reading it in as a text file RDD
+      assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
+    } {
+      bufferSrc.close()
+    }
   }
 
   test("text files (compressed)") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
index 13cba94578a6af80bb94e07ae7ea44470c2731b8..005587051b6ad1280091bbce5b2b4e6c0e716c2d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -33,7 +33,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{ResetSystemProperties, Utils}
 
 class RPackageUtilsSuite
   extends SparkFunSuite
@@ -74,9 +74,13 @@ class RPackageUtilsSuite
     val deps = Seq(dep1, dep2).mkString(",")
     IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
       val jars = Seq(main, dep1, dep2).map(c => new JarFile(getJarPath(c, new File(new URI(repo)))))
-      assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
-      assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
-      assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
+      Utils.tryWithSafeFinally {
+        assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
+        assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
+        assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
+      } {
+        jars.foreach(_.close())
+      }
     }
   }
 
@@ -131,7 +135,7 @@ class RPackageUtilsSuite
 
   test("SparkR zipping works properly") {
     val tempDir = Files.createTempDir()
-    try {
+    Utils.tryWithSafeFinally {
       IvyTestUtils.writeFile(tempDir, "test.R", "abc")
       val fakeSparkRDir = new File(tempDir, "SparkR")
       assert(fakeSparkRDir.mkdirs())
@@ -144,14 +148,20 @@ class RPackageUtilsSuite
       IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
       val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
       assert(finalZip.exists())
-      val entries = new ZipFile(finalZip).entries().asScala.map(_.getName).toSeq
-      assert(entries.contains("/test.R"))
-      assert(entries.contains("/SparkR/abc.R"))
-      assert(entries.contains("/SparkR/DESCRIPTION"))
-      assert(!entries.contains("/package.zip"))
-      assert(entries.contains("/packageTest/def.R"))
-      assert(entries.contains("/packageTest/DESCRIPTION"))
-    } finally {
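+      // Keep a handle to the ZipFile so it can be closed after the entry checks.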
+      val zipFile = new ZipFile(finalZip)
+      Utils.tryWithSafeFinally {
+        val entries = zipFile.entries().asScala.map(_.getName).toSeq
+        assert(entries.contains("/test.R"))
+        assert(entries.contains("/SparkR/abc.R"))
+        assert(entries.contains("/SparkR/DESCRIPTION"))
+        assert(!entries.contains("/package.zip"))
+        assert(entries.contains("/packageTest/def.R"))
+        assert(entries.contains("/packageTest/DESCRIPTION"))
+      } {
+        zipFile.close()
+      }
+    } {
       FileUtils.deleteDirectory(tempDir)
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index a5eda7b5a5a759bf2323e9dc534a432654deaabb..2c41c432d1fe259a9fff6c5e11e89d8c5fcb538b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -449,8 +449,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
     val bstream = new BufferedOutputStream(cstream)
     if (isNewFormat) {
-      EventLoggingListener.initEventLog(new FileOutputStream(file))
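+      // Create the header stream explicitly so it can be closed after initEventLog.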
+      val newFormatStream = new FileOutputStream(file)
+      Utils.tryWithSafeFinally {
+        EventLoggingListener.initEventLog(newFormatStream)
+      } {
+        newFormatStream.close()
+      }
     }
+
     val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8)
     Utils.tryWithSafeFinally {
       events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 7f4859206e2577f3658d9b1db7f68fcae8a77dc4..8a5ec37eeb66cd1148cb47421b5739a53ca29a8c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -202,8 +202,6 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
 
     // Make sure expected events exist in the log file.
     val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
-    val logStart = SparkListenerLogStart(SPARK_VERSION)
-    val lines = readLines(logData)
     val eventSet = mutable.Set(
       SparkListenerApplicationStart,
       SparkListenerBlockManagerAdded,
@@ -216,19 +214,26 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
       SparkListenerTaskStart,
       SparkListenerTaskEnd,
       SparkListenerApplicationEnd).map(Utils.getFormattedClassName)
-    lines.foreach { line =>
-      eventSet.foreach { event =>
-        if (line.contains(event)) {
-          val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
-          val eventType = Utils.getFormattedClassName(parsedEvent)
-          if (eventType == event) {
-            eventSet.remove(event)
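+    // Close the event log stream once all assertions on its contents have run.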
+    Utils.tryWithSafeFinally {
+      val logStart = SparkListenerLogStart(SPARK_VERSION)
+      val lines = readLines(logData)
+      lines.foreach { line =>
+        eventSet.foreach { event =>
+          if (line.contains(event)) {
+            val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
+            val eventType = Utils.getFormattedClassName(parsedEvent)
+            if (eventType == event) {
+              eventSet.remove(event)
+            }
           }
         }
       }
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+      assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
+    } {
+      logData.close()
     }
-    assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
-    assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
   }
 
   private def readLines(in: InputStream): Seq[String] = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 9e472f900b655bb62cd6db13b08b3cc1b7c409e1..ee95e4ff7dbc343de50ad8811aa603cfe5c84b76 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -183,9 +183,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
 
     // ensure we reset the classloader after the test completes
     val originalClassLoader = Thread.currentThread.getContextClassLoader
-    try {
+    val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
+    Utils.tryWithSafeFinally {
       // load the exception from the jar
-      val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
       loader.addURL(jarFile.toURI.toURL)
       Thread.currentThread().setContextClassLoader(loader)
       val excClass: Class[_] = Utils.classForName("repro.MyException")
@@ -209,8 +209,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
 
       assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined)
       assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty)
-    } finally {
+    } {
       Thread.currentThread.setContextClassLoader(originalClassLoader)
+      loader.close()
     }
   }
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index e4e9be39ff6f9c012f52f1743a99c8e8c9993b04..665708a780c485b95df79eb2ddfc7eed557a1d38 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -155,13 +155,18 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
     val tempDir = Utils.createTempDir()
     val outputDir = new File(tempDir, "output")
     MLUtils.saveAsLibSVMFile(examples, outputDir.toURI.toString)
-    val lines = outputDir.listFiles()
+    val sources = outputDir.listFiles()
       .filter(_.getName.startsWith("part-"))
-      .flatMap(Source.fromFile(_).getLines())
-      .toSet
-    val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03")
-    assert(lines === expected)
-    Utils.deleteRecursively(tempDir)
+      .map(Source.fromFile)
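+    // Close every Source and delete the temp directory even if an assertion fails.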
+    Utils.tryWithSafeFinally {
+      val lines = sources.flatMap(_.getLines()).toSet
+      val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03")
+      assert(lines === expected)
+    } {
+      sources.foreach(_.close())
+      Utils.deleteRecursively(tempDir)
+    }
   }
 
   test("appendBias") {
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 3d54abd903b6d0140873e279934b529604f7ae6a..648a5abe0b89ab725e6fce9183996e452b56e798 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1805,6 +1805,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     // will be re-processed after recovery
     List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
     assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
+    ssc.stop();
     Utils.deleteRecursively(tempDir);
   }
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index b79cc65d8b5e910f47aaa9656b7d833734925339..41f16bfa5fc7f793b088a9ab0eae5064348c3b12 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -642,16 +642,18 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
         val fileStream = ssc.textFileStream(testDir.toString)
         // Make value 3 take a large time to process, to ensure that the driver
         // shuts down in the middle of processing the 3rd batch
-        CheckpointSuite.batchThreeShouldBlockIndefinitely = true
-        val mappedStream = fileStream.map(s => {
+        CheckpointSuite.batchThreeShouldBlockALongTime = true
+        val mappedStream = fileStream.map { s =>
           val i = s.toInt
           if (i == 3) {
-            while (CheckpointSuite.batchThreeShouldBlockIndefinitely) {
-              Thread.sleep(Long.MaxValue)
+            if (CheckpointSuite.batchThreeShouldBlockALongTime) {
+              // It's not a good idea to let the thread run forever,
+              // as resources won't be released correctly.
+              Thread.sleep(6000)
             }
           }
           i
-        })
+        }
 
         // Reducing over a large window to ensure that recovery from driver failure
         // requires reprocessing of all the files seen before the failure
@@ -691,7 +693,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
       }
 
       // The original StreamingContext has now been stopped.
-      CheckpointSuite.batchThreeShouldBlockIndefinitely = false
+      CheckpointSuite.batchThreeShouldBlockALongTime = false
 
       // Create files while the streaming driver is down
       for (i <- Seq(4, 5, 6)) {
@@ -928,5 +930,5 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
 }
 
 private object CheckpointSuite extends Serializable {
-  var batchThreeShouldBlockIndefinitely: Boolean = true
+  var batchThreeShouldBlockALongTime: Boolean = true
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 60c8e702352cf0634803e902dd6ce6905b201d37..fff2d6fbace3aad9b9075a291ef35de6c03d43ca 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -164,6 +164,7 @@ object MasterFailureTest extends Logging {
     val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun)
 
     fileGeneratingThread.join()
+    ssc.stop()
     fs.delete(checkpointDir, true)
     fs.delete(testDir, true)
     logInfo("Finished test after " + killCount + " failures")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 851013bb1e8466eef88cac21a075084d6f748a26..107c3f5dcc08dbaceaf7842cdecbe7d3610ecf9e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -134,6 +134,7 @@ class ReceivedBlockTrackerSuite
     val expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent)
     getWrittenLogData() shouldEqual expectedWrittenData1
     getWriteAheadLogFiles() should have size 1
+    tracker1.stop()
 
     incrementTime()
 
@@ -141,6 +142,7 @@ class ReceivedBlockTrackerSuite
     val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false)
     tracker1_.getUnallocatedBlocks(streamId) shouldBe empty
     tracker1_.hasUnallocatedReceivedBlocks should be (false)
+    tracker1_.stop()
 
     // Restart tracker and verify recovered list of unallocated blocks
     val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
@@ -163,6 +165,7 @@ class ReceivedBlockTrackerSuite
     val blockInfos2 = addBlockInfos(tracker2)
     tracker2.allocateBlocksToBatch(batchTime2)
     tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
+    tracker2.stop()
 
     // Verify whether log has correct contents
     val expectedWrittenData2 = expectedWrittenData1 ++
@@ -192,6 +195,7 @@ class ReceivedBlockTrackerSuite
       getWriteAheadLogFiles() should not contain oldestLogFile
     }
     printLogFiles("After clean")
+    tracker3.stop()
 
     // Restart tracker and verify recovered state, specifically whether info about the first
     // batch has been removed, but not the second batch
@@ -200,6 +204,7 @@ class ReceivedBlockTrackerSuite
     tracker4.getUnallocatedBlocks(streamId) shouldBe empty
     tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty  // should be cleaned
     tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
+    tracker4.stop()
   }
 
   test("disable write ahead log when checkpoint directory is not set") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 24cb5afee33c28f77d2df21d4414cf51be321406..4bec52b9fe4fe5809daf429e361536832d40104a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -139,6 +139,7 @@ abstract class CommonWriteAheadLogTests(
         assert(getLogFilesInDirectory(testDir).size < logFiles.size)
       }
     }
+    writeAheadLog.close()
   }
 
   test(testPrefix + "handling file errors while reading rotating logs") {