Skip to content
Snippets Groups Projects
Commit 157279e9 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Update Spark to work with the latest Mesos API

parent 3a0e6c43
No related branches found
No related tags found
No related merge requests found
No preview for this file type
...@@ -8,7 +8,7 @@ import org.apache.mesos.Protos._ ...@@ -8,7 +8,7 @@ import org.apache.mesos.Protos._
* job by implementing various callbacks. * job by implementing various callbacks.
*/ */
abstract class Job(jobId: Int) { abstract class Job(jobId: Int) {
def slaveOffer(s: SlaveOffer, availableCpus: Double): Option[TaskDescription] def slaveOffer(s: Offer, availableCpus: Double): Option[TaskDescription]
def statusUpdate(t: TaskStatus): Unit def statusUpdate(t: TaskStatus): Unit
......
...@@ -91,9 +91,9 @@ extends MScheduler with DAGScheduler with Logging ...@@ -91,9 +91,9 @@ extends MScheduler with DAGScheduler with Logging
setDaemon(true) setDaemon(true)
override def run { override def run {
val sched = MesosScheduler.this val sched = MesosScheduler.this
sched.driver = new MesosSchedulerDriver(sched, master) driver = new MesosSchedulerDriver(sched, frameworkName, getExecutorInfo, master)
try { try {
val ret = sched.driver.run() val ret = driver.run()
logInfo("driver.run() returned with code " + ret) logInfo("driver.run() returned with code " + ret)
} catch { } catch {
case e: Exception => case e: Exception =>
...@@ -103,9 +103,7 @@ extends MScheduler with DAGScheduler with Logging ...@@ -103,9 +103,7 @@ extends MScheduler with DAGScheduler with Logging
}.start }.start
} }
override def getFrameworkName(d: SchedulerDriver): String = frameworkName def getExecutorInfo(): ExecutorInfo = {
override def getExecutorInfo(d: SchedulerDriver): ExecutorInfo = {
val sparkHome = sc.getSparkHome match { val sparkHome = sc.getSparkHome match {
case Some(path) => path case Some(path) => path
case None => case None =>
...@@ -183,9 +181,9 @@ extends MScheduler with DAGScheduler with Logging ...@@ -183,9 +181,9 @@ extends MScheduler with DAGScheduler with Logging
* our active jobs for tasks in FIFO order. We fill each node with tasks in * our active jobs for tasks in FIFO order. We fill each node with tasks in
* a round-robin manner so that tasks are balanced across the cluster. * a round-robin manner so that tasks are balanced across the cluster.
*/ */
override def resourceOffer(d: SchedulerDriver, oid: OfferID, offers: JList[SlaveOffer]) { override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
synchronized { synchronized {
val tasks = new JArrayList[TaskDescription] val tasks = offers.map(o => new JArrayList[TaskDescription])
val availableCpus = offers.map(o => getResource(o.getResourcesList(), "cpus")) val availableCpus = offers.map(o => getResource(o.getResourcesList(), "cpus"))
val enoughMem = offers.map(o => { val enoughMem = offers.map(o => {
val mem = getResource(o.getResourcesList(), "mem") val mem = getResource(o.getResourcesList(), "mem")
...@@ -199,7 +197,7 @@ extends MScheduler with DAGScheduler with Logging ...@@ -199,7 +197,7 @@ extends MScheduler with DAGScheduler with Logging
for (i <- 0 until offers.size if enoughMem(i)) { for (i <- 0 until offers.size if enoughMem(i)) {
job.slaveOffer(offers(i), availableCpus(i)) match { job.slaveOffer(offers(i), availableCpus(i)) match {
case Some(task) => case Some(task) =>
tasks.add(task) tasks(i).add(task)
val tid = task.getTaskId.getValue val tid = task.getTaskId.getValue
val sid = offers(i).getSlaveId.getValue val sid = offers(i).getSlaveId.getValue
taskIdToJobId(tid) = job.getId taskIdToJobId(tid) = job.getId
...@@ -213,9 +211,10 @@ extends MScheduler with DAGScheduler with Logging ...@@ -213,9 +211,10 @@ extends MScheduler with DAGScheduler with Logging
} }
} while (launchedTask) } while (launchedTask)
} }
val params = new JHashMap[String, String] val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
params.put("timeout", "1") for (i <- 0 until offers.size if tasks(i).size > 0) {
d.replyToOffer(oid, tasks, params) // TODO: use smaller timeout? d.launchTasks(offers(i).getId(), tasks(i), filters)
}
} }
} }
......
...@@ -128,7 +128,7 @@ extends Job(jobId) with Logging ...@@ -128,7 +128,7 @@ extends Job(jobId) with Logging
} }
// Respond to an offer of a single slave from the scheduler by finding a task // Respond to an offer of a single slave from the scheduler by finding a task
def slaveOffer(offer: SlaveOffer, availableCpus: Double): Option[TaskDescription] = { def slaveOffer(offer: Offer, availableCpus: Double): Option[TaskDescription] = {
if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK) { if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK) {
val time = System.currentTimeMillis val time = System.currentTimeMillis
val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT) val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment