Commit ab0940f0 authored by Reynold Xin

Job cancellation: addressed code review feedback round 2 from Kay.

parent 97ffebbe
@@ -56,7 +56,7 @@ trait FutureAction[T] extends Future[T] {
   override def result(atMost: Duration)(implicit permit: CanAwait): T
 
   /**
-   * When this action is completed, either through an exception, or a value, apply the provided
+   * When this action is completed, either through an exception, or a value, applies the provided
    * function.
    */
   def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext)
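
The fix above is purely grammatical, but the method it documents follows the standard scala.concurrent.Future contract: the callback is applied exactly once, with either the computed value or the exception. A minimal sketch of that behavior, using a plain Future in place of a FutureAction (the object name and values are illustrative only):

    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._
    import scala.util.{Failure, Success}

    object OnCompleteSketch extends App {
      implicit val ec: ExecutionContext = ExecutionContext.global

      val action: Future[Int] = Future { 21 * 2 }

      // Applied when the action completes, whether through an exception or a
      // value, as the corrected doc comment says.
      action.onComplete {
        case Success(v) => println(s"completed with value: $v")
        case Failure(t) => println(s"completed with exception: $t")
      }

      Await.ready(action, 1.second)
      Thread.sleep(100) // crude: give the callback a chance to fire before exit
    }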
@@ -91,7 +91,7 @@ class FutureJob[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
   extends FutureAction[T] {
 
   override def cancel() {
-    jobWaiter.kill()
+    jobWaiter.cancel()
   }
 
   override def ready(atMost: Duration)(implicit permit: CanAwait): FutureJob.this.type = {
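
With the rename, the delegation chain reads uniformly as cancel() all the way down: FutureJob.cancel() -> JobWaiter.cancel() -> DAGScheduler.cancelJob(jobId). A self-contained sketch of that chain with stub classes (these stand-ins are not the real Spark types):

    // Stubs mirroring the delegation this commit renames; not the real classes.
    class DagSchedulerStub {
      def cancelJob(jobId: Int): Unit = println(s"DAGScheduler: cancel job $jobId")
    }

    class JobWaiterStub(dagScheduler: DagSchedulerStub, jobId: Int) {
      // Like JobWaiter.cancel(): only signals the DAGScheduler; the actual
      // cancellation happens asynchronously.
      def cancel(): Unit = dagScheduler.cancelJob(jobId)
    }

    class FutureJobStub(jobWaiter: JobWaiterStub) {
      def cancel(): Unit = jobWaiter.cancel()
    }

    object CancelChainSketch extends App {
      val job = new FutureJobStub(new JobWaiterStub(new DagSchedulerStub, jobId = 42))
      job.cancel() // prints: DAGScheduler: cancel job 42
    }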
@@ -39,7 +39,12 @@ private[spark] class JobWaiter[T](
   // partition RDDs), we set the jobResult directly to JobSucceeded.
   private var jobResult: JobResult = if (jobFinished) JobSucceeded else null
 
-  def kill() {
+  /**
+   * Sends a signal to the DAGScheduler to cancel the job. The cancellation itself is handled
+   * asynchronously. After the low-level scheduler cancels all the tasks belonging to this job,
+   * it will fail this job with a SparkException.
+   */
+  def cancel() {
     dagScheduler.cancelJob(jobId)
   }
 
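The new doc comment is the key behavioral contract in this commit: cancel() returns immediately, and the job's future only fails, with a SparkException, once the low-level scheduler has actually killed the job's tasks. A hedged sketch of what that asynchrony looks like from the caller's side, using a Promise as a stand-in for the job's result (the simulated latency and message are illustrative; nothing here is the real Spark API):

    import scala.concurrent.{Await, ExecutionContext, Future, Promise}
    import scala.concurrent.duration._
    import scala.util.Failure

    object AsyncCancelSketch extends App {
      implicit val ec: ExecutionContext = ExecutionContext.global

      // Stand-in for the job's result future (a FutureJob in Spark).
      val result = Promise[Int]()

      // Stand-in for cancel(): it only signals; the failure arrives later,
      // after the simulated low-level scheduler has killed the tasks.
      def cancel(): Unit = Future {
        Thread.sleep(50) // simulated scheduler latency
        result.failure(new Exception("job 42 cancelled")) // SparkException in Spark
      }

      cancel() // returns immediately
      result.future.onComplete {
        case Failure(t) => println(s"job failed after cancellation: ${t.getMessage}")
        case _          => println("job finished before cancellation took effect")
      }
      Await.ready(result.future, 1.second)
      Thread.sleep(100) // let the callback run before the JVM exits
    }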
@@ -461,54 +461,52 @@ private[spark] class ClusterTaskSetManager(
 
       // Check if the problem is a map output fetch failure. In that case, this
       // task will never succeed on any node, so tell the scheduler about it.
       reason.foreach {
-        _ match {
-          case fetchFailed: FetchFailed =>
-            logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
-            sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
-            successful(index) = true
-            tasksSuccessful += 1
-            sched.taskSetFinished(this)
-            removeAllRunningTasks()
-            return
-          case TaskKilled =>
-            logInfo("Task %d was killed.".format(tid))
-            abort("Task %d was killed".format(tid))
-            return
-          case ef: ExceptionFailure =>
-            sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
-            val key = ef.description
-            val now = clock.getTime()
-            val (printFull, dupCount) = {
-              if (recentExceptions.contains(key)) {
-                val (dupCount, printTime) = recentExceptions(key)
-                if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
-                  recentExceptions(key) = (0, now)
-                  (true, 0)
-                } else {
-                  recentExceptions(key) = (dupCount + 1, printTime)
-                  (false, dupCount + 1)
-                }
-              } else {
-                recentExceptions(key) = (0, now)
-                (true, 0)
-              }
-            }
-            if (printFull) {
-              val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
-              logInfo("Loss was due to %s\n%s\n%s".format(
-                ef.className, ef.description, locs.mkString("\n")))
-            } else {
-              logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
-            }
-          case TaskResultLost =>
-            logInfo("Lost result for TID %s on host %s".format(tid, info.host))
-            sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
-          case _ => {}
-        }
+        case fetchFailed: FetchFailed =>
+          logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
+          sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
+          successful(index) = true
+          tasksSuccessful += 1
+          sched.taskSetFinished(this)
+          removeAllRunningTasks()
+          return
+        case TaskKilled =>
+          logInfo("Task %d was killed.".format(tid))
+          sched.listener.taskEnded(tasks(index), reason.get, null, null, info, null)
+          return
+        case ef: ExceptionFailure =>
+          sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
+          val key = ef.description
+          val now = clock.getTime()
+          val (printFull, dupCount) = {
+            if (recentExceptions.contains(key)) {
+              val (dupCount, printTime) = recentExceptions(key)
+              if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
+                recentExceptions(key) = (0, now)
+                (true, 0)
+              } else {
+                recentExceptions(key) = (dupCount + 1, printTime)
+                (false, dupCount + 1)
+              }
+            } else {
+              recentExceptions(key) = (0, now)
+              (true, 0)
+            }
+          }
+          if (printFull) {
+            val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
+            logInfo("Loss was due to %s\n%s\n%s".format(
+              ef.className, ef.description, locs.mkString("\n")))
+          } else {
+            logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
+          }
+        case TaskResultLost =>
+          logInfo("Lost result for TID %s on host %s".format(tid, info.host))
+          sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
+        case _ => {}
       }
 
       // On non-fetch failures, re-enqueue the task as pending for a max number of retries
       addPendingTask(index)
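
Two details in this hunk are worth calling out. First, the `_ match { ... }` wrapper is gone: a block of case clauses is already a partial-function literal, so `reason.foreach { case ... }` is the idiomatic form and drops one level of nesting. Second, a killed task now reports taskEnded with the kill reason instead of aborting the entire task set, so cancelling one job no longer tears down the task set as if it had failed. The exception-logging logic this hunk re-indents is itself a small throttling pattern: print the full stack trace at most once per EXCEPTION_PRINT_INTERVAL per distinct message, and only count duplicates in between. A self-contained sketch of that pattern (class and method names here are illustrative, not from the patch):

    import scala.collection.mutable

    // Emits a full report at most once per interval per distinct message;
    // repeats inside the interval are only counted.
    class ExceptionLogThrottler(intervalMs: Long) {
      // message -> (duplicate count, time of last full print)
      private val recentExceptions = mutable.Map.empty[String, (Int, Long)]

      /** Returns (printFull, dupCount) for a message observed at `now`. */
      def observe(key: String, now: Long): (Boolean, Int) =
        recentExceptions.get(key) match {
          case Some((dupCount, printTime)) if now - printTime <= intervalMs =>
            recentExceptions(key) = (dupCount + 1, printTime)
            (false, dupCount + 1)
          case _ =>
            // First sighting, or the interval has elapsed: reset and print fully.
            recentExceptions(key) = (0, now)
            (true, 0)
        }
    }

    object ThrottleSketch extends App {
      val throttler = new ExceptionLogThrottler(intervalMs = 1000L)
      val t0 = System.currentTimeMillis()
      println(throttler.observe("NPE in map stage", t0))        // (true, 0)  -> print full trace
      println(throttler.observe("NPE in map stage", t0 + 10))   // (false, 1) -> "[duplicate 1]"
      println(throttler.observe("NPE in map stage", t0 + 2000)) // (true, 0)  -> full trace again
    }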