Commit 4120bcba authored by Andrew Or

[SPARK-13162] Standalone mode does not respect initial executors

Currently the Master always sets an application's initial executor limit to infinity, so setting `spark.dynamicAllocation.initialExecutors` has no effect. This patch makes the Master respect that config. It is the standalone-mode counterpart of #11047.

Author: Andrew Or <andrew@databricks.com>

Closes #11054 from andrewor14/standalone-da-initial.
parent 62a7c283
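
Viewed from user code, the fix means a standalone-mode application can actually start small. A minimal sketch of a driver configuration that exercises it, assuming a hypothetical master URL and that the external shuffle service (a prerequisite for dynamic allocation) is already running on the workers:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example: with this patch the application below starts with
// 2 executors; previously the Master capped it at Integer.MAX_VALUE, i.e.
// effectively "no limit", regardless of the config.
val conf = new SparkConf()
  .setMaster("spark://master:7077")                      // placeholder master URL
  .setAppName("initial-executors-demo")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")          // required for dynamic allocation
  .set("spark.dynamicAllocation.initialExecutors", "2")

val sc = new SparkContext(conf)
```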
@@ -231,6 +231,8 @@ private[spark] class ExecutorAllocationManager(
       }
     }
     executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
+
+    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
   }

   /**
...
@@ -29,6 +29,9 @@ private[spark] case class ApplicationDescription(
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     eventLogCodec: Option[String] = None,
     coresPerExecutor: Option[Int] = None,
+    // number of executors this application wants to start with,
+    // only used if dynamic allocation is enabled
+    initialExecutorLimit: Option[Int] = None,
     user: String = System.getProperty("user.name", "<unknown>")) {

   override def toString: String = "ApplicationDescription(" + name + ")"
...
@@ -65,7 +65,7 @@ private[spark] class ApplicationInfo(
     appSource = new ApplicationSource(this)
     nextExecutorId = 0
     removedExecutors = new ArrayBuffer[ExecutorDesc]
-    executorLimit = Integer.MAX_VALUE
+    executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
     appUIUrlAtHistoryServer = None
   }
...
@@ -19,11 +19,11 @@ package org.apache.spark.scheduler.cluster

 import java.util.concurrent.Semaphore

-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
-import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress}
+import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
@@ -89,8 +89,16 @@ private[spark] class SparkDeploySchedulerBackend(
       args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
     val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
-    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
-      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
+    // If we're using dynamic allocation, set our initial executor limit to 0 for now.
+    // ExecutorAllocationManager will send the real initial limit to the Master later.
+    val initialExecutorLimit =
+      if (Utils.isDynamicAllocationEnabled(conf)) {
+        Some(0)
+      } else {
+        None
+      }
+    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
+      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
     client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
     client.start()
     launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
...
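
Taken together, the backend and allocation-manager changes form a two-step handshake: register the application with a limit of 0 so nothing launches prematurely, then send the real initial target. A toy sketch of that flow, using illustrative stand-in classes rather than Spark's actual ones:

```scala
// Toy model of the executor-limit handshake introduced by this commit.
// AppDesc and Master are illustrative stand-ins, not Spark's real classes.
object HandshakeSketch extends App {
  case class AppDesc(initialExecutorLimit: Option[Int] = None)

  class Master {
    private var executorLimit: Int = Integer.MAX_VALUE

    def register(desc: AppDesc): Unit =
      // Before this commit: executorLimit = Integer.MAX_VALUE, unconditionally.
      executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)

    def requestTotalExecutors(n: Int): Unit = executorLimit = n

    def limit: Int = executorLimit
  }

  val master = new Master
  val dynamicAllocationEnabled = true
  val initialExecutors = 2 // spark.dynamicAllocation.initialExecutors

  // Step 1: the scheduler backend registers the app with a limit of 0.
  master.register(AppDesc(if (dynamicAllocationEnabled) Some(0) else None))

  // Step 2: ExecutorAllocationManager.start() sends the real initial target.
  if (dynamicAllocationEnabled) master.requestTotalExecutors(initialExecutors)

  assert(master.limit == initialExecutors)
}
```

The backend parks the limit at 0 rather than computing the target itself; per the diff, `ExecutorAllocationManager`, which owns `numExecutorsTarget`, sends the real value via `requestTotalExecutors` once it starts.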
@@ -447,7 +447,23 @@ class StandaloneDynamicAllocationSuite
     apps = getApplications()
     // kill executor successfully
     assert(apps.head.executors.size === 1)
   }

+  test("initial executor limit") {
+    val initialExecutorLimit = 1
+    val myConf = appConf
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.shuffle.service.enabled", "true")
+      .set("spark.dynamicAllocation.initialExecutors", initialExecutorLimit.toString)
+    sc = new SparkContext(myConf)
+    val appId = sc.applicationId
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === initialExecutorLimit)
+      assert(apps.head.getExecutorLimit === initialExecutorLimit)
+    }
+  }

   // ===============================
@@ -540,7 +556,6 @@ class StandaloneDynamicAllocationSuite
     val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted
     missingExecutors.foreach { id =>
       // Fake an executor registration so the driver knows about us
-      val port = System.currentTimeMillis % 65536
       val endpointRef = mock(classOf[RpcEndpointRef])
       val mockAddress = mock(classOf[RpcAddress])
       when(endpointRef.address).thenReturn(mockAddress)
...