From 3f80bc841ab155925fb0530eef5927990f4a5793 Mon Sep 17 00:00:00 2001
From: jerryshao <saisai.shao@intel.com>
Date: Fri, 5 Jun 2015 12:28:37 -0700
Subject: [PATCH] [SPARK-7699] [CORE] Lazy start the scheduler for dynamic
 allocation

This patch propose to lazy start the scheduler for dynamic allocation to avoid fast ramp down executor numbers is load is less.

This implementation will:
1. immediately start the scheduler is `numExecutorsTarget` is 0, this is the expected behavior.
2. if `numExecutorsTarget` is not zero, start the scheduler until the number is satisfied, if the load is less, this initial started executors will last for at least 60 seconds, user will have a window to submit a job, no need to revamp the executors.
3. if `numExecutorsTarget` is not satisfied until the timeout, this means resource is not enough, the scheduler will start until this timeout, will not wait infinitely.

Please help to review, thanks a lot.

Author: jerryshao <saisai.shao@intel.com>

Closes #6430 from jerryshao/SPARK-7699 and squashes the following commits:

02cac8e [jerryshao] Address the comments
7242450 [jerryshao] Remove the useless import
ecc0b00 [jerryshao] Address the comments
6f75f00 [jerryshao] Style changes
8b8decc [jerryshao] change the test name
fb822ca [jerryshao] Change the solution according to comments
1cc74e5 [jerryshao] Lazy start the scheduler for dynamic allocation
---
 .../spark/ExecutorAllocationManager.scala     | 17 +++-
 .../ExecutorAllocationManagerSuite.scala      | 90 +++++++++++++++----
 2 files changed, 89 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index f7323a4d9d..9939103bb0 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -150,6 +150,13 @@ private[spark] class ExecutorAllocationManager(
   // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
   val executorAllocationManagerSource = new ExecutorAllocationManagerSource
 
+  // Whether we are still waiting for the initial set of executors to be allocated.
+  // While this is true, we will not cancel outstanding executor requests. This is
+  // set to false when:
+  //   (1) a stage is submitted, or
+  //   (2) an executor idle timeout has elapsed.
+  @volatile private var initializing: Boolean = true
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -240,6 +247,7 @@ private[spark] class ExecutorAllocationManager(
     removeTimes.retain { case (executorId, expireTime) =>
       val expired = now >= expireTime
       if (expired) {
+        initializing = false
         removeExecutor(executorId)
       }
       !expired
@@ -261,7 +269,11 @@ private[spark] class ExecutorAllocationManager(
   private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
     val maxNeeded = maxNumExecutorsNeeded
 
-    if (maxNeeded < numExecutorsTarget) {
+    if (initializing) {
+      // Do not change our target while we are still initializing,
+      // Otherwise the first job may have to ramp up unnecessarily
+      0
+    } else if (maxNeeded < numExecutorsTarget) {
       // The target number exceeds the number we actually need, so stop adding new
       // executors and inform the cluster manager to cancel the extra pending requests
       val oldNumExecutorsTarget = numExecutorsTarget
@@ -271,7 +283,7 @@ private[spark] class ExecutorAllocationManager(
       // If the new target has not changed, avoid sending a message to the cluster manager
       if (numExecutorsTarget < oldNumExecutorsTarget) {
         client.requestTotalExecutors(numExecutorsTarget)
-        logInfo(s"Lowering target number of executors to $numExecutorsTarget (previously " +
+        logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
           s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
       }
       numExecutorsTarget - oldNumExecutorsTarget
@@ -481,6 +493,7 @@ private[spark] class ExecutorAllocationManager(
     private var numRunningTasks: Int = _
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
+      initializing = false
       val stageId = stageSubmitted.stageInfo.stageId
       val numTasks = stageSubmitted.stageInfo.numTasks
       allocationManager.synchronized {
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 1c2b681f0b..803e1831bb 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -90,7 +90,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("add executors") {
-    sc = createSparkContext(1, 10)
+    sc = createSparkContext(1, 10, 1)
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
@@ -135,7 +135,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("add executors capped by num pending tasks") {
-    sc = createSparkContext(0, 10)
+    sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))
 
@@ -186,7 +186,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("cancel pending executors when no longer needed") {
-    sc = createSparkContext(0, 10)
+    sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))
 
@@ -213,7 +213,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("remove executors") {
-    sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10, 5)
     val manager = sc.executorAllocationManager.get
     (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
 
@@ -263,7 +263,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test ("interleaving add and remove") {
-    sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10, 5)
     val manager = sc.executorAllocationManager.get
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
@@ -331,7 +331,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("starting/canceling add timer") {
-    sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10, 2)
     val clock = new ManualClock(8888L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -363,7 +363,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("starting/canceling remove timers") {
-    sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10, 2)
     val clock = new ManualClock(14444L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -410,7 +410,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("mock polling loop with no events") {
-    sc = createSparkContext(0, 20)
+    sc = createSparkContext(0, 20, 0)
     val manager = sc.executorAllocationManager.get
     val clock = new ManualClock(2020L)
     manager.setClock(clock)
@@ -436,7 +436,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("mock polling loop add behavior") {
-    sc = createSparkContext(0, 20)
+    sc = createSparkContext(0, 20, 0)
     val clock = new ManualClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -486,7 +486,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("mock polling loop remove behavior") {
-    sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20, 1)
     val clock = new ManualClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -547,7 +547,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("listeners trigger add executors correctly") {
-    sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10, 2)
     val manager = sc.executorAllocationManager.get
     assert(addTime(manager) === NOT_SET)
 
@@ -577,7 +577,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("listeners trigger remove executors correctly") {
-    sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10, 2)
     val manager = sc.executorAllocationManager.get
     assert(removeTimes(manager).isEmpty)
 
@@ -608,7 +608,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("listeners trigger add and remove executor callbacks correctly") {
-    sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10, 2)
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)
@@ -641,7 +641,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("SPARK-4951: call onTaskStart before onBlockManagerAdded") {
-    sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10, 2)
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)
@@ -677,7 +677,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("avoid ramp up when target < running executors") {
-    sc = createSparkContext(0, 100000)
+    sc = createSparkContext(0, 100000, 0)
     val manager = sc.executorAllocationManager.get
     val stage1 = createStageInfo(0, 1000)
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
@@ -701,13 +701,67 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 16)
   }
 
-  private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
+  test("avoid ramp down initial executors until first job is submitted") {
+    sc = createSparkContext(2, 5, 3)
+    val manager = sc.executorAllocationManager.get
+    val clock = new ManualClock(10000L)
+    manager.setClock(clock)
+
+    // Verify the initial number of executors
+    assert(numExecutorsTarget(manager) === 3)
+    schedule(manager)
+    // Verify whether the initial number of executors is kept with no pending tasks
+    assert(numExecutorsTarget(manager) === 3)
+
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
+    clock.advance(100L)
+
+    assert(maxNumExecutorsNeeded(manager) === 2)
+    schedule(manager)
+
+    // Verify that current number of executors should be ramp down when first job is submitted
+    assert(numExecutorsTarget(manager) === 2)
+  }
+
+  test("avoid ramp down initial executors until idle executor is timeout") {
+    sc = createSparkContext(2, 5, 3)
+    val manager = sc.executorAllocationManager.get
+    val clock = new ManualClock(10000L)
+    manager.setClock(clock)
+
+    // Verify the initial number of executors
+    assert(numExecutorsTarget(manager) === 3)
+    schedule(manager)
+    // Verify the initial number of executors is kept when no pending tasks
+    assert(numExecutorsTarget(manager) === 3)
+    (0 until 3).foreach { i =>
+      onExecutorAdded(manager, s"executor-$i")
+    }
+
+    clock.advance(executorIdleTimeout * 1000)
+
+    assert(maxNumExecutorsNeeded(manager) === 0)
+    schedule(manager)
+    // Verify executor is timeout but numExecutorsTarget is not recalculated
+    assert(numExecutorsTarget(manager) === 3)
+
+    // Schedule again to recalculate the numExecutorsTarget after executor is timeout
+    schedule(manager)
+    // Verify that current number of executors should be ramp down when executor is timeout
+    assert(numExecutorsTarget(manager) === 2)
+  }
+
+  private def createSparkContext(
+      minExecutors: Int = 1,
+      maxExecutors: Int = 5,
+      initialExecutors: Int = 1): SparkContext = {
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test-executor-allocation-manager")
       .set("spark.dynamicAllocation.enabled", "true")
       .set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
       .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
+      .set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString)
       .set("spark.dynamicAllocation.schedulerBacklogTimeout",
           s"${schedulerBacklogTimeout.toString}s")
       .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
@@ -791,6 +845,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
     manager invokePrivate _schedule()
   }
 
+  private def maxNumExecutorsNeeded(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _maxNumExecutorsNeeded()
+  }
+
   private def addExecutors(manager: ExecutorAllocationManager): Int = {
     val maxNumExecutorsNeeded = manager invokePrivate _maxNumExecutorsNeeded()
     manager invokePrivate _addExecutors(maxNumExecutorsNeeded)
-- 
GitLab