diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 7fb2b913770768a384dd4a96de8a3cdc08482cc9..e2f13accdfab505ccca9eb574cb6e725ed40c98c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -274,7 +274,7 @@ object SparkEnv extends Logging {
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)
 
     val blockTransferService =
-      conf.get("spark.shuffle.blockTransferService", "nio").toLowerCase match {
+      conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
         case "netty" =>
           new NettyBlockTransferService(conf)
         case "nio" =>
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index ec3000e722a951035387b11092afe16a3f9f7019..1c4327cf13b51ccfb423fb59e4897aa4690437f8 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -106,5 +106,8 @@ class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService {
     result.future
   }
 
-  override def close(): Unit = server.close()
+  override def close(): Unit = {
+    server.close()
+    clientFactory.close()
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1f8de2896116ce6fe47828860a99b9286c11b643..5f5dd0dc1c63fffb611574cff8e75294c2b8f6db 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1178,6 +1178,10 @@ private[spark] class BlockManager(
 
   def stop(): Unit = {
     blockTransferService.close()
+    if (shuffleClient ne blockTransferService) {
+      // Closing should be idempotent, but maybe not for the NioBlockTransferService.
+      shuffleClient.close()
+    }
     diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index f0aa914cfedb6a198ba5fec7e3cd95f20167ce34..66cf60d25f6d117a3d107095d69f2a79c43978a8 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.BlockManagerId
 /**
  * Test add and remove behavior of ExecutorAllocationManager.
  */
-class ExecutorAllocationManagerSuite extends FunSuite {
+class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._
 
@@ -36,17 +36,21 @@ class ExecutorAllocationManagerSuite extends FunSuite {
       .setAppName("test-executor-allocation-manager")
       .set("spark.dynamicAllocation.enabled", "true")
     intercept[SparkException] { new SparkContext(conf) }
+    SparkEnv.get.stop() // clean up the created environment
 
     // Only min
     val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
     intercept[SparkException] { new SparkContext(conf1) }
+    SparkEnv.get.stop()
 
     // Only max
     val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
     intercept[SparkException] { new SparkContext(conf2) }
+    SparkEnv.get.stop()
 
     // Both min and max, but min > max
     intercept[SparkException] { createSparkContext(2, 1) }
+    SparkEnv.get.stop()
 
     // Both min and max, and min == max
     val sc1 = createSparkContext(1, 1)
@@ -60,18 +64,17 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("starting state") {
-    val sc = createSparkContext()
+    sc = createSparkContext()
     val manager = sc.executorAllocationManager.get
     assert(numExecutorsPending(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(executorIds(manager).isEmpty)
     assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
     assert(removeTimes(manager).isEmpty)
-    sc.stop()
   }
 
   test("add executors") {
-    val sc = createSparkContext(1, 10)
+    sc = createSparkContext(1, 10)
     val manager = sc.executorAllocationManager.get
 
     // Keep adding until the limit is reached
@@ -112,11 +115,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     assert(addExecutors(manager) === 0)
     assert(numExecutorsPending(manager) === 6)
     assert(numExecutorsToAdd(manager) === 1)
-    sc.stop()
   }
 
   test("remove executors") {
-    val sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10)
     val manager = sc.executorAllocationManager.get
     (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
 
@@ -163,11 +165,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(!removeExecutor(manager, "8"))
     assert(executorsPendingToRemove(manager).isEmpty)
-    sc.stop()
   }
 
   test ("interleaving add and remove") {
-    val sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10)
     val manager = sc.executorAllocationManager.get
 
     // Add a few executors
@@ -232,11 +233,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     onExecutorAdded(manager, "15")
     onExecutorAdded(manager, "16")
     assert(executorIds(manager).size === 10)
-    sc.stop()
   }
 
   test("starting/canceling add timer") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val clock = new TestClock(8888L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -268,7 +268,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("starting/canceling remove timers") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val clock = new TestClock(14444L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -313,7 +313,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("mock polling loop with no events") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val manager = sc.executorAllocationManager.get
     val clock = new TestClock(2020L)
     manager.setClock(clock)
@@ -339,7 +339,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("mock polling loop add behavior") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val clock = new TestClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -388,7 +388,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("mock polling loop remove behavior") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val clock = new TestClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -449,7 +449,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("listeners trigger add executors correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(addTime(manager) === NOT_SET)
 
@@ -479,7 +479,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("listeners trigger remove executors correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(removeTimes(manager).isEmpty)
 
@@ -510,7 +510,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("listeners trigger add and remove executor callbacks correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index cbc0bd178d894b25b859c95bfef265f804971b7b..d27880f4bc32f396f9d5f3b5e90262960f398eb9 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AkkaUtils
 
-class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
+class MapOutputTrackerSuite extends FunSuite {
   private val conf = new SparkConf
 
   test("master start and stop") {
@@ -37,6 +37,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     tracker.trackerActor =
       actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("master register shuffle and fetch") {
@@ -56,6 +57,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
                                   (BlockManagerId("b", "hostB", 1000), size10000)))
     tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("master register and unregister shuffle") {
@@ -74,6 +76,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     tracker.unregisterShuffle(10)
     assert(!tracker.containsShuffle(10))
     assert(tracker.getServerStatuses(10, 0).isEmpty)
+
+    tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("master register shuffle and unregister map output and fetch") {
@@ -97,6 +102,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     // this should cause it to fail, and the scheduler will ignore the failure due to the
     // stage already being aborted.
     intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
+
+    tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("remote fetch") {
@@ -136,6 +144,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
 
     // failure should be cached
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+    masterTracker.stop()
+    slaveTracker.stop()
+    actorSystem.shutdown()
+    slaveSystem.shutdown()
   }
 
   test("remote fetch below akka frame size") {
@@ -154,6 +167,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     masterTracker.registerMapOutput(10, 0, MapStatus(
       BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
     masterActor.receive(GetMapOutputStatuses(10))
+
+    // masterTracker.stop()  // this throws an exception
+    actorSystem.shutdown()
   }
 
   test("remote fetch exceeds akka frame size") {
@@ -176,5 +192,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
         BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
     }
     intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+
+    // masterTracker.stop()  // this throws an exception
+    actorSystem.shutdown()
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index df237ba796b38d1706339f3a2be3e2fedbaf1adb..0390a2e4f1dbb3a4390952557f97b632521736e7 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
+import org.scalatest.{FunSuite, PrivateMethodTester}
 
 import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
@@ -25,12 +25,12 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 
 class SparkContextSchedulerCreationSuite
-  extends FunSuite with PrivateMethodTester with Logging with BeforeAndAfterEach {
+  extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {
 
   def createTaskScheduler(master: String): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val createTaskSchedulerMethod =
       PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
     val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 32a19787a28e185c9cbff11c8fcceaddafacaf5c..475026e8eb1407f26022a6b8949eb2e66f21725c 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -145,11 +145,16 @@ class FlumePollingStreamSuite extends TestSuiteBase {
     outputStream.register()
 
     ssc.start()
-    writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
-    assertChannelIsEmpty(channel)
-    assertChannelIsEmpty(channel2)
-    sink.stop()
-    channel.stop()
+    try {
+      writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
+      assertChannelIsEmpty(channel)
+      assertChannelIsEmpty(channel2)
+    } finally {
+      sink.stop()
+      sink2.stop()
+      channel.stop()
+      channel2.stop()
+    }
   }
 
   def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index e7fa4f6bf3b5d24e0b8815122b52988d0b43a51c..0b4a1d8286407c2ab6e6c4d2ef0f891bc3c7b16e 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -58,7 +58,7 @@ public class TransportClientFactory implements Closeable {
   private final ConcurrentHashMap<SocketAddress, TransportClient> connectionPool;
 
   private final Class<? extends Channel> socketChannelClass;
-  private final EventLoopGroup workerGroup;
+  private EventLoopGroup workerGroup;
 
   public TransportClientFactory(TransportContext context) {
     this.context = context;
@@ -150,6 +150,7 @@ public class TransportClientFactory implements Closeable {
 
     if (workerGroup != null) {
       workerGroup.shutdownGracefully();
+      workerGroup = null;
     }
   }
 
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index d1a1877a98f46400a1c742f0dce5c702c25f6a8c..70da48ca8ee795e7fe1fa045b83fd0f155a46359 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -49,6 +49,7 @@ public class TransportServer implements Closeable {
   private ChannelFuture channelFuture;
   private int port = -1;
 
+  /** Creates a TransportServer that binds to the given port, or to any available port if portToBind is 0. */
   public TransportServer(TransportContext context, int portToBind) {
     this.context = context;
     this.conf = context.getConf();
@@ -67,7 +68,7 @@ public class TransportServer implements Closeable {
 
     IOMode ioMode = IOMode.valueOf(conf.ioMode());
     EventLoopGroup bossGroup =
-        NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
+      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
     EventLoopGroup workerGroup = bossGroup;
 
     bootstrap = new ServerBootstrap()
@@ -105,7 +106,7 @@ public class TransportServer implements Closeable {
   @Override
   public void close() {
     if (channelFuture != null) {
-      // close is a local operation and should finish with milliseconds; timeout just to be safe
+      // close is a local operation and should finish within milliseconds; timeout just to be safe
       channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
       channelFuture = null;
     }
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index cc2f6261ca302929059b428a14e825e4e1b47b60..6bbabc44b958b23f21f150244929cf866d4bff24 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.Closeable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,4 +87,9 @@ public class ExternalShuffleClient implements ShuffleClient {
       JavaUtils.serialize(new RegisterExecutor(appId, execId, executorInfo));
     client.sendRpcSync(registerExecutorMessage, 5000 /* timeoutMs */);
   }
+
+  @Override
+  public void close() {
+    clientFactory.close();
+  }
 }
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
index 9fa87c2c6e1c2c98167db259517dc177ed111200..d46a5623945574d7fa8548485f56ae5775155623 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -17,8 +17,10 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.Closeable;
+
 /** Provides an interface for reading shuffle files, either from an Executor or external service. */
-public interface ShuffleClient {
+public interface ShuffleClient extends Closeable {
   /**
    * Fetch a sequence of blocks from a remote node asynchronously,
    *
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 655cec1573f58437fb9318cbc884fdd2902e308d..f47772947d67c65c97ca8c76e0962082cfc35d48 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -46,6 +46,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   after {
     if (ssc != null) {
       ssc.stop()
+      if (ssc.sc != null) {
+        // Calling ssc.stop() does not always stop the associated SparkContext.
+        ssc.sc.stop()
+      }
       ssc = null
     }
     if (sc != null) {