diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 67210e5d4c50e7ec3ed13c79fe3cb0bcc5e096ee..62e6c4f7932df9704ecea8bb34956d8b46e46844 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -45,20 +45,18 @@ import org.apache.spark.util._
 class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
   with PrivateMethodTester with ResetSystemProperties {
 
-  private val conf = new SparkConf(false).set("spark.app.id", "test")
+  var conf: SparkConf = null
   var store: BlockManager = null
   var store2: BlockManager = null
   var store3: BlockManager = null
   var rpcEnv: RpcEnv = null
   var master: BlockManagerMaster = null
-  conf.set("spark.authenticate", "false")
-  val securityMgr = new SecurityManager(conf)
-  val mapOutputTracker = new MapOutputTrackerMaster(conf)
-  val shuffleManager = new HashShuffleManager(conf)
+  val securityMgr = new SecurityManager(new SparkConf(false))
+  val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false))
+  val shuffleManager = new HashShuffleManager(new SparkConf(false))
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
-  conf.set("spark.kryoserializer.buffer", "1m")
-  val serializer = new KryoSerializer(conf)
+  val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
 
   // Implicitly convert strings to BlockIds for test clarity.
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
@@ -79,15 +77,17 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   override def beforeEach(): Unit = {
     super.beforeEach()
-    rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
-
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     System.setProperty("os.arch", "amd64")
-    conf.set("os.arch", "amd64")
-    conf.set("spark.test.useCompressedOops", "true")
+    conf = new SparkConf(false)
+      .set("spark.app.id", "test")
+      .set("spark.kryoserializer.buffer", "1m")
+      .set("spark.test.useCompressedOops", "true")
+      .set("spark.storage.unrollFraction", "0.4")
+      .set("spark.storage.unrollMemoryThreshold", "512")
+
+    rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set("spark.driver.port", rpcEnv.address.port.toString)
-    conf.set("spark.storage.unrollFraction", "0.4")
-    conf.set("spark.storage.unrollMemoryThreshold", "512")
 
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
@@ -98,6 +98,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   override def afterEach(): Unit = {
     try {
+      conf = null
       if (store != null) {
         store.stop()
         store = null
@@ -473,34 +474,22 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
-    val origTimeoutOpt = conf.getOption("spark.network.timeout")
-    try {
-      conf.set("spark.network.timeout", "2s")
-      store = makeBlockManager(8000, "executor1")
-      store2 = makeBlockManager(8000, "executor2")
-      store3 = makeBlockManager(8000, "executor3")
-      val list1 = List(new Array[Byte](4000))
-      store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-      store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-      var list1Get = store.getRemoteBytes("list1")
-      assert(list1Get.isDefined, "list1Get expected to be fetched")
-      // block manager exit
-      store2.stop()
-      store2 = null
-      list1Get = store.getRemoteBytes("list1")
-      // get `list1` block
-      assert(list1Get.isDefined, "list1Get expected to be fetched")
-      store3.stop()
-      store3 = null
-      // exception throw because there is no locations
-      intercept[BlockFetchException] {
-        list1Get = store.getRemoteBytes("list1")
-      }
-    } finally {
-      origTimeoutOpt match {
-        case Some(t) => conf.set("spark.network.timeout", t)
-        case None => conf.remove("spark.network.timeout")
-      }
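+    // Disable fetch retries so the remote read fails fast once an executor has stopped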
+    conf.set("spark.shuffle.io.maxRetries", "0")
+    store = makeBlockManager(8000, "executor1")
+    store2 = makeBlockManager(8000, "executor2")
+    store3 = makeBlockManager(8000, "executor3")
+    val list1 = List(new Array[Byte](4000))
+    store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(store.getRemoteBytes("list1").isDefined, "list1 expected to be fetched")
+    store2.stop()
+    store2 = null
+    assert(store.getRemoteBytes("list1").isDefined, "list1 expected to be fetched")
+    store3.stop()
+    store3 = null
+    // An exception is thrown because no locations remain
+    intercept[BlockFetchException] {
+      store.getRemoteBytes("list1")
     }
   }