From cbc065039f5176acc49899462bfab2521da26701 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Date: Wed, 17 Sep 2014 16:23:50 -0700
Subject: [PATCH] [SPARK-3571] Spark standalone cluster mode doesn't work.

I think, this issue is caused by #1106

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #2436 from sarutak/SPARK-3571 and squashes the following commits:

7a4deea [Kousuke Saruta] Modified Master.scala to use numWorkersVisited and numWorkersAlive instead of stopPos
4e51e35 [Kousuke Saruta] Modified Master to prevent from 0 divide
4817ecd [Kousuke Saruta] Brushed up previous change
71e84b6 [Kousuke Saruta] Modified Master to enable schedule normally
---
 .../scala/org/apache/spark/deploy/master/Master.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2a3bd6ba0b..432b552c58 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -489,23 +489,24 @@ private[spark] class Master(
     // First schedule drivers, they take strict precedence over applications
     // Randomization helps balance drivers
     val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
-    val aliveWorkerNum = shuffledAliveWorkers.size
+    val numWorkersAlive = shuffledAliveWorkers.size
     var curPos = 0
+    
     for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
       // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
       // start from the last worker that was assigned a driver, and continue onwards until we have
       // explored all alive workers.
-      curPos = (curPos + 1) % aliveWorkerNum
-      val startPos = curPos
       var launched = false
-      while (curPos != startPos && !launched) {
+      var numWorkersVisited = 0
+      while (numWorkersVisited < numWorkersAlive && !launched) {
         val worker = shuffledAliveWorkers(curPos)
+        numWorkersVisited += 1
         if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
           launched = true
         }
-        curPos = (curPos + 1) % aliveWorkerNum
+        curPos = (curPos + 1) % numWorkersAlive
       }
     }
 
-- 
GitLab