Commit dba95ea0 authored by zsxwing, committed by Andrew Or

[SPARK-10825] [CORE] [TESTS] Fix race conditions in StandaloneDynamicAllocationSuite

Fix the following issues in StandaloneDynamicAllocationSuite:

1. It should not assume master and workers start in order
2. It should not assume master and workers get ready at once
3. It should not assume the application is already registered with master after creating SparkContext
4. It should not access Master.app and idToApp which are not thread safe

The changes include (a condensed sketch of the shared pattern follows the list):
* Use `eventually` to wait until master and workers are ready to fix 1 and 2
* Use `eventually` to wait until the application is registered with master to fix 3
* Use `askWithRetry[MasterStateResponse](RequestMasterState)` to get the application info to fix 4
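
A condensed sketch of the pattern shared by these fixes (an excerpt, not a standalone program: `master` is the suite's Master endpoint, and the helpers are the ones added in this patch):

```scala
import scala.concurrent.duration._
import org.scalatest.concurrent.Eventually._

import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ApplicationInfo

// Snapshot the Master state over RPC instead of reading the Master's
// non-thread-safe `apps`/`idToApp` fields from the test thread.
private def getMasterState: MasterStateResponse = {
  master.self.askWithRetry[MasterStateResponse](RequestMasterState)
}

// Only the applications the Master currently considers active.
private def getApplications(): Seq[ApplicationInfo] = {
  getMasterState.activeApps
}

// Registration is asynchronous, so poll until it completes rather than
// asserting immediately after the SparkContext is created.
eventually(timeout(10.seconds), interval(10.millis)) {
  assert(getApplications().size === 1)
}
```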

Author: zsxwing <zsxwing@gmail.com>

Closes #8914 from zsxwing/fix-StandaloneDynamicAllocationSuite.
parent 9b9fe5f7
@@ -17,10 +17,15 @@
 
 package org.apache.spark.deploy
 
+import scala.concurrent.duration._
+
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
@@ -56,6 +61,10 @@ class StandaloneDynamicAllocationSuite
     }
     master = makeMaster()
     workers = makeWorkers(10, 2048)
+    // Wait until all workers register with master successfully
+    eventually(timeout(60.seconds), interval(10.millis)) {
+      assert(getMasterState.workers.size === numWorkers)
+    }
   }
 
   override def afterAll(): Unit = {
@@ -73,167 +82,208 @@ class StandaloneDynamicAllocationSuite
 
   test("dynamic allocation default behavior") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one won't go through
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 3)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 3)
     // kill all existing executors; we should end up with 3 - 2 = 1 executor
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // kill all executors again; this time we'll have 1 - 1 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 1000)
   }
test("dynamic allocation with max cores <= cores per worker") {
sc = new SparkContext(appConf.set("spark.cores.max", "8"))
val appId = sc.applicationId
assert(master.apps.size === 1)
assert(master.apps.head.id === appId)
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
assert(master.apps.head.getExecutorLimit === Int.MaxValue)
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(apps.size === 1)
assert(apps.head.id === appId)
assert(apps.head.executors.size === 2)
assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
assert(apps.head.getExecutorLimit === Int.MaxValue)
}
// kill all executors
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
var apps = getApplications()
assert(apps.head.executors.size === 0)
assert(apps.head.getExecutorLimit === 0)
// request 1
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.executors.values.head.cores === 8)
assert(master.apps.head.getExecutorLimit === 1)
apps = getApplications()
assert(apps.head.executors.size === 1)
assert(apps.head.executors.values.head.cores === 8)
assert(apps.head.getExecutorLimit === 1)
// request 1 more; this one won't go through because we're already at max cores.
// This highlights a limitation of using dynamic allocation with max cores WITHOUT
// setting cores per executor: once an application scales down and then scales back
// up, its executors may not be spread out anymore!
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.getExecutorLimit === 2)
apps = getApplications()
assert(apps.head.executors.size === 1)
assert(apps.head.getExecutorLimit === 2)
// request 1 more; this one also won't go through for the same reason
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.getExecutorLimit === 3)
apps = getApplications()
assert(apps.head.executors.size === 1)
assert(apps.head.getExecutorLimit === 3)
// kill all existing executors; we should end up with 3 - 1 = 2 executor
// Note: we scheduled these executors together, so their cores should be evenly distributed
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
assert(master.apps.head.getExecutorLimit === 2)
apps = getApplications()
assert(apps.head.executors.size === 2)
assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
assert(apps.head.getExecutorLimit === 2)
// kill all executors again; this time we'll have 1 - 1 = 0 executors left
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
apps = getApplications()
assert(apps.head.executors.size === 0)
assert(apps.head.getExecutorLimit === 0)
// request many more; this increases the limit well beyond the cluster capacity
assert(sc.requestExecutors(1000))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
assert(master.apps.head.getExecutorLimit === 1000)
apps = getApplications()
assert(apps.head.executors.size === 2)
assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
assert(apps.head.getExecutorLimit === 1000)
}
test("dynamic allocation with max cores > cores per worker") {
sc = new SparkContext(appConf.set("spark.cores.max", "16"))
val appId = sc.applicationId
assert(master.apps.size === 1)
assert(master.apps.head.id === appId)
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
assert(master.apps.head.getExecutorLimit === Int.MaxValue)
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(apps.size === 1)
assert(apps.head.id === appId)
assert(apps.head.executors.size === 2)
assert(apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
assert(apps.head.getExecutorLimit === Int.MaxValue)
}
// kill all executors
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
var apps = getApplications()
assert(apps.head.executors.size === 0)
assert(apps.head.getExecutorLimit === 0)
// request 1
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.executors.values.head.cores === 10)
assert(master.apps.head.getExecutorLimit === 1)
apps = getApplications()
assert(apps.head.executors.size === 1)
assert(apps.head.executors.values.head.cores === 10)
assert(apps.head.getExecutorLimit === 1)
// request 1 more
// Note: the cores are not evenly distributed because we scheduled these executors 1 by 1
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
assert(master.apps.head.getExecutorLimit === 2)
apps = getApplications()
assert(apps.head.executors.size === 2)
assert(apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
assert(apps.head.getExecutorLimit === 2)
// request 1 more; this one won't go through
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.getExecutorLimit === 3)
apps = getApplications()
assert(apps.head.executors.size === 2)
assert(apps.head.getExecutorLimit === 3)
// kill all existing executors; we should end up with 3 - 2 = 1 executor
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.executors.values.head.cores === 10)
assert(master.apps.head.getExecutorLimit === 1)
apps = getApplications()
assert(apps.head.executors.size === 1)
assert(apps.head.executors.values.head.cores === 10)
assert(apps.head.getExecutorLimit === 1)
// kill all executors again; this time we'll have 1 - 1 = 0 executors left
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
apps = getApplications()
assert(apps.head.executors.size === 0)
assert(apps.head.getExecutorLimit === 0)
// request many more; this increases the limit well beyond the cluster capacity
assert(sc.requestExecutors(1000))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
assert(master.apps.head.getExecutorLimit === 1000)
apps = getApplications()
assert(apps.head.executors.size === 2)
assert(apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
assert(apps.head.getExecutorLimit === 1000)
}
test("dynamic allocation with cores per executor") {
sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
val appId = sc.applicationId
assert(master.apps.size === 1)
assert(master.apps.head.id === appId)
assert(master.apps.head.executors.size === 10) // 20 cores total
assert(master.apps.head.getExecutorLimit === Int.MaxValue)
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(apps.size === 1)
assert(apps.head.id === appId)
assert(apps.head.executors.size === 10) // 20 cores total
assert(apps.head.getExecutorLimit === Int.MaxValue)
}
// kill all executors
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
var apps = getApplications()
assert(apps.head.executors.size === 0)
assert(apps.head.getExecutorLimit === 0)
// request 1
assert(sc.requestExecutors(1))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.getExecutorLimit === 1)
apps = getApplications()
assert(apps.head.executors.size === 1)
assert(apps.head.getExecutorLimit === 1)
// request 3 more
assert(sc.requestExecutors(3))
assert(master.apps.head.executors.size === 4)
assert(master.apps.head.getExecutorLimit === 4)
apps = getApplications()
assert(apps.head.executors.size === 4)
assert(apps.head.getExecutorLimit === 4)
// request 10 more; only 6 will go through
assert(sc.requestExecutors(10))
assert(master.apps.head.executors.size === 10)
assert(master.apps.head.getExecutorLimit === 14)
apps = getApplications()
assert(apps.head.executors.size === 10)
assert(apps.head.getExecutorLimit === 14)
// kill 2 executors; we should get 2 back immediately
assert(killNExecutors(sc, 2))
assert(master.apps.head.executors.size === 10)
assert(master.apps.head.getExecutorLimit === 12)
apps = getApplications()
assert(apps.head.executors.size === 10)
assert(apps.head.getExecutorLimit === 12)
// kill 4 executors; we should end up with 12 - 4 = 8 executors
assert(killNExecutors(sc, 4))
assert(master.apps.head.executors.size === 8)
assert(master.apps.head.getExecutorLimit === 8)
apps = getApplications()
assert(apps.head.executors.size === 8)
assert(apps.head.getExecutorLimit === 8)
// kill all executors; this time we'll have 8 - 8 = 0 executors left
assert(killAllExecutors(sc))
assert(master.apps.head.executors.size === 0)
assert(master.apps.head.getExecutorLimit === 0)
apps = getApplications()
assert(apps.head.executors.size === 0)
assert(apps.head.getExecutorLimit === 0)
// request many more; this increases the limit well beyond the cluster capacity
assert(sc.requestExecutors(1000))
assert(master.apps.head.executors.size === 10)
assert(master.apps.head.getExecutorLimit === 1000)
apps = getApplications()
assert(apps.head.executors.size === 10)
assert(apps.head.getExecutorLimit === 1000)
}
test("dynamic allocation with cores per executor AND max cores") {
@@ -241,55 +291,70 @@ class StandaloneDynamicAllocationSuite
       .set("spark.executor.cores", "2")
       .set("spark.cores.max", "8"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 4) // 8 cores total
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 4) // 8 cores total
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 3 more
     assert(sc.requestExecutors(3))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 4)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 4)
     // request 10 more; none will go through
     assert(sc.requestExecutors(10))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 14)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 14)
     // kill all executors; 4 executors will be launched immediately
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 10)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 10)
     // ... and again
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 6)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 6)
     // ... and again; now we end up with 6 - 4 = 2 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 2)
     // ... and again; this time we have 2 - 2 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
    // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 1000)
   }
test("kill the same executor twice (SPARK-9795)") {
sc = new SparkContext(appConf)
val appId = sc.applicationId
assert(master.apps.size === 1)
assert(master.apps.head.id === appId)
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.getExecutorLimit === Int.MaxValue)
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(apps.size === 1)
assert(apps.head.id === appId)
assert(apps.head.executors.size === 2)
assert(apps.head.getExecutorLimit === Int.MaxValue)
}
// sync executors between the Master and the driver, needed because
// the driver refuses to kill executors it does not know about
syncExecutors(sc)
@@ -298,9 +363,10 @@ class StandaloneDynamicAllocationSuite
     assert(executors.size === 2)
     assert(sc.killExecutor(executors.head))
     assert(sc.killExecutor(executors.head))
-    assert(master.apps.head.executors.size === 1)
+    val apps = getApplications()
+    assert(apps.head.executors.size === 1)
     // The limit should not be lowered twice
-    assert(master.apps.head.getExecutorLimit === 1)
+    assert(apps.head.getExecutorLimit === 1)
   }
 
   // ===============================
@@ -333,6 +399,16 @@ class StandaloneDynamicAllocationSuite
     }
   }
 
+  /** Get the Master state */
+  private def getMasterState: MasterStateResponse = {
+    master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+  }
+
+  /** Get the applications that are active from Master */
+  private def getApplications(): Seq[ApplicationInfo] = {
+    getMasterState.activeApps
+  }
+
   /** Kill all executors belonging to this application. */
   private def killAllExecutors(sc: SparkContext): Boolean = {
     killNExecutors(sc, Int.MaxValue)
@@ -352,8 +428,11 @@ class StandaloneDynamicAllocationSuite
    * don't wait for executors to register. Otherwise the tests will take much longer to run.
    */
   private def getExecutorIds(sc: SparkContext): Seq[String] = {
-    assert(master.idToApp.contains(sc.applicationId))
-    master.idToApp(sc.applicationId).executors.keys.map(_.toString).toSeq
+    val app = getApplications().find(_.id == sc.applicationId)
+    assert(app.isDefined)
+    // Although executors is transient, master is in the same process so the message won't be
+    // serialized and it's safe here.
+    app.get.executors.keys.map(_.toString).toSeq
   }
 
   /**