diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index f7323a4d9db72fb956956f5321f95a0fee669d95..9939103bb0903978669fe41acf9984b9033ce81d 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -150,6 +150,13 @@ private[spark] class ExecutorAllocationManager(
   // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
   val executorAllocationManagerSource = new ExecutorAllocationManagerSource
 
+  // Whether we are still waiting for the initial set of executors to be allocated.
+  // While this is true, we will not cancel outstanding executor requests. This is
+  // set to false when:
+  //   (1) a stage is submitted, or
+  //   (2) an executor idle timeout has elapsed.
+  @volatile private var initializing: Boolean = true
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -240,6 +247,7 @@ private[spark] class ExecutorAllocationManager(
     removeTimes.retain { case (executorId, expireTime) =>
       val expired = now >= expireTime
       if (expired) {
+        initializing = false
         removeExecutor(executorId)
       }
       !expired
@@ -261,7 +269,11 @@ private[spark] class ExecutorAllocationManager(
   private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
     val maxNeeded = maxNumExecutorsNeeded
 
-    if (maxNeeded < numExecutorsTarget) {
+    if (initializing) {
+      // Do not change our target while we are still initializing,
+      // Otherwise the first job may have to ramp up unnecessarily
+      0
+    } else if (maxNeeded < numExecutorsTarget) {
       // The target number exceeds the number we actually need, so stop adding new
       // executors and inform the cluster manager to cancel the extra pending requests
       val oldNumExecutorsTarget = numExecutorsTarget
@@ -271,7 +283,7 @@ private[spark] class ExecutorAllocationManager(
       // If the new target has not changed, avoid sending a message to the cluster manager
       if (numExecutorsTarget < oldNumExecutorsTarget) {
         client.requestTotalExecutors(numExecutorsTarget)
-        logInfo(s"Lowering target number of executors to $numExecutorsTarget (previously " +
+        logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
           s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
       }
       numExecutorsTarget - oldNumExecutorsTarget
@@ -481,6 +493,7 @@ private[spark] class ExecutorAllocationManager(
     private var numRunningTasks: Int = _
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
+      initializing = false
       val stageId = stageSubmitted.stageInfo.stageId
       val numTasks = stageSubmitted.stageInfo.numTasks
       allocationManager.synchronized {
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 1c2b681f0b8430ab25f2c9672e4319024d3c2f4a..803e1831bb26930b381f9a858e4709634dedf4fb 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -90,7 +90,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("add executors") {
-    sc = createSparkContext(1, 10)
+    sc = createSparkContext(1, 10, 1)
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
@@ -135,7 +135,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("add executors capped by num pending tasks") {
-    sc = createSparkContext(0, 10)
+    sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))
 
@@ -186,7 +186,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("cancel pending executors when no longer needed") {
-    sc = createSparkContext(0, 10)
+    sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))
 
@@ -213,7 +213,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("remove executors") {
-    sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10, 5)
     val manager = sc.executorAllocationManager.get
     (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
 
@@ -263,7 +263,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test ("interleaving add and remove") {
-    sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10, 5)
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
@@ -331,7 +331,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("starting/canceling add timer") {
-    sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10, 2)
     val clock = new ManualClock(8888L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -363,7 +363,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("starting/canceling remove timers") {
-    sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10, 2)
     val clock = new ManualClock(14444L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -410,7 +410,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("mock polling loop with no events") {
-    sc = createSparkContext(0, 20)
+    sc = createSparkContext(0, 20, 0)
     val manager = sc.executorAllocationManager.get
     val clock = new ManualClock(2020L)
     manager.setClock(clock)
@@ -436,7 +436,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("mock polling loop add behavior") {
-    sc = createSparkContext(0, 20)
+    sc = createSparkContext(0, 20, 0)
     val clock = new ManualClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -486,7 +486,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("mock polling loop remove behavior") {
-    sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20, 1)
     val clock = new ManualClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -547,7 +547,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("listeners trigger add executors correctly") {
-    sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10, 2)
     val manager = sc.executorAllocationManager.get
     assert(addTime(manager) === NOT_SET)
 
@@ -577,7 +577,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("listeners trigger remove executors correctly") {
-    sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10, 2)
     val manager = sc.executorAllocationManager.get
     assert(removeTimes(manager).isEmpty)
 
@@ -608,7 +608,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("listeners trigger add and remove executor callbacks correctly") {
-    sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10, 2)
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)
@@ -641,7 +641,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("SPARK-4951: call onTaskStart before onBlockManagerAdded") {
-    sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10, 2)
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)
@@ -677,7 +677,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("avoid ramp up when target < running executors") {
-    sc = createSparkContext(0, 100000)
+    sc = createSparkContext(0, 100000, 0)
     val manager = sc.executorAllocationManager.get
     val stage1 = createStageInfo(0, 1000)
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
@@ -701,13 +701,67 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 16)
   }
 
-  private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
+  test("avoid ramp down initial executors until first job is submitted") {
+    sc = createSparkContext(2, 5, 3)
+    val manager = sc.executorAllocationManager.get
+    val clock = new ManualClock(10000L)
+    manager.setClock(clock)
+
+    // Verify the initial number of executors
+    assert(numExecutorsTarget(manager) === 3)
+    schedule(manager)
+    // Verify whether the initial number of executors is kept with no pending tasks
+    assert(numExecutorsTarget(manager) === 3)
+
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
+    clock.advance(100L)
+
+    assert(maxNumExecutorsNeeded(manager) === 2)
+    schedule(manager)
+
+    // Verify that current number of executors should be ramp down when first job is submitted
+    assert(numExecutorsTarget(manager) === 2)
+  }
+
+  test("avoid ramp down initial executors until idle executor is timeout") {
+    sc = createSparkContext(2, 5, 3)
+    val manager = sc.executorAllocationManager.get
+    val clock = new ManualClock(10000L)
+    manager.setClock(clock)
+
+    // Verify the initial number of executors
+    assert(numExecutorsTarget(manager) === 3)
+    schedule(manager)
+    // Verify the initial number of executors is kept when no pending tasks
+    assert(numExecutorsTarget(manager) === 3)
+    (0 until 3).foreach { i =>
+      onExecutorAdded(manager, s"executor-$i")
+    }
+
+    clock.advance(executorIdleTimeout * 1000)
+
+    assert(maxNumExecutorsNeeded(manager) === 0)
+    schedule(manager)
+    // Verify executor is timeout but numExecutorsTarget is not recalculated
+    assert(numExecutorsTarget(manager) === 3)
+
+    // Schedule again to recalculate the numExecutorsTarget after executor is timeout
+    schedule(manager)
+    // Verify that current number of executors should be ramp down when executor is timeout
+    assert(numExecutorsTarget(manager) === 2)
+  }
+
+  private def createSparkContext(
+      minExecutors: Int = 1,
+      maxExecutors: Int = 5,
+      initialExecutors: Int = 1): SparkContext = {
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test-executor-allocation-manager")
       .set("spark.dynamicAllocation.enabled", "true")
       .set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
       .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
+      .set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString)
       .set("spark.dynamicAllocation.schedulerBacklogTimeout",
           s"${schedulerBacklogTimeout.toString}s")
       .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
@@ -791,6 +845,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
     manager invokePrivate _schedule()
   }
 
+  private def maxNumExecutorsNeeded(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _maxNumExecutorsNeeded()
+  }
+
   private def addExecutors(manager: ExecutorAllocationManager): Int = {
     val maxNumExecutorsNeeded = manager invokePrivate _maxNumExecutorsNeeded()
     manager invokePrivate _addExecutors(maxNumExecutorsNeeded)