diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ff411e24a3d856b97d7a12da5f01ce3d616d0397..81a87438a84f6f07694f5f829f30dc6847d763fd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1031,7 +1031,7 @@ class DAGScheduler(
   private def failJobAndIndependentStages(job: ActiveJob, failureReason: String,
       resultStage: Option[Stage]) {
     val error = new SparkException(failureReason)
-    job.listener.jobFailed(error)
+    var ableToCancelStages = true
 
     val shouldInterruptThread =
       if (job.properties == null) false
@@ -1055,18 +1055,26 @@ class DAGScheduler(
           // This is the only job that uses this stage, so fail the stage if it is running.
           val stage = stageIdToStage(stageId)
           if (runningStages.contains(stage)) {
-            taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-            val stageInfo = stageToInfos(stage)
-            stageInfo.stageFailed(failureReason)
-            listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
+              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+              val stageInfo = stageToInfos(stage)
+              stageInfo.stageFailed(failureReason)
+              listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+            } catch {
+              case e: UnsupportedOperationException =>
+                logInfo(s"Could not cancel tasks for stage $stageId", e)
+                ableToCancelStages = false
+            }
           }
         }
       }
     }
 
-    cleanupStateForJobAndIndependentStages(job, resultStage)
-
-    listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+    if (ableToCancelStages) {
+      job.listener.jobFailed(error)
+      cleanupStateForJobAndIndependentStages(job, resultStage)
+      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
+    }
   }
 
   /**
@@ -1148,7 +1156,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
       case x: Exception =>
         logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
           .format(x.getMessage))
-        dagScheduler.doCancelAllJobs()
+        try {
+          dagScheduler.doCancelAllJobs()
+        } catch {
+          case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
+        }
         dagScheduler.sc.stop()
         Stop
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 7e901f8e915884cdbe51ed69ba4c4806c757299e..23a350be561f576695166e15b90a378cbe28613e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -114,6 +114,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     sc = new SparkContext("local", "DAGSchedulerSuite")
     sparkListener.successfulStages.clear()
     sparkListener.failedStages.clear()
+    failure = null
     sc.addSparkListener(sparkListener)
     taskSets.clear()
     cancelledStages.clear()
@@ -299,6 +300,53 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("job cancellation no-kill backend") {
+    // make sure that the DAGScheduler doesn't crash when the TaskScheduler
+    // doesn't implement killTask()
+    val noKillTaskScheduler = new TaskScheduler() {
+      override def rootPool: Pool = null
+      override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+      override def start() = {}
+      override def stop() = {}
+      override def submitTasks(taskSet: TaskSet) = {
+        taskSets += taskSet
+      }
+      override def cancelTasks(stageId: Int, interruptThread: Boolean) {
+        throw new UnsupportedOperationException
+      }
+      override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
+      override def defaultParallelism() = 2
+    }
+    val noKillScheduler = new DAGScheduler(
+      sc,
+      noKillTaskScheduler,
+      sc.listenerBus,
+      mapOutputTracker,
+      blockManagerMaster,
+      sc.env) {
+      override def runLocally(job: ActiveJob) {
+        // don't bother with the thread while unit testing
+        runLocallyWithinThread(job)
+      }
+    }
+    dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
+      Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system)
+    val rdd = makeRdd(1, Nil)
+    val jobId = submit(rdd, Array(0))
+    cancel(jobId)
+    // Because the job wasn't actually cancelled, we shouldn't have received a failure message.
+    assert(failure === null)
+
+    // When the task set completes normally, state should be correctly updated.
+    complete(taskSets(0), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty
+
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.isEmpty)
+    assert(sparkListener.successfulStages.contains(0))
+  }
+
   test("run trivial shuffle") {
     val shuffleMapRdd = makeRdd(2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)