From e2a67a802447c9a778d57b687dc6321f5fb14283 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Thu, 12 Jul 2012 18:21:52 -0700
Subject: [PATCH] Fixes to coarse-grained Mesos scheduler in dealing with
 failed nodes

---
 .../cluster/StandaloneSchedulerBackend.scala  |  2 +-
 .../mesos/CoarseMesosSchedulerBackend.scala   | 27 ++++++++++++++++---
 2 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index c3132abd7a..013671c1c8 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -112,7 +112,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
     masterActor ! ReviveOffers
   }
 
-  def defaultParallelism(): Int = totalCoreCount.get()
+  def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
 }
 
 object StandaloneSchedulerBackend {
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 62a0c5589c..31784985dc 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -41,6 +41,8 @@ class CoarseMesosSchedulerBackend(
     "SPARK_JAVA_OPTS"
   )
 
+  val MAX_SLAVE_FAILURES = 2     // Blacklist a slave after this many failures
+
   // Memory used by each executor (in megabytes)
   val executorMemory = {
     if (System.getenv("SPARK_MEM") != null) {
@@ -67,6 +69,9 @@ class CoarseMesosSchedulerBackend(
 
   val slaveIdsWithExecutors = new HashSet[String]
 
+  val taskIdToSlaveId = new HashMap[Int, String]
+  val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
+
   val sparkHome = sc.getSparkHome() match {
     case Some(path) =>
       path
@@ -161,10 +166,14 @@ class CoarseMesosSchedulerBackend(
         val mem = getResource(offer.getResourcesList, "mem")
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
         if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
+            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
             !slaveIdsWithExecutors.contains(slaveId)) {
           // Launch an executor on the slave
           val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
           val taskId = newMesosTaskId()
+          taskIdToSlaveId(taskId) = slaveId
+          slaveIdsWithExecutors += slaveId
+          coresByTaskId(taskId) = cpusToUse
           val task = MesosTaskInfo.newBuilder()
             .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
             .setSlaveId(offer.getSlaveId)
@@ -210,15 +219,27 @@ class CoarseMesosSchedulerBackend(
 
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     val taskId = status.getTaskId.getValue.toInt
-    logInfo("Mesos task " + taskId + " is now " + status.getState)
+    val state = status.getState
+    logInfo("Mesos task " + taskId + " is now " + state)
     synchronized {
-      if (isFinished(status.getState)) {
+      if (isFinished(state)) {
+        val slaveId = taskIdToSlaveId(taskId)
+        slaveIdsWithExecutors -= slaveId
+        taskIdToSlaveId -= taskId
         // Remove the cores we have remembered for this task, if it's in the hashmap
         for (cores <- coresByTaskId.get(taskId)) {
           totalCoresAcquired -= cores
           coresByTaskId -= taskId
-          driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
         }
+        // If it was a failure, mark the slave as failed for blacklisting purposes
+        if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
+          failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
+          if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
+            logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
+                "is Spark installed on it?")
+          }
+        }
+        driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
       }
     }
   }
-- 
GitLab