Commit 25782981 authored by Josh Rosen, committed by Andrew Or

[SPARK-12174] Speed up BlockManagerSuite getRemoteBytes() test

This patch significantly speeds up the BlockManagerSuite's "SPARK-9591: getRemoteBytes from another location when Exception throw" test, reducing the test time from 45s to ~250ms. The key change was to set `spark.shuffle.io.maxRetries` to 0 (the code previously set `spark.network.timeout` to `2s`, but this didn't make a difference because the slowdown was not due to this timeout).
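To make the failure mode concrete, here is a minimal sketch (not code from this commit) of the fail-fast configuration. `new SparkConf(false)` skips loading settings from system properties, and zero retries makes a fetch from a stopped executor fail immediately instead of sleeping between retry attempts:

    import org.apache.spark.SparkConf

    // Sketch only: a test SparkConf that makes remote block fetches fail fast.
    val conf = new SparkConf(false)
      .set("spark.app.id", "test")
      // The shuffle transport retries failed fetches by default, waiting
      // spark.shuffle.io.retryWait between attempts; those waits are what kept
      // the old test busy for ~45s. Setting 0 disables retries entirely.
      .set("spark.shuffle.io.maxRetries", "0")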

Along the way, I also cleaned up the way that we handle SparkConf in BlockManagerSuite: previously, each test would mutate a shared SparkConf instance, while now each test gets a fresh SparkConf.
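That pattern is sketched below, assuming a modern ScalaTest `BeforeAndAfterEach` setup; `ExampleSuite` and its test are hypothetical, and the real change is in the diff that follows:

    import org.apache.spark.SparkConf
    import org.scalatest.BeforeAndAfterEach
    import org.scalatest.funsuite.AnyFunSuite

    // Hypothetical suite showing the per-test conf pattern: build a fresh
    // SparkConf in beforeEach and drop it in afterEach, so a conf mutated by
    // one test can never leak into the next.
    class ExampleSuite extends AnyFunSuite with BeforeAndAfterEach {
      var conf: SparkConf = null

      override def beforeEach(): Unit = {
        super.beforeEach()
        conf = new SparkConf(false).set("spark.app.id", "test")
      }

      override def afterEach(): Unit = {
        try {
          conf = null
        } finally {
          super.afterEach()
        }
      }

      test("each test starts from a pristine conf") {
        assert(conf.get("spark.app.id") === "test")
        conf.set("spark.foo", "bar") // visible only within this test
      }
    }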

Author: Josh Rosen <joshrosen@databricks.com>

Closes #10759 from JoshRosen/SPARK-12174.
parent bcc7373f
@@ -45,20 +45,18 @@ import org.apache.spark.util._
 class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
   with PrivateMethodTester with ResetSystemProperties {
 
-  private val conf = new SparkConf(false).set("spark.app.id", "test")
+  var conf: SparkConf = null
   var store: BlockManager = null
   var store2: BlockManager = null
   var store3: BlockManager = null
   var rpcEnv: RpcEnv = null
   var master: BlockManagerMaster = null
-  conf.set("spark.authenticate", "false")
-  val securityMgr = new SecurityManager(conf)
-  val mapOutputTracker = new MapOutputTrackerMaster(conf)
-  val shuffleManager = new HashShuffleManager(conf)
+  val securityMgr = new SecurityManager(new SparkConf(false))
+  val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false))
+  val shuffleManager = new HashShuffleManager(new SparkConf(false))
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
-  conf.set("spark.kryoserializer.buffer", "1m")
-  val serializer = new KryoSerializer(conf)
+  val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
 
   // Implicitly convert strings to BlockIds for test clarity.
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
@@ -79,15 +77,17 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
   override def beforeEach(): Unit = {
     super.beforeEach()
-    rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     System.setProperty("os.arch", "amd64")
-    conf.set("os.arch", "amd64")
-    conf.set("spark.test.useCompressedOops", "true")
+    conf = new SparkConf(false)
+      .set("spark.app.id", "test")
+      .set("spark.kryoserializer.buffer", "1m")
+      .set("spark.test.useCompressedOops", "true")
+      .set("spark.storage.unrollFraction", "0.4")
+      .set("spark.storage.unrollMemoryThreshold", "512")
+
+    rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set("spark.driver.port", rpcEnv.address.port.toString)
-    conf.set("spark.storage.unrollFraction", "0.4")
-    conf.set("spark.storage.unrollMemoryThreshold", "512")
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
@@ -98,6 +98,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
   override def afterEach(): Unit = {
     try {
+      conf = null
       if (store != null) {
         store.stop()
         store = null
@@ -473,34 +474,22 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
   }
 
   test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
-    val origTimeoutOpt = conf.getOption("spark.network.timeout")
-    try {
-      conf.set("spark.network.timeout", "2s")
-      store = makeBlockManager(8000, "executor1")
-      store2 = makeBlockManager(8000, "executor2")
-      store3 = makeBlockManager(8000, "executor3")
-      val list1 = List(new Array[Byte](4000))
-      store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-      store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
-      var list1Get = store.getRemoteBytes("list1")
-      assert(list1Get.isDefined, "list1Get expected to be fetched")
-      // block manager exit
-      store2.stop()
-      store2 = null
-      list1Get = store.getRemoteBytes("list1")
-      // get `list1` block
-      assert(list1Get.isDefined, "list1Get expected to be fetched")
-      store3.stop()
-      store3 = null
-      // exception throw because there is no locations
-      intercept[BlockFetchException] {
-        list1Get = store.getRemoteBytes("list1")
-      }
-    } finally {
-      origTimeoutOpt match {
-        case Some(t) => conf.set("spark.network.timeout", t)
-        case None => conf.remove("spark.network.timeout")
-      }
-    }
+    conf.set("spark.shuffle.io.maxRetries", "0")
+    store = makeBlockManager(8000, "executor1")
+    store2 = makeBlockManager(8000, "executor2")
+    store3 = makeBlockManager(8000, "executor3")
+    val list1 = List(new Array[Byte](4000))
+    store2.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store3.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
+    store2.stop()
+    store2 = null
+    assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
+    store3.stop()
+    store3 = null
+    // exception throw because there is no locations
+    intercept[BlockFetchException] {
+      store.getRemoteBytes("list1")
+    }
   }