Commit 1859c9f9 authored by Patrick Wendell

Changing to use Timer based on code review

parent b1432295
@@ -11,6 +11,7 @@ import spark.TaskState.TaskState
 import spark.scheduler._
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
+import java.util.{TimerTask, Timer}
 
 /**
  * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -22,8 +23,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
 
-  // How often to check for starved TaskSets
-  val STARVATION_CHECK_INTERVAL = System.getProperty("spark.starvation_check.interval", "5000").toLong
+  // Threshold above which we warn user initial TaskSet may be starved
+  val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "5000").toLong
 
   val activeTaskSets = new HashMap[String, TaskSetManager]
   var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -32,6 +33,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   val taskIdToExecutorId = new HashMap[Long, String]
   val taskSetTaskIds = new HashMap[String, HashSet[Long]]
 
+  var hasReceivedTask = false
+  var hasLaunchedTask = false
+  val starvationTimer = new Timer(true)
+
   // Incrementing Mesos task IDs
   val nextTaskId = new AtomicLong(0)
@@ -86,21 +91,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         }
       }.start()
     }
-
-    new Thread("ClusterScheduler starvation check") {
-      setDaemon(true)
-      override def run() {
-        while (true) {
-          try {
-            Thread.sleep(STARVATION_CHECK_INTERVAL)
-          } catch {
-            case e: InterruptedException => {}
-          }
-          detectStarvedTaskSets()
-        }
-      }
-    }.start()
   }
 
   override def submitTasks(taskSet: TaskSet) {
@@ -111,6 +101,18 @@
       activeTaskSets(taskSet.id) = manager
       activeTaskSetsQueue += manager
       taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+
+      if (hasReceivedTask == false) {
+        starvationTimer.scheduleAtFixedRate(new TimerTask() {
+          override def run() {
+            if (!hasLaunchedTask) {
+              logWarning("Initial TaskSet has not accepted any offers. " +
+                "Check the scheduler UI to ensure slaves are registered.")
+            }
+          }
+        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+      }
+      hasReceivedTask = true;
     }
     backend.reviveOffers()
   }
@@ -167,6 +169,7 @@
           }
         } while (launchedTask)
       }
+      if (tasks.size > 0) hasLaunchedTask = true
       return tasks
     }
   }
@@ -266,20 +269,6 @@
     }
   }
 
-  // Find and resource-starved TaskSets and alert the user
-  def detectStarvedTaskSets() {
-    val noOfferThresholdSeconds = 5
-    synchronized {
-      for (ts <- activeTaskSetsQueue) {
-        if (ts == TaskSetManager.firstTaskSet.get &&
-            (System.currentTimeMillis - ts.creationTime > noOfferThresholdSeconds * 1000) &&
-            ts.receivedOffers == 0) {
-          logWarning("No offers received. Check the scheduler UI to ensure slaves are registered.")
-        }
-      }
-    }
-  }
-
   def executorLost(executorId: String, reason: ExecutorLossReason) {
     var failedExecutor: Option[String] = None
     synchronized {
@@ -44,7 +44,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   var tasksFinished = 0
   val creationTime = System.currentTimeMillis
-  var receivedOffers = 0
 
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
@@ -98,8 +97,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     addPendingTask(i)
   }
 
-  if (!TaskSetManager.firstTaskSet.isDefined) TaskSetManager.firstTaskSet = Some(this)
-
   // Add a task to all the pending-task lists that it should be on.
   private def addPendingTask(index: Int) {
     val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
@@ -192,7 +189,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
 
   // Respond to an offer of a single slave from the scheduler by finding a task
   def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
-    receivedOffers += 1
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -432,7 +428,3 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     return foundTasks
   }
 }
-
-object TaskSetManager {
-  var firstTaskSet: Option[TaskSetManager] = None
-}
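
For reference, the pattern this commit moves to can be sketched standalone as below. This is an illustrative sketch only, not code from the commit: the object name StarvationDemo, the println standing in for Spark's logWarning, and the @volatile flag are assumptions of the example. What it mirrors from the diff is the daemon java.util.Timer (new Timer(true)) and the repeating TimerTask scheduled with scheduleAtFixedRate, which keeps warning until a flag records that at least one task has been launched.

import java.util.{Timer, TimerTask}

// Illustrative sketch of the Timer-based starvation warning adopted in this commit.
object StarvationDemo {
  // Warn every 5 seconds, mirroring the default of spark.starvation.timeout.
  val STARVATION_TIMEOUT = 5000L

  // Flipped once at least one task has actually been launched.
  @volatile var hasLaunchedTask = false

  // Daemon timer, so it never keeps the JVM alive on its own (matches new Timer(true)).
  val starvationTimer = new Timer(true)

  // Called when the first TaskSet arrives (guarded by hasReceivedTask in the real code).
  def onFirstTaskSetSubmitted(): Unit = {
    starvationTimer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        if (!hasLaunchedTask) {
          // Spark calls logWarning here; println stands in for it in this sketch.
          println("Initial TaskSet has not accepted any offers. " +
            "Check the scheduler UI to ensure slaves are registered.")
        }
      }
    }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
  }

  // Called when the scheduler hands out at least one task.
  def onTaskLaunched(): Unit = {
    hasLaunchedTask = true
  }
}

Compared with the removed polling thread, the timer needs no sleep/interrupt loop and is only scheduled once a TaskSet has actually been submitted, so an idle scheduler does not warn at all.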