diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala index ee14d091ce931647d38f3922681eda17231bfdba..b95f40b877c8fc07d1ea7d6debab9bfa88d0e37a 100644 --- a/core/src/main/scala/spark/MesosScheduler.scala +++ b/core/src/main/scala/spark/MesosScheduler.scala @@ -54,7 +54,7 @@ private class MesosScheduler( private val registeredLock = new Object() private val activeJobs = new HashMap[Int, Job] - private var activeJobsQueue = new PriorityQueue[Job]()(jobOrdering) + private var activeJobsQueue = new ArrayBuffer[Job] private val taskIdToJobId = new HashMap[String, Int] private val taskIdToSlaveId = new HashMap[String, String] @@ -164,7 +164,7 @@ private class MesosScheduler( def jobFinished(job: Job) { this.synchronized { activeJobs -= job.jobId - activeJobsQueue = activeJobsQueue.filterNot(_ == job) + activeJobsQueue -= job taskIdToJobId --= jobTasks(job.jobId) taskIdToSlaveId --= jobTasks(job.jobId) jobTasks.remove(job.jobId) @@ -202,7 +202,7 @@ private class MesosScheduler( mem >= EXECUTOR_MEMORY || slavesWithExecutors.contains(slaveId) }) var launchedTask = false - for (job <- activeJobsQueue) { + for (job <- activeJobsQueue.sorted(jobOrdering)) { do { launchedTask = false for (i <- 0 until offers.size if enoughMem(i)) { @@ -248,6 +248,7 @@ private class MesosScheduler( } override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { + var jobToUpdate: Option[Job] = None synchronized { try { val tid = status.getTaskId.getValue @@ -259,7 +260,7 @@ private class MesosScheduler( taskIdToJobId.get(tid) match { case Some(jobId) => if (activeJobs.contains(jobId)) { - activeJobs(jobId).statusUpdate(status) + jobToUpdate = Some(activeJobs(jobId)) } if (isFinished(status.getState)) { taskIdToJobId.remove(tid) @@ -275,6 +276,9 @@ private class MesosScheduler( case e: Exception => logError("Exception in statusUpdate", e) } } + for (j <- jobToUpdate) { + j.statusUpdate(status) + } } override def error(d: SchedulerDriver, code: Int, message: String) {