Skip to content
Snippets Groups Projects
Commit d401e1b3 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Fix a possible deadlock in MesosScheduler

parent 816d4e58
No related branches found
No related tags found
No related merge requests found
...@@ -54,7 +54,7 @@ private class MesosScheduler( ...@@ -54,7 +54,7 @@ private class MesosScheduler(
private val registeredLock = new Object() private val registeredLock = new Object()
private val activeJobs = new HashMap[Int, Job] private val activeJobs = new HashMap[Int, Job]
private var activeJobsQueue = new PriorityQueue[Job]()(jobOrdering) private var activeJobsQueue = new ArrayBuffer[Job]
private val taskIdToJobId = new HashMap[String, Int] private val taskIdToJobId = new HashMap[String, Int]
private val taskIdToSlaveId = new HashMap[String, String] private val taskIdToSlaveId = new HashMap[String, String]
...@@ -164,7 +164,7 @@ private class MesosScheduler( ...@@ -164,7 +164,7 @@ private class MesosScheduler(
def jobFinished(job: Job) { def jobFinished(job: Job) {
this.synchronized { this.synchronized {
activeJobs -= job.jobId activeJobs -= job.jobId
activeJobsQueue = activeJobsQueue.filterNot(_ == job) activeJobsQueue -= job
taskIdToJobId --= jobTasks(job.jobId) taskIdToJobId --= jobTasks(job.jobId)
taskIdToSlaveId --= jobTasks(job.jobId) taskIdToSlaveId --= jobTasks(job.jobId)
jobTasks.remove(job.jobId) jobTasks.remove(job.jobId)
...@@ -202,7 +202,7 @@ private class MesosScheduler( ...@@ -202,7 +202,7 @@ private class MesosScheduler(
mem >= EXECUTOR_MEMORY || slavesWithExecutors.contains(slaveId) mem >= EXECUTOR_MEMORY || slavesWithExecutors.contains(slaveId)
}) })
var launchedTask = false var launchedTask = false
for (job <- activeJobsQueue) { for (job <- activeJobsQueue.sorted(jobOrdering)) {
do { do {
launchedTask = false launchedTask = false
for (i <- 0 until offers.size if enoughMem(i)) { for (i <- 0 until offers.size if enoughMem(i)) {
...@@ -248,6 +248,7 @@ private class MesosScheduler( ...@@ -248,6 +248,7 @@ private class MesosScheduler(
} }
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
var jobToUpdate: Option[Job] = None
synchronized { synchronized {
try { try {
val tid = status.getTaskId.getValue val tid = status.getTaskId.getValue
...@@ -259,7 +260,7 @@ private class MesosScheduler( ...@@ -259,7 +260,7 @@ private class MesosScheduler(
taskIdToJobId.get(tid) match { taskIdToJobId.get(tid) match {
case Some(jobId) => case Some(jobId) =>
if (activeJobs.contains(jobId)) { if (activeJobs.contains(jobId)) {
activeJobs(jobId).statusUpdate(status) jobToUpdate = Some(activeJobs(jobId))
} }
if (isFinished(status.getState)) { if (isFinished(status.getState)) {
taskIdToJobId.remove(tid) taskIdToJobId.remove(tid)
...@@ -275,6 +276,9 @@ private class MesosScheduler( ...@@ -275,6 +276,9 @@ private class MesosScheduler(
case e: Exception => logError("Exception in statusUpdate", e) case e: Exception => logError("Exception in statusUpdate", e)
} }
} }
for (j <- jobToUpdate) {
j.statusUpdate(status)
}
} }
override def error(d: SchedulerDriver, code: Int, message: String) { override def error(d: SchedulerDriver, code: Int, message: String) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment