Commit f33662c1 authored by Matei Zaharia

Merge remote-tracking branch 'pwendell/starvation-check'

Also fixed a bug where the master was offering executors on dead workers.

Conflicts:
	core/src/main/scala/spark/deploy/master/Master.scala
Parents: 7341de0d 2ed791fd
core/src/main/scala/spark/deploy/master/Master.scala

@@ -33,6 +33,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   val waitingApps = new ArrayBuffer[ApplicationInfo]
   val completedApps = new ArrayBuffer[ApplicationInfo]
 
+  var firstApp: Option[ApplicationInfo] = None
+
   val masterPublicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else ip
@@ -167,7 +169,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
       // Try to spread out each app among all the nodes, until it has all its cores
       for (app <- waitingApps if app.coresLeft > 0) {
         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
           .filter(canUse(app, _)).sortBy(_.coresFree).reverse
         val numUsable = usableWorkers.length
         val assigned = new Array[Int](numUsable) // Number of cores to give on each node
         var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -190,7 +192,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
       }
     } else {
       // Pack each app into as few nodes as possible until we've assigned all its cores
-      for (worker <- workers if worker.coresFree > 0) {
+      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
         for (app <- waitingApps if app.coresLeft > 0) {
           if (canUse(app, worker)) {
             val coresToUse = math.min(worker.coresFree, app.coresLeft)
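A minimal, runnable sketch of the pattern this hunk fixes: when packing an app onto as few nodes as possible, only workers in the ALIVE state should be considered, otherwise cores get "assigned" on machines that have already been lost. WorkerStub and the enumeration below are illustrative stand-ins, not Spark's actual WorkerInfo/WorkerState classes.

object AliveWorkerFilterSketch {
  // Illustrative stand-ins for Spark's WorkerState and WorkerInfo.
  object WorkerState extends Enumeration { val ALIVE, DEAD = Value }
  case class WorkerStub(id: String, coresFree: Int, state: WorkerState.Value)

  def main(args: Array[String]): Unit = {
    val workers = Seq(
      WorkerStub("w1", coresFree = 4, state = WorkerState.ALIVE),
      WorkerStub("w2", coresFree = 8, state = WorkerState.DEAD)  // lost worker: must be skipped
    )
    var coresLeft = 6

    // Mirrors the fixed loop guard: coresFree > 0 && state == ALIVE.
    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
      val coresToUse = math.min(worker.coresFree, coresLeft)
      coresLeft -= coresToUse
      println(s"assigning $coresToUse cores on ${worker.id}, $coresLeft still needed")
    }
    // Without the ALIVE check, the remaining 2 cores would have been offered on the dead worker w2.
  }
}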
@@ -203,6 +205,10 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
         }
       }
     }
+    if (workers.toArray.filter(_.state == WorkerState.ALIVE).size > 0 &&
+        firstApp != None && firstApp.get.executors.size == 0) {
+      logWarning("Could not find any machines with enough memory. Ensure that SPARK_WORKER_MEM > SPARK_MEM.")
+    }
   }
 
   def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
@@ -245,6 +251,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     idToApp(app.id) = app
     actorToApp(driver) = app
     addressToApp(driver.path.address) = app
+    if (firstApp == None) {
+      firstApp = Some(app)
+    }
     return app
   }
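Taken together, the Master-side changes remember the first application that registers (firstApp) and, at the end of a scheduling pass, warn when live workers exist but that application still has no executors. A rough, self-contained sketch of that flow with placeholder types; the memory comparison inside canUse is an assumption inferred from the warning text, not Spark's actual implementation.

object FirstAppWarningSketch {
  // Placeholder types; Spark's ApplicationInfo and WorkerInfo carry far more state.
  case class AppStub(memoryPerExecutor: Int, var executors: Int = 0)
  case class WorkerStub(memoryFree: Int, alive: Boolean)

  var firstApp: Option[AppStub] = None

  // Mirrors addApplication(): only the very first registration is remembered.
  def registerApp(app: AppStub): Unit = {
    if (firstApp == None) firstApp = Some(app)
  }

  // Assumed shape of canUse: does the worker have enough free memory for one executor?
  def canUse(app: AppStub, worker: WorkerStub): Boolean =
    worker.alive && worker.memoryFree >= app.memoryPerExecutor

  // Mirrors the new check at the end of schedule().
  def warnIfStarved(workers: Seq[WorkerStub]): Unit = {
    if (workers.count(_.alive) > 0 && firstApp != None && firstApp.get.executors == 0) {
      println("WARN: Could not find any machines with enough memory. " +
        "Ensure that SPARK_WORKER_MEM > SPARK_MEM.")
    }
  }

  def main(args: Array[String]): Unit = {
    registerApp(AppStub(memoryPerExecutor = 4096))
    val workers = Seq(WorkerStub(memoryFree = 1024, alive = true))  // alive, but too small for the app
    // A scheduling pass assigns nothing here, since canUse fails for every worker...
    workers.filter(canUse(firstApp.get, _)).foreach(_ => firstApp.get.executors += 1)
    warnIfStarved(workers)  // ...so the warning fires.
  }
}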
ClusterScheduler.scala
@@ -11,6 +11,7 @@ import spark.TaskState.TaskState
 import spark.scheduler._
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
+import java.util.{TimerTask, Timer}
 
 /**
  * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -22,6 +23,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
 
+  // Threshold above which we warn user initial TaskSet may be starved
+  val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "5000").toLong
+
   val activeTaskSets = new HashMap[String, TaskSetManager]
   var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
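The threshold is an ordinary system property read once with a 5000 ms default, so it can be adjusted before the scheduler is constructed. A hypothetical tuning example:

// Hypothetical: raise the starvation-warning threshold to 15 seconds.
// Must run before ClusterScheduler is created, because the property is
// read a single time into STARVATION_TIMEOUT.
System.setProperty("spark.starvation.timeout", "15000")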
@@ -30,6 +33,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   val taskIdToExecutorId = new HashMap[Long, String]
   val taskSetTaskIds = new HashMap[String, HashSet[Long]]
 
+  var hasReceivedTask = false
+  var hasLaunchedTask = false
+  val starvationTimer = new Timer(true)
+
   // Incrementing Mesos task IDs
   val nextTaskId = new AtomicLong(0)
@@ -94,6 +101,18 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       activeTaskSets(taskSet.id) = manager
       activeTaskSetsQueue += manager
       taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+
+      if (hasReceivedTask == false) {
+        starvationTimer.scheduleAtFixedRate(new TimerTask() {
+          override def run() {
+            if (!hasLaunchedTask) {
+              logWarning("Initial TaskSet has not accepted any offers. " +
+                "Check the scheduler UI to ensure slaves are registered.")
+            }
+          }
+        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+      }
+      hasReceivedTask = true;
     }
     backend.reviveOffers()
   }
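The heart of the check is a daemon java.util.Timer armed the first time a TaskSet arrives; it repeats the warning every STARVATION_TIMEOUT milliseconds until a task actually launches. Below is a stripped-down, runnable sketch of the same pattern outside Spark; submitFirstTaskSet and println stand in for submitTasks and logWarning.

import java.util.{Timer, TimerTask}

object StarvationTimerSketch {
  val STARVATION_TIMEOUT = 5000L          // ms, mirroring spark.starvation.timeout's default
  @volatile var hasLaunchedTask = false   // set once a resource offer is accepted
  val starvationTimer = new Timer(true)   // daemon timer thread: will not block JVM exit

  // Stand-in for submitTasks(): arm the repeating check when the first TaskSet arrives.
  def submitFirstTaskSet(): Unit = {
    starvationTimer.scheduleAtFixedRate(new TimerTask() {
      override def run(): Unit = {
        if (!hasLaunchedTask) {
          println("WARN: Initial TaskSet has not accepted any offers. " +
            "Check the scheduler UI to ensure slaves are registered.")
        }
      }
    }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
  }

  def main(args: Array[String]): Unit = {
    submitFirstTaskSet()
    Thread.sleep(12000)        // no offers accepted yet: the warning prints roughly twice
    hasLaunchedTask = true     // simulates resourceOffers() launching a task
    Thread.sleep(6000)         // the timer keeps firing but now stays silent
    starvationTimer.cancel()
  }
}

In the real scheduler the flag is flipped by the one-line change in the next hunk, once resourceOffers() returns a non-empty task list.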
@@ -150,6 +169,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         }
       } while (launchedTask)
     }
+    if (tasks.size > 0) hasLaunchedTask = true
     return tasks
   }
 }
@@ -235,7 +255,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   }
 
   override def defaultParallelism() = backend.defaultParallelism()
 
   // Check for speculatable tasks in all our active jobs.
   def checkSpeculatableTasks() {
     var shouldRevive = false