diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 1cd68a2aa61ddcc874fd0245dd5b0eb52f4f3479..dda25463c7208c58a3224ccc9369cc6676728d46 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -33,6 +33,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   val waitingApps = new ArrayBuffer[ApplicationInfo]
   val completedApps = new ArrayBuffer[ApplicationInfo]
 
+  var firstApp: Option[ApplicationInfo] = None
+
   val masterPublicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else ip
@@ -167,7 +169,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
       // Try to spread out each app among all the nodes, until it has all its cores
       for (app <- waitingApps if app.coresLeft > 0) {
         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-                                   .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
         val numUsable = usableWorkers.length
         val assigned = new Array[Int](numUsable) // Number of cores to give on each node
         var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -190,7 +192,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
       }
     } else {
       // Pack each app into as few nodes as possible until we've assigned all its cores
-      for (worker <- workers if worker.coresFree > 0) {
+      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
         for (app <- waitingApps if app.coresLeft > 0) {
           if (canUse(app, worker)) {
             val coresToUse = math.min(worker.coresFree, app.coresLeft)
@@ -203,6 +205,10 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
         }
       }
     }
+    if (workers.toArray.filter(_.state == WorkerState.ALIVE).size > 0 &&
+        firstApp != None && firstApp.get.executors.size == 0) {
+      logWarning("Could not find any machines with enough memory. Ensure that SPARK_WORKER_MEM > SPARK_MEM.")
+    }
   }
 
   def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
@@ -245,6 +251,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     idToApp(app.id) = app
     actorToApp(driver) = app
     addressToApp(driver.path.address) = app
+    if (firstApp == None) {
+      firstApp = Some(app)
+    }
     return app
   }
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 1e4fbdb8742fdac8d33edb6b1e9ec1b3aaaea4d4..04d01e9ce8d363ce5f28a49836f54b3beb038f3d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -11,6 +11,7 @@ import spark.TaskState.TaskState
 import spark.scheduler._
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
+import java.util.{TimerTask, Timer}
 
 /**
  * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -22,6 +23,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+  // Threshold above which we warn user initial TaskSet may be starved
+  val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "5000").toLong
 
   val activeTaskSets = new HashMap[String, TaskSetManager]
   var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -30,6 +33,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   val taskIdToExecutorId = new HashMap[Long, String]
   val taskSetTaskIds = new HashMap[String, HashSet[Long]]
 
+  var hasReceivedTask = false
+  var hasLaunchedTask = false
+  val starvationTimer = new Timer(true)
+
   // Incrementing Mesos task IDs
   val nextTaskId = new AtomicLong(0)
 
@@ -94,6 +101,18 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       activeTaskSets(taskSet.id) = manager
       activeTaskSetsQueue += manager
       taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+
+      if (hasReceivedTask == false) {
+        starvationTimer.scheduleAtFixedRate(new TimerTask() {
+          override def run() {
+            if (!hasLaunchedTask) {
+              logWarning("Initial TaskSet has not accepted any offers. " +
+                "Check the scheduler UI to ensure slaves are registered.")
+            }
+          }
+        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+      }
+      hasReceivedTask = true;
     }
     backend.reviveOffers()
   }
@@ -150,6 +169,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
           }
         } while (launchedTask)
       }
+      if (tasks.size > 0) hasLaunchedTask = true
       return tasks
     }
   }
@@ -235,7 +255,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   }
 
   override def defaultParallelism() = backend.defaultParallelism()
-  
+
   // Check for speculatable tasks in all our active jobs.
   def checkSpeculatableTasks() {
     var shouldRevive = false
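
For context, the starvation check added to ClusterScheduler above reduces to a daemon java.util.Timer that keeps firing a warning until the scheduler observes its first launched task. Below is a minimal, self-contained sketch of that pattern, not Spark code: the object name StarvationWatch, the markLaunched() helper, and the use of System.err in place of Spark's logWarning are illustrative assumptions, and the sketch cancels the TimerTask once work starts, whereas the patch simply lets it fall silent.

// Sketch only: names and logging are illustrative, not Spark's.
import java.util.{Timer, TimerTask}

object StarvationWatch {
  @volatile private var hasLaunchedTask = false   // mirrors the flag set in resourceOffers()
  private val timer = new Timer(true)             // daemon thread, like starvationTimer above

  // Warn every `periodMs` milliseconds until markLaunched() is called, then stop.
  def start(periodMs: Long): Unit = {
    timer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        if (!hasLaunchedTask) {
          System.err.println("WARN: initial task set has not accepted any offers yet")
        } else {
          cancel()  // TimerTask.cancel(): stop repeating once work has started
        }
      }
    }, periodMs, periodMs)
  }

  def markLaunched(): Unit = { hasLaunchedTask = true }
}

// Example run: warns roughly three times, then goes quiet after markLaunched().
object StarvationWatchDemo extends App {
  StarvationWatch.start(1000)
  Thread.sleep(3500)            // no "offers" accepted during this window
  StarvationWatch.markLaunched()
  Thread.sleep(2000)
}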