diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 3431fc13dcb4e1c2c2d2745dbc19b6f01ccfddd3..db143d7341ce498e7b295a2ae403e1351db21b02 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -231,6 +231,8 @@ private[spark] class ExecutorAllocationManager(
       }
     }
     executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
+
+    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 78bbd5c03f4a621dd8d0b7f22426255843e031e4..c5c5c60923f4e13e4acaf1b5437efed1c3ba4f0b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -29,6 +29,9 @@ private[spark] case class ApplicationDescription(
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     eventLogCodec: Option[String] = None,
     coresPerExecutor: Option[Int] = None,
+    // number of executors this application wants to start with,
+    // only used if dynamic allocation is enabled
+    initialExecutorLimit: Option[Int] = None,
     user: String = System.getProperty("user.name", "<unknown>")) {
 
   override def toString: String = "ApplicationDescription(" + name + ")"
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 7e2cf956c7253d19aeedb3c90d45237199fadffe..4ffb5283e99a4dd53029947e298f7b383e018daa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -65,7 +65,7 @@ private[spark] class ApplicationInfo(
     appSource = new ApplicationSource(this)
     nextExecutorId = 0
     removedExecutors = new ArrayBuffer[ExecutorDesc]
-    executorLimit = Integer.MAX_VALUE
+    executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
     appUIUrlAtHistoryServer = None
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 16f33163789abf97758d37b2b3ed47072b84715f..d209645610c128303ddfe140cc87abf4ab790611 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -19,11 +19,11 @@ package org.apache.spark.scheduler.cluster
 
 import java.util.concurrent.Semaphore
 
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
-import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress}
+import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
@@ -89,8 +89,16 @@ private[spark] class SparkDeploySchedulerBackend(
       args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
     val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
-    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
-      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
+    // If we're using dynamic allocation, set our initial executor limit to 0 for now.
+    // ExecutorAllocationManager will send the real initial limit to the Master later.
+    val initialExecutorLimit =
+      if (Utils.isDynamicAllocationEnabled(conf)) {
+        Some(0)
+      } else {
+        None
+      }
+    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
+      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
     client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
     client.start()
     launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index fdada0777f9a92aeae4ce87b599949fd4b76a6e8..b7ff5c9e8c0d3c37e1d292da16086b6f4ac0ec77 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -447,7 +447,23 @@ class StandaloneDynamicAllocationSuite
     apps = getApplications()
     // kill executor successfully
     assert(apps.head.executors.size === 1)
+  }
 
+  test("initial executor limit") {
+    val initialExecutorLimit = 1
+    val myConf = appConf
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.shuffle.service.enabled", "true")
+      .set("spark.dynamicAllocation.initialExecutors", initialExecutorLimit.toString)
+    sc = new SparkContext(myConf)
+    val appId = sc.applicationId
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === initialExecutorLimit)
+      assert(apps.head.getExecutorLimit === initialExecutorLimit)
+    }
   }
 
   // ===============================
@@ -540,7 +556,6 @@ class StandaloneDynamicAllocationSuite
     val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted
     missingExecutors.foreach { id =>
       // Fake an executor registration so the driver knows about us
-      val port = System.currentTimeMillis % 65536
       val endpointRef = mock(classOf[RpcEndpointRef])
       val mockAddress = mock(classOf[RpcAddress])
       when(endpointRef.address).thenReturn(mockAddress)