From dba95ea03216e6b8e623db4a36e1018c6ed95538 Mon Sep 17 00:00:00 2001
From: zsxwing <zsxwing@gmail.com>
Date: Tue, 29 Sep 2015 11:53:28 -0700
Subject: [PATCH] [SPARK-10825] [CORE] [TESTS] Fix race conditions in
 StandaloneDynamicAllocationSuite

Fix the following issues in StandaloneDynamicAllocationSuite:

1. It should not assume master and workers start in order
2. It should not assume master and workers get ready at once
3. It should not assume the application is already registered with the master after creating SparkContext
4. It should not access Master.apps and idToApp, which are not thread-safe

The changes include:

* Use `eventually` to wait until master and workers are ready, to fix 1 and 2
* Use `eventually` to wait until the application is registered with the master, to fix 3
* Use `askWithRetry[MasterStateResponse](RequestMasterState)` to get the application info, to fix 4

A minimal standalone sketch of this `eventually` polling pattern follows the patch.

Author: zsxwing <zsxwing@gmail.com>

Closes #8914 from zsxwing/fix-StandaloneDynamicAllocationSuite.
---
 .../StandaloneDynamicAllocationSuite.scala    | 305 +++++++++++-------
 1 file changed, 192 insertions(+), 113 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 1f2a0f0d30..2e2fa22eb4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -17,10 +17,15 @@

 package org.apache.spark.deploy

+import scala.concurrent.duration._
+
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually._

 import org.apache.spark._
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
@@ -56,6 +61,10 @@ class StandaloneDynamicAllocationSuite
     }
     master = makeMaster()
     workers = makeWorkers(10, 2048)
+    // Wait until all workers register with master successfully
+    eventually(timeout(60.seconds), interval(10.millis)) {
+      assert(getMasterState.workers.size === numWorkers)
+    }
   }

   override def afterAll(): Unit = {
@@ -73,167 +82,208 @@ class StandaloneDynamicAllocationSuite
   test("dynamic allocation default behavior") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one won't go through
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 3)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 3)
     // kill all existing executors; we should end up with 3 - 2 = 1 executor
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // kill all executors again; this time we'll have 1 - 1 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 1000)
   }

   test("dynamic allocation with max cores <= cores per worker") {
     sc = new SparkContext(appConf.set("spark.cores.max", "8"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
    // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.executors.values.head.cores === 8)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.executors.values.head.cores === 8)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more; this one won't go through because we're already at max cores.
     // This highlights a limitation of using dynamic allocation with max cores WITHOUT
     // setting cores per executor: once an application scales down and then scales back
     // up, its executors may not be spread out anymore!
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one also won't go through for the same reason
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 3)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 3)
     // kill all existing executors; we should end up with 3 - 1 = 2 executor
     // Note: we scheduled these executors together, so their cores should be evenly distributed
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+    assert(apps.head.getExecutorLimit === 2)
     // kill all executors again; this time we'll have 1 - 1 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+    assert(apps.head.getExecutorLimit === 1000)
   }

   test("dynamic allocation with max cores > cores per worker") {
     sc = new SparkContext(appConf.set("spark.cores.max", "16"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.executors.values.head.cores === 10)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.executors.values.head.cores === 10)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more
     // Note: the cores are not evenly distributed because we scheduled these executors 1 by 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one won't go through
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 3)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 3)
     // kill all existing executors; we should end up with 3 - 2 = 1 executor
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.executors.values.head.cores === 10)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.executors.values.head.cores === 10)
+    assert(apps.head.getExecutorLimit === 1)
     // kill all executors again; this time we'll have 1 - 1 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+    assert(apps.head.getExecutorLimit === 1000)
   }

   test("dynamic allocation with cores per executor") {
     sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 10) // 20 cores total
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 10) // 20 cores total
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 3 more
     assert(sc.requestExecutors(3))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 4)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 4)
     // request 10 more; only 6 will go through
     assert(sc.requestExecutors(10))
-    assert(master.apps.head.executors.size === 10)
-    assert(master.apps.head.getExecutorLimit === 14)
+    apps = getApplications()
+    assert(apps.head.executors.size === 10)
+    assert(apps.head.getExecutorLimit === 14)
     // kill 2 executors; we should get 2 back immediately
     assert(killNExecutors(sc, 2))
-    assert(master.apps.head.executors.size === 10)
-    assert(master.apps.head.getExecutorLimit === 12)
+    apps = getApplications()
+    assert(apps.head.executors.size === 10)
+    assert(apps.head.getExecutorLimit === 12)
     // kill 4 executors; we should end up with 12 - 4 = 8 executors
     assert(killNExecutors(sc, 4))
-    assert(master.apps.head.executors.size === 8)
-    assert(master.apps.head.getExecutorLimit === 8)
+    apps = getApplications()
+    assert(apps.head.executors.size === 8)
+    assert(apps.head.getExecutorLimit === 8)
     // kill all executors; this time we'll have 8 - 8 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 10)
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 10)
+    assert(apps.head.getExecutorLimit === 1000)
   }

   test("dynamic allocation with cores per executor AND max cores") {
@@ -241,55 +291,70 @@ class StandaloneDynamicAllocationSuite
       .set("spark.executor.cores", "2")
       .set("spark.cores.max", "8"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 4) // 8 cores total
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 4) // 8 cores total
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 3 more
     assert(sc.requestExecutors(3))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 4)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 4)
     // request 10 more; none will go through
     assert(sc.requestExecutors(10))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 14)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 14)
     // kill all executors; 4 executors will be launched immediately
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 10)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 10)
     // ... and again
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 6)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 6)
     // ... and again; now we end up with 6 - 4 = 2 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 2)
     // ... and again; this time we have 2 - 2 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 1000)
   }

   test("kill the same executor twice (SPARK-9795)") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // sync executors between the Master and the driver, needed because
     // the driver refuses to kill executors it does not know about
     syncExecutors(sc)
@@ -298,9 +363,10 @@ class StandaloneDynamicAllocationSuite
     assert(executors.size === 2)
     assert(sc.killExecutor(executors.head))
     assert(sc.killExecutor(executors.head))
-    assert(master.apps.head.executors.size === 1)
+    val apps = getApplications()
+    assert(apps.head.executors.size === 1)
     // The limit should not be lowered twice
-    assert(master.apps.head.getExecutorLimit === 1)
+    assert(apps.head.getExecutorLimit === 1)
   }

   // ===============================
@@ -333,6 +399,16 @@ class StandaloneDynamicAllocationSuite
     }
   }

+  /** Get the Master state */
+  private def getMasterState: MasterStateResponse = {
+    master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+  }
+
+  /** Get the applications that are active from Master */
+  private def getApplications(): Seq[ApplicationInfo] = {
+    getMasterState.activeApps
+  }
+
   /** Kill all executors belonging to this application. */
   private def killAllExecutors(sc: SparkContext): Boolean = {
     killNExecutors(sc, Int.MaxValue)
@@ -352,8 +428,11 @@ class StandaloneDynamicAllocationSuite
    * don't wait for executors to register. Otherwise the tests will take much longer to run.
    */
   private def getExecutorIds(sc: SparkContext): Seq[String] = {
-    assert(master.idToApp.contains(sc.applicationId))
-    master.idToApp(sc.applicationId).executors.keys.map(_.toString).toSeq
+    val app = getApplications().find(_.id == sc.applicationId)
+    assert(app.isDefined)
+    // Although executors is transient, master is in the same process so the message won't be
+    // serialized and it's safe here.
+    app.get.executors.keys.map(_.toString).toSeq
   }

   /**
-- 
GitLab
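
The core of the fix described in the commit message is to poll asynchronous cluster state with ScalaTest's `eventually` instead of asserting on it immediately after an action. Below is a minimal, self-contained sketch of that pattern, not part of the patch: it assumes only ScalaTest (2.x-era `FunSuite` and `Eventually`) on the classpath, and the suite name `EventuallyPatternSuite`, the `registeredWorkers` counter, and the updater thread are illustrative stand-ins for the master/worker state the real suite polls.

import scala.concurrent.duration._

import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually._

class EventuallyPatternSuite extends FunSuite {

  // Hypothetical stand-in for asynchronously updated cluster state, e.g. the
  // number of workers that have registered with the master so far.
  @volatile private var registeredWorkers = 0

  test("poll asynchronous state instead of asserting on it immediately") {
    val expected = 10

    // Simulate state that only becomes correct some time after the test body
    // starts, much like workers registering with the master after makeWorkers().
    val updater = new Thread(new Runnable {
      override def run(): Unit = {
        (1 to expected).foreach { _ =>
          Thread.sleep(5)
          registeredWorkers += 1
        }
      }
    })
    updater.start()

    // A plain assert here would race with the updater thread. `eventually`
    // retries the block until it passes or the timeout expires, which is how
    // the patch waits for worker and application registration.
    eventually(timeout(10.seconds), interval(10.millis)) {
      assert(registeredWorkers === expected)
    }
    updater.join()
  }
}

In the patch itself the polled condition is not a local variable but the master's state fetched over RPC, via the new getMasterState / getApplications() helpers built on askWithRetry[MasterStateResponse](RequestMasterState), which keeps the tests away from Master.apps and Master.idToApp, the fields that are not thread-safe.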