From 4120bcbaffe92da40486b469334119ed12199f4f Mon Sep 17 00:00:00 2001
From: Andrew Or <andrew@databricks.com>
Date: Thu, 4 Feb 2016 10:32:16 -0800
Subject: [PATCH] [SPARK-13162] Standalone mode does not respect initial executors

Currently the Master would always set an application's initial executor limit
to infinity. If the user specified `spark.dynamicAllocation.initialExecutors`,
the config would not take effect. This is similar to #11047 but for standalone
mode.

Author: Andrew Or <andrew@databricks.com>

Closes #11054 from andrewor14/standalone-da-initial.
---
 .../spark/ExecutorAllocationManager.scala      |  2 ++
 .../spark/deploy/ApplicationDescription.scala  |  3 +++
 .../spark/deploy/master/ApplicationInfo.scala  |  2 +-
 .../cluster/SparkDeploySchedulerBackend.scala  | 16 ++++++++++++----
 .../StandaloneDynamicAllocationSuite.scala     | 17 ++++++++++++++++-
 5 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 3431fc13dc..db143d7341 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -231,6 +231,8 @@ private[spark] class ExecutorAllocationManager(
       }
     }
     executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
+
+    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 78bbd5c03f..c5c5c60923 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -29,6 +29,9 @@ private[spark] case class ApplicationDescription(
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     eventLogCodec: Option[String] = None,
     coresPerExecutor: Option[Int] = None,
+    // number of executors this application wants to start with,
+    // only used if dynamic allocation is enabled
+    initialExecutorLimit: Option[Int] = None,
     user: String = System.getProperty("user.name", "<unknown>")) {
 
   override def toString: String = "ApplicationDescription(" + name + ")"
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 7e2cf956c7..4ffb5283e9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -65,7 +65,7 @@ private[spark] class ApplicationInfo(
     appSource = new ApplicationSource(this)
     nextExecutorId = 0
     removedExecutors = new ArrayBuffer[ExecutorDesc]
-    executorLimit = Integer.MAX_VALUE
+    executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
     appUIUrlAtHistoryServer = None
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 16f3316378..d209645610 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -19,11 +19,11 @@ package org.apache.spark.scheduler.cluster
 
 import java.util.concurrent.Semaphore
 
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
-import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress}
+import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
@@ -89,8 +89,16 @@ private[spark] class SparkDeploySchedulerBackend(
       args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
     val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
-    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
-      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
+    // If we're using dynamic allocation, set our initial executor limit to 0 for now.
+    // ExecutorAllocationManager will send the real initial limit to the Master later.
+    val initialExecutorLimit =
+      if (Utils.isDynamicAllocationEnabled(conf)) {
+        Some(0)
+      } else {
+        None
+      }
+    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
+      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
     client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
     client.start()
     launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index fdada0777f..b7ff5c9e8c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -447,7 +447,23 @@ class StandaloneDynamicAllocationSuite
     apps = getApplications()
     // kill executor successfully
    assert(apps.head.executors.size === 1)
+  }
 
+  test("initial executor limit") {
+    val initialExecutorLimit = 1
+    val myConf = appConf
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.shuffle.service.enabled", "true")
+      .set("spark.dynamicAllocation.initialExecutors", initialExecutorLimit.toString)
+    sc = new SparkContext(myConf)
+    val appId = sc.applicationId
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === initialExecutorLimit)
+      assert(apps.head.getExecutorLimit === initialExecutorLimit)
+    }
   }
 
   // ===============================
@@ -540,7 +556,6 @@ class StandaloneDynamicAllocationSuite
     val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted
     missingExecutors.foreach { id =>
       // Fake an executor registration so the driver knows about us
-      val port = System.currentTimeMillis % 65536
       val endpointRef = mock(classOf[RpcEndpointRef])
       val mockAddress = mock(classOf[RpcAddress])
       when(endpointRef.address).thenReturn(mockAddress)
--
GitLab
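
For context, a minimal sketch (not part of the patch) of a standalone-mode application that exercises the fixed behavior. The master URL and app name are hypothetical; the configuration keys are the same ones used in the test above.

import org.apache.spark.{SparkConf, SparkContext}

// With this patch, the standalone Master seeds the application's executor
// limit from spark.dynamicAllocation.initialExecutors instead of defaulting
// to Integer.MAX_VALUE.
val conf = new SparkConf()
  .setMaster("spark://master-host:7077") // hypothetical standalone master URL
  .setAppName("initial-executors-demo")  // hypothetical app name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // required for dynamic allocation
  .set("spark.dynamicAllocation.initialExecutors", "2")
val sc = new SparkContext(conf)
// The application now starts with at most 2 executors, rather than being
// allowed to scale immediately to the Master's previous default of infinity.

Note the two-step handshake in the patch: SparkDeploySchedulerBackend registers the application with an initial limit of Some(0) when dynamic allocation is enabled, and ExecutorAllocationManager then sends the real initial target to the Master via client.requestTotalExecutors at the end of start(), so the Master never briefly applies an unbounded limit.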