diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 1f2a0f0d309ceeccc6bb98e20fc5f406ba06bdf6..2e2fa22eb477220e34f15a63457b30f9fab2708d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.deploy
 
+import scala.concurrent.duration._
+
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
@@ -56,6 +61,10 @@ class StandaloneDynamicAllocationSuite
     }
     master = makeMaster()
     workers = makeWorkers(10, 2048)
+    // Wait until all workers register with master successfully
+    eventually(timeout(60.seconds), interval(10.millis)) {
+      assert(getMasterState.workers.size === numWorkers)
+    }
   }
 
   override def afterAll(): Unit = {
@@ -73,167 +82,208 @@ class StandaloneDynamicAllocationSuite
   test("dynamic allocation default behavior") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one won't go through
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 3)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 3)
     // kill all existing executors; we should end up with 3 - 2 = 1 executor
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // kill all executors again; this time we'll have 1 - 1 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 1000)
   }
 
   test("dynamic allocation with max cores <= cores per worker") {
     sc = new SparkContext(appConf.set("spark.cores.max", "8"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.executors.values.head.cores === 8)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.executors.values.head.cores === 8)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more; this one won't go through because we're already at max cores.
     // This highlights a limitation of using dynamic allocation with max cores WITHOUT
     // setting cores per executor: once an application scales down and then scales back
     // up, its executors may not be spread out anymore!
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one also won't go through for the same reason
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 3)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 3)
     // kill all existing executors; we should end up with 3 - 1 = 2 executor
     // Note: we scheduled these executors together, so their cores should be evenly distributed
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+    assert(apps.head.getExecutorLimit === 2)
     // kill all executors again; this time we'll have 1 - 1 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
    // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+    assert(apps.head.getExecutorLimit === 1000)
   }
 
   test("dynamic allocation with max cores > cores per worker") {
     sc = new SparkContext(appConf.set("spark.cores.max", "16"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.executors.values.head.cores === 10)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.executors.values.head.cores === 10)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more
     // Note: the cores are not evenly distributed because we scheduled these executors 1 by 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one won't go through
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 3)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 3)
     // kill all existing executors; we should end up with 3 - 2 = 1 executor
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.executors.values.head.cores === 10)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.executors.values.head.cores === 10)
+    assert(apps.head.getExecutorLimit === 1)
     // kill all executors again; this time we'll have 1 - 1 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+    assert(apps.head.getExecutorLimit === 1000)
   }
 
   test("dynamic allocation with cores per executor") {
     sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 10) // 20 cores total
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 10) // 20 cores total
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 3 more
     assert(sc.requestExecutors(3))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 4)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 4)
     // request 10 more; only 6 will go through
     assert(sc.requestExecutors(10))
-    assert(master.apps.head.executors.size === 10)
-    assert(master.apps.head.getExecutorLimit === 14)
+    apps = getApplications()
+    assert(apps.head.executors.size === 10)
+    assert(apps.head.getExecutorLimit === 14)
     // kill 2 executors; we should get 2 back immediately
     assert(killNExecutors(sc, 2))
-    assert(master.apps.head.executors.size === 10)
-    assert(master.apps.head.getExecutorLimit === 12)
+    apps = getApplications()
+    assert(apps.head.executors.size === 10)
+    assert(apps.head.getExecutorLimit === 12)
    // kill 4 executors; we should end up with 12 - 4 = 8 executors
     assert(killNExecutors(sc, 4))
-    assert(master.apps.head.executors.size === 8)
-    assert(master.apps.head.getExecutorLimit === 8)
+    apps = getApplications()
+    assert(apps.head.executors.size === 8)
+    assert(apps.head.getExecutorLimit === 8)
     // kill all executors; this time we'll have 8 - 8 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 10)
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 10)
+    assert(apps.head.getExecutorLimit === 1000)
   }
 
   test("dynamic allocation with cores per executor AND max cores") {
@@ -241,55 +291,70 @@ class StandaloneDynamicAllocationSuite
       .set("spark.executor.cores", "2")
       .set("spark.cores.max", "8"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 4) // 8 cores total
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 4) // 8 cores total
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 3 more
     assert(sc.requestExecutors(3))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 4)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 4)
     // request 10 more; none will go through
     assert(sc.requestExecutors(10))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 14)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 14)
     // kill all executors; 4 executors will be launched immediately
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 10)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 10)
     // ... and again
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 6)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 6)
     // ... and again; now we end up with 6 - 4 = 2 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 2)
     // ... and again; this time we have 2 - 2 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 1000)
  }
 
   test("kill the same executor twice (SPARK-9795)") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // sync executors between the Master and the driver, needed because
     // the driver refuses to kill executors it does not know about
     syncExecutors(sc)
@@ -298,9 +363,10 @@ class StandaloneDynamicAllocationSuite
     assert(executors.size === 2)
     assert(sc.killExecutor(executors.head))
     assert(sc.killExecutor(executors.head))
-    assert(master.apps.head.executors.size === 1)
+    val apps = getApplications()
+    assert(apps.head.executors.size === 1)
     // The limit should not be lowered twice
-    assert(master.apps.head.getExecutorLimit === 1)
+    assert(apps.head.getExecutorLimit === 1)
   }
 
   // ===============================
@@ -333,6 +399,16 @@ class StandaloneDynamicAllocationSuite
     }
   }
 
+  /** Get the Master state */
+  private def getMasterState: MasterStateResponse = {
+    master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+  }
+
+  /** Get the applications that are active from Master */
+  private def getApplications(): Seq[ApplicationInfo] = {
+    getMasterState.activeApps
+  }
+
   /** Kill all executors belonging to this application. */
   private def killAllExecutors(sc: SparkContext): Boolean = {
     killNExecutors(sc, Int.MaxValue)
@@ -352,8 +428,11 @@
    * don't wait for executors to register. Otherwise the tests will take much longer to run.
    */
   private def getExecutorIds(sc: SparkContext): Seq[String] = {
-    assert(master.idToApp.contains(sc.applicationId))
-    master.idToApp(sc.applicationId).executors.keys.map(_.toString).toSeq
+    val app = getApplications().find(_.id == sc.applicationId)
+    assert(app.isDefined)
+    // Although executors is transient, master is in the same process so the message won't be
+    // serialized and it's safe here.
+    app.get.executors.keys.map(_.toString).toSeq
   }
 
   /**