Commit 92b01250 authored by Mark Hamstra, committed by Patrick Wendell

[SPARK-1749] Job cancellation when SchedulerBackend does not implement killTask


This is a fixed up version of #686 (cc @markhamstra @pwendell).  The last commit (the only one I authored) reflects the changes I made from Mark's original patch.

Author: Mark Hamstra <markhamstra@gmail.com>
Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #1219 from kayousterhout/mark-SPARK-1749 and squashes the following commits:

42dfa7e [Kay Ousterhout] Got rid of terrible double-negative name
80b3205 [Kay Ousterhout] Don't notify listeners of job failure if it wasn't successfully cancelled.
d156d33 [Mark Hamstra] Do nothing in no-kill submitTasks
9312baa [Mark Hamstra] code review update
cc353c8 [Mark Hamstra] scalastyle
e61f7f8 [Mark Hamstra] Catch UnsupportedOperationException when DAGScheduler tries to cancel a job on a SchedulerBackend that does not implement killTask
(cherry picked from commit b88a59a6)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
parent 5869f8bf
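
Background for the diff below: TaskScheduler.cancelTasks ultimately asks the SchedulerBackend to kill the stage's running tasks, and backends that do not support per-task kill inherit a throwing default. A minimal sketch of the trait shape this commit works around (abridged and with the private[spark] modifier dropped so it compiles standalone; the default killTask body is the relevant part, and its exact signature here is assumed from the Spark sources of this era):

trait SchedulerBackend {
  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def defaultParallelism(): Int

  // Backends that cannot kill individual tasks inherit this default, so
  // TaskSchedulerImpl.cancelTasks surfaces an UnsupportedOperationException
  // to the DAGScheduler. That is the failure mode this patch handles.
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
    throw new UnsupportedOperationException
}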
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1031,7 +1031,7 @@ class DAGScheduler(
   private def failJobAndIndependentStages(job: ActiveJob, failureReason: String,
       resultStage: Option[Stage]) {
     val error = new SparkException(failureReason)
-    job.listener.jobFailed(error)
+    var ableToCancelStages = true
 
     val shouldInterruptThread =
       if (job.properties == null) false
@@ -1055,18 +1055,26 @@ class DAGScheduler(
           // This is the only job that uses this stage, so fail the stage if it is running.
           val stage = stageIdToStage(stageId)
           if (runningStages.contains(stage)) {
-            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-            val stageInfo = stageToInfos(stage)
-            stageInfo.stageFailed(failureReason)
-            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
+              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+              val stageInfo = stageToInfos(stage)
+              stageInfo.stageFailed(failureReason)
+              listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+            } catch {
+              case e: UnsupportedOperationException =>
+                logInfo(s"Could not cancel tasks for stage $stageId", e)
+                ableToCancelStages = false
+            }
           }
         }
       }
     }
 
-    cleanupStateForJobAndIndependentStages(job, resultStage)
-
-    listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+    if (ableToCancelStages) {
+      job.listener.jobFailed(error)
+      cleanupStateForJobAndIndependentStages(job, resultStage)
+      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+    }
   }
 
   /**
@@ -1148,7 +1156,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
       case x: Exception =>
         logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
           .format(x.getMessage))
-        dagScheduler.doCancelAllJobs()
+        try {
+          dagScheduler.doCancelAllJobs()
+        } catch {
+          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
+        }
         dagScheduler.sc.stop()
         Stop
     }
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -114,6 +114,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     sc = new SparkContext("local", "DAGSchedulerSuite")
     sparkListener.successfulStages.clear()
     sparkListener.failedStages.clear()
+    failure = null
     sc.addSparkListener(sparkListener)
     taskSets.clear()
     cancelledStages.clear()
@@ -299,6 +300,53 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("job cancellation no-kill backend") {
+    // make sure that the DAGScheduler doesn't crash when the TaskScheduler
+    // doesn't implement killTask()
+    val noKillTaskScheduler = new TaskScheduler() {
+      override def rootPool: Pool = null
+      override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+      override def start() = {}
+      override def stop() = {}
+      override def submitTasks(taskSet: TaskSet) = {
+        taskSets += taskSet
+      }
+      override def cancelTasks(stageId: Int, interruptThread: Boolean) {
+        throw new UnsupportedOperationException
+      }
+      override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
+      override def defaultParallelism() = 2
+    }
+    val noKillScheduler = new DAGScheduler(
+      sc,
+      noKillTaskScheduler,
+      sc.listenerBus,
+      mapOutputTracker,
+      blockManagerMaster,
+      sc.env) {
+      override def runLocally(job: ActiveJob) {
+        // don't bother with the thread while unit testing
+        runLocallyWithinThread(job)
+      }
+    }
+    dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
+      Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system)
+    val rdd = makeRdd(1, Nil)
+    val jobId = submit(rdd, Array(0))
+    cancel(jobId)
+    // Because the job wasn't actually cancelled, we shouldn't have received a failure message.
+    assert(failure === null)
+
+    // When the task set completes normally, state should be correctly updated.
+    complete(taskSets(0), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.isEmpty)
+    assert(sparkListener.successfulStages.contains(0))
+  }
+
   test("run trivial shuffle") {
     val shuffleMapRdd = makeRdd(2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
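
Taken together, the pattern this patch introduces can be shown outside Spark. The following self-contained Scala toy (all names hypothetical, not Spark APIs) mirrors the control flow of failJobAndIndependentStages: try to kill each running stage, downgrade UnsupportedOperationException to a log message, and only clean up and report the job as failed when every cancellation succeeded.

// Toy sketch of the cancellation fallback; names are invented for illustration.
object CancelFallbackDemo {
  trait Backend {
    // Default mirrors a backend with no per-task kill support.
    def killTask(stageId: Int): Unit = throw new UnsupportedOperationException
  }

  class KillingBackend extends Backend {
    override def killTask(stageId: Int): Unit =
      println(s"killed tasks of stage $stageId")
  }

  def failJob(runningStages: Seq[Int], backend: Backend): Unit = {
    var ableToCancelStages = true
    for (stageId <- runningStages) {
      try {
        backend.killTask(stageId)
      } catch {
        case _: UnsupportedOperationException =>
          // Tolerate backends that cannot kill tasks, as the patch does.
          println(s"could not cancel tasks for stage $stageId")
          ableToCancelStages = false
      }
    }
    // Only report failure and clean up when cancellation actually succeeded.
    if (ableToCancelStages) println("job marked as failed, state cleaned up")
    else println("job left running; no failure event posted")
  }

  def main(args: Array[String]): Unit = {
    failJob(Seq(0, 1), new KillingBackend) // cancels both stages, fails the job
    failJob(Seq(0, 1), new Backend {})     // kill unsupported, job keeps running
  }
}

Run against the backend without kill support, the job is left running and can still complete normally, which is exactly what the new "job cancellation no-kill backend" test asserts.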