Skip to content
Snippets Groups Projects
Commit c306555f authored by zsxwing's avatar zsxwing Committed by Reynold Xin
Browse files

[SPARK-5219][Core] Add locks to avoid scheduling race conditions

Author: zsxwing <zsxwing@gmail.com>

Closes #4019 from zsxwing/SPARK-5219 and squashes the following commits:

36a8b4e [zsxwing] Add locks to avoid race conditions
parent 60f67e7a
No related branches found
No related tags found
No related merge requests found
...@@ -361,7 +361,7 @@ private[spark] class TaskSchedulerImpl( ...@@ -361,7 +361,7 @@ private[spark] class TaskSchedulerImpl(
dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId) dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
} }
def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) { def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
taskSetManager.handleTaskGettingResult(tid) taskSetManager.handleTaskGettingResult(tid)
} }
......
...@@ -542,7 +542,7 @@ private[spark] class TaskSetManager( ...@@ -542,7 +542,7 @@ private[spark] class TaskSetManager(
/** /**
* Check whether has enough quota to fetch the result with `size` bytes * Check whether has enough quota to fetch the result with `size` bytes
*/ */
def canFetchMoreResults(size: Long): Boolean = synchronized { def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
totalResultSize += size totalResultSize += size
calculatedTasks += 1 calculatedTasks += 1
if (maxResultSize > 0 && totalResultSize > maxResultSize) { if (maxResultSize > 0 && totalResultSize > maxResultSize) {
...@@ -671,7 +671,7 @@ private[spark] class TaskSetManager( ...@@ -671,7 +671,7 @@ private[spark] class TaskSetManager(
maybeFinishTaskSet() maybeFinishTaskSet()
} }
def abort(message: String) { def abort(message: String): Unit = sched.synchronized {
// TODO: Kill running tasks if we were not terminated due to a Mesos error // TODO: Kill running tasks if we were not terminated due to a Mesos error
sched.dagScheduler.taskSetFailed(taskSet, message) sched.dagScheduler.taskSetFailed(taskSet, message)
isZombie = true isZombie = true
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment