diff --git a/core/lib/mesos.jar b/core/lib/mesos.jar
index b3474266927bac4d5611231a22113cec99943f44..941966c46a45bb77a3a9927777930c094d742ad3 100644
Binary files a/core/lib/mesos.jar and b/core/lib/mesos.jar differ
diff --git a/core/src/main/scala/spark/Job.scala b/core/src/main/scala/spark/Job.scala
index acff8ce5614ed67c48ee4d5a4885547cb472bd38..2200fb0c5d46aa800f9bb76df85b1e99fd0196d9 100644
--- a/core/src/main/scala/spark/Job.scala
+++ b/core/src/main/scala/spark/Job.scala
@@ -8,7 +8,7 @@ import org.apache.mesos.Protos._
  * job by implementing various callbacks.
  */
 abstract class Job(jobId: Int) {
-  def slaveOffer(s: SlaveOffer, availableCpus: Double): Option[TaskDescription]
+  def slaveOffer(s: Offer, availableCpus: Double): Option[TaskDescription]
 
   def statusUpdate(t: TaskStatus): Unit
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index 35cf6ae539adddc77327d8aca4870fcadd63c5eb..14c17123a3be93a3d052e1096366ee5bd47d6a7f 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -91,9 +91,9 @@ extends MScheduler with DAGScheduler with Logging
       setDaemon(true)
       override def run {
         val sched = MesosScheduler.this
-        sched.driver = new MesosSchedulerDriver(sched, master)
+        driver = new MesosSchedulerDriver(sched, frameworkName, getExecutorInfo, master)
         try {
-          val ret = sched.driver.run()
+          val ret = driver.run()
           logInfo("driver.run() returned with code " + ret)
         } catch {
           case e: Exception =>
@@ -103,9 +103,7 @@ extends MScheduler with DAGScheduler with Logging
     }.start
   }
 
-  override def getFrameworkName(d: SchedulerDriver): String = frameworkName
-
-  override def getExecutorInfo(d: SchedulerDriver): ExecutorInfo = {
+  def getExecutorInfo(): ExecutorInfo = {
     val sparkHome = sc.getSparkHome match {
       case Some(path) => path
       case None =>
@@ -183,9 +181,9 @@ extends MScheduler with DAGScheduler with Logging
    * our active jobs for tasks in FIFO order. We fill each node with tasks in
    * a round-robin manner so that tasks are balanced across the cluster.
    */
-  override def resourceOffer(d: SchedulerDriver, oid: OfferID, offers: JList[SlaveOffer]) {
+  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     synchronized {
-      val tasks = new JArrayList[TaskDescription]
+      val tasks = offers.map(o => new JArrayList[TaskDescription])
       val availableCpus = offers.map(o => getResource(o.getResourcesList(), "cpus"))
       val enoughMem = offers.map(o => {
         val mem = getResource(o.getResourcesList(), "mem")
@@ -199,7 +197,7 @@ extends MScheduler with DAGScheduler with Logging
           for (i <- 0 until offers.size if enoughMem(i)) {
             job.slaveOffer(offers(i), availableCpus(i)) match {
               case Some(task) =>
-                tasks.add(task)
+                tasks(i).add(task)
                 val tid = task.getTaskId.getValue
                 val sid = offers(i).getSlaveId.getValue
                 taskIdToJobId(tid) = job.getId
@@ -213,9 +211,10 @@ extends MScheduler with DAGScheduler with Logging
           }
         } while (launchedTask)
       }
-      val params = new JHashMap[String, String]
-      params.put("timeout", "1")
-      d.replyToOffer(oid, tasks, params) // TODO: use smaller timeout?
+      val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
+      for (i <- 0 until offers.size if tasks(i).size > 0) {
+        d.launchTasks(offers(i).getId(), tasks(i), filters)
+      }
     }
   }
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index 9eee747cfdf5e422fc2c7f9fa57f644ab1b2b0e1..d982a75ba0fa210c9dc74946da0e9214efbe7de9 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -128,7 +128,7 @@ extends Job(jobId) with Logging
   }
 
   // Respond to an offer of a single slave from the scheduler by finding a task
-  def slaveOffer(offer: SlaveOffer, availableCpus: Double): Option[TaskDescription] = {
+  def slaveOffer(offer: Offer, availableCpus: Double): Option[TaskDescription] = {
     if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)