diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index c3132abd7a90e9ca2970c320cf9bc9b3564fd6f7..013671c1c8d76af434c2974bf4e7a08753e5c858 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -112,7 +112,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
     masterActor ! ReviveOffers
   }
 
-  def defaultParallelism(): Int = totalCoreCount.get()
+  def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
 }
 
 object StandaloneSchedulerBackend {
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 62a0c5589c5bf04b04c71b34a7f11d8a494935a4..31784985dcbead81833970ed484cf9b009f04287 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -41,6 +41,8 @@ class CoarseMesosSchedulerBackend(
     "SPARK_JAVA_OPTS"
   )
 
+  val MAX_SLAVE_FAILURES = 2  // Blacklist a slave after this many failures
+
   // Memory used by each executor (in megabytes)
   val executorMemory = {
     if (System.getenv("SPARK_MEM") != null) {
@@ -67,6 +69,9 @@ class CoarseMesosSchedulerBackend(
 
   val slaveIdsWithExecutors = new HashSet[String]
 
+  val taskIdToSlaveId = new HashMap[Int, String]
+  val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
+
   val sparkHome = sc.getSparkHome() match {
     case Some(path) =>
       path
@@ -161,10 +166,14 @@ class CoarseMesosSchedulerBackend(
         val mem = getResource(offer.getResourcesList, "mem")
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
         if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
+            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
             !slaveIdsWithExecutors.contains(slaveId)) {
           // Launch an executor on the slave
           val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
           val taskId = newMesosTaskId()
+          taskIdToSlaveId(taskId) = slaveId
+          slaveIdsWithExecutors += slaveId
+          coresByTaskId(taskId) = cpusToUse
           val task = MesosTaskInfo.newBuilder()
             .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
             .setSlaveId(offer.getSlaveId)
@@ -210,15 +219,27 @@ class CoarseMesosSchedulerBackend(
 
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     val taskId = status.getTaskId.getValue.toInt
-    logInfo("Mesos task " + taskId + " is now " + status.getState)
+    val state = status.getState
+    logInfo("Mesos task " + taskId + " is now " + state)
     synchronized {
-      if (isFinished(status.getState)) {
+      if (isFinished(state)) {
+        val slaveId = taskIdToSlaveId(taskId)
+        slaveIdsWithExecutors -= slaveId
+        taskIdToSlaveId -= taskId
         // Remove the cores we have remembered for this task, if it's in the hashmap
         for (cores <- coresByTaskId.get(taskId)) {
           totalCoresAcquired -= cores
           coresByTaskId -= taskId
-          driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
         }
+        // If it was a failure, mark the slave as failed for blacklisting purposes
+        if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
+          failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
+          if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
+            logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
+                "is Spark installed on it?")
+          }
+        }
+        driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
       }
     }
   }