diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index dda25463c7208c58a3224ccc9369cc6676728d46..b7f167425f5a70b83a4924f1f9bf804487893fe1 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -205,10 +205,6 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor } } } - if (workers.toArray.filter(_.state == WorkerState.ALIVE).size > 0 && - firstApp != None && firstApp.get.executors.size == 0) { - logWarning("Could not find any machines with enough memory. Ensure that SPARK_WORKER_MEM > SPARK_MEM.") - } } def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) { @@ -254,6 +250,10 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor if (firstApp == None) { firstApp = Some(app) } + val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray + if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) { + logWarning("Could not find any workers with enough memory for " + firstApp.get.id) + } return app } diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index 04d01e9ce8d363ce5f28a49836f54b3beb038f3d..d9c2f9517be50ecd6e9370b5f55a4a3032dae6ab 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -24,7 +24,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) // How often to check for speculative tasks val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong // Threshold above which we warn user initial TaskSet may be starved - val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "5000").toLong + val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong val activeTaskSets = new HashMap[String, TaskSetManager] var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager] @@ -106,8 +106,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext) starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { if (!hasLaunchedTask) { - logWarning("Initial TaskSet has not accepted any offers. " + - "Check the scheduler UI to ensure slaves are registered.") + logWarning("Initial job has not accepted any resources; " + + "check your cluster UI to ensure that workers are registered") + } else { + this.cancel() } } }, STARVATION_TIMEOUT, STARVATION_TIMEOUT) @@ -169,7 +171,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } } while (launchedTask) } - if (tasks.size > 0) hasLaunchedTask = true + if (tasks.size > 0) { + hasLaunchedTask = true + } return tasks } }