diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 9185b9529c3bf455d4904c56a7a19bc9946a33fc..85018cb046de723a5219474668b980fced253963 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -56,7 +56,7 @@ trait FutureAction[T] extends Future[T] {
   override def result(atMost: Duration)(implicit permit: CanAwait): T
 
   /**
-   * When this action is completed, either through an exception, or a value, apply the provided
+   * When this action is completed, either through an exception or a value, applies the provided
    * function.
    */
   def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext)
@@ -91,7 +91,7 @@ class FutureJob[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
   extends FutureAction[T] {
 
   override def cancel() {
-    jobWaiter.kill()
+    jobWaiter.cancel()
   }
 
   override def ready(atMost: Duration)(implicit permit: CanAwait): FutureJob.this.type = {
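The onComplete contract documented in this file means a caller registers a callback against the Try outcome of the job. A minimal sketch of driving that API from caller code, assuming a FutureAction[Long] obtained elsewhere (the handleAction helper below is purely illustrative, not part of this patch):

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}

    import org.apache.spark.FutureAction

    // Illustrative helper: react to the outcome of an already-submitted action.
    def handleAction(action: FutureAction[Long]): Unit = {
      // onComplete fires once the job finishes, with either the computed value
      // or the failure (including the failure produced when the job is cancelled).
      action.onComplete {
        case Success(count) => println("Job finished with result: " + count)
        case Failure(e)     => println("Job did not complete: " + e.getMessage)
      }

      // cancel() only signals the scheduler; the callback above still observes
      // the eventual outcome.
      action.cancel()
    }
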
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 727454725d49ffc0793693ca4dd5684952d292be..58f238d8cfc5d2a1f2914afbcec1d4474b21aab4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -39,7 +39,12 @@ private[spark] class JobWaiter[T](
   // partition RDDs), we set the jobResult directly to JobSucceeded.
   private var jobResult: JobResult = if (jobFinished) JobSucceeded else null
 
-  def kill() {
+  /**
+   * Sends a signal to the DAGScheduler to cancel the job. The cancellation itself is handled
+   * asynchronously. After the low-level scheduler cancels all the tasks belonging to this job,
+   * the DAGScheduler will fail the job with a SparkException.
+   */
+  def cancel() {
     dagScheduler.cancelJob(jobId)
   }
 
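Because cancellation is asynchronous, as the new cancel() doc comment notes, callers that block on the action should expect the failure to surface as a SparkException once the scheduler has torn the job down. A rough sketch of that calling pattern (the cancelAndAwait helper and the 30-second timeout are assumptions for illustration):

    import scala.concurrent.Await
    import scala.concurrent.duration._

    import org.apache.spark.{FutureAction, SparkException}

    // Illustrative only: cancel an in-flight action, then wait for the
    // asynchronous cancellation to surface as a job failure.
    def cancelAndAwait[T](action: FutureAction[T]): Unit = {
      action.cancel()                    // asks the DAGScheduler to cancel the job
      try {
        Await.result(action, 30.seconds) // blocks until the scheduler fails the job
      } catch {
        case e: SparkException =>
          println("Job was cancelled: " + e.getMessage)
      }
    }
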
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 1198bac6dd45180715fa733c215ee05d0de35c9f..7e0667ba03aa408a0cab3d964e1086a706a81656 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -461,54 +461,52 @@ private[spark] class ClusterTaskSetManager(
       // Check if the problem is a map output fetch failure. In that case, this
       // task will never succeed on any node, so tell the scheduler about it.
       reason.foreach {
-        _ match {
-          case fetchFailed: FetchFailed =>
-            logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
-            sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
-            successful(index) = true
-            tasksSuccessful += 1
-            sched.taskSetFinished(this)
-            removeAllRunningTasks()
-            return
-
-          case TaskKilled =>
-            logInfo("Task %d was killed.".format(tid))
-            abort("Task %d was killed".format(tid))
-            return
-
-          case ef: ExceptionFailure =>
-            sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
-            val key = ef.description
-            val now = clock.getTime()
-            val (printFull, dupCount) = {
-              if (recentExceptions.contains(key)) {
-                val (dupCount, printTime) = recentExceptions(key)
-                if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
-                  recentExceptions(key) = (0, now)
-                  (true, 0)
-                } else {
-                  recentExceptions(key) = (dupCount + 1, printTime)
-                  (false, dupCount + 1)
-                }
-              } else {
+        case fetchFailed: FetchFailed =>
+          logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
+          sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
+          successful(index) = true
+          tasksSuccessful += 1
+          sched.taskSetFinished(this)
+          removeAllRunningTasks()
+          return
+
+        case TaskKilled =>
+          logInfo("Task %d was killed.".format(tid))
+          sched.listener.taskEnded(tasks(index), reason.get, null, null, info, null)
+          return
+
+        case ef: ExceptionFailure =>
+          sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
+          val key = ef.description
+          val now = clock.getTime()
+          val (printFull, dupCount) = {
+            if (recentExceptions.contains(key)) {
+              val (dupCount, printTime) = recentExceptions(key)
+              if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
                 recentExceptions(key) = (0, now)
                 (true, 0)
+              } else {
+                recentExceptions(key) = (dupCount + 1, printTime)
+                (false, dupCount + 1)
               }
-            }
-            if (printFull) {
-              val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
-              logInfo("Loss was due to %s\n%s\n%s".format(
-                ef.className, ef.description, locs.mkString("\n")))
             } else {
-              logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
+              recentExceptions(key) = (0, now)
+              (true, 0)
             }
+          }
+          if (printFull) {
+            val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
+            logInfo("Loss was due to %s\n%s\n%s".format(
+              ef.className, ef.description, locs.mkString("\n")))
+          } else {
+            logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
+          }
 
-          case TaskResultLost =>
-            logInfo("Lost result for TID %s on host %s".format(tid, info.host))
-            sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
+        case TaskResultLost =>
+          logInfo("Lost result for TID %s on host %s".format(tid, info.host))
+          sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
 
-          case _ => {}
-        }
+        case _ => {}
       }
       // On non-fetch failures, re-enqueue the task as pending for a max number of retries
       addPendingTask(index)
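The ClusterTaskSetManager hunk above is mostly a mechanical refactor: a block of case clauses passed directly to foreach is already a pattern-matching function literal in Scala, so the inner `_ match { ... }` wrapper it removes was redundant. A standalone sketch of the equivalence, using plain strings rather than the Spark task-failure types:

    val reason: Option[String] = Some("fetch failed")

    // Before: an explicit match on the foreach argument.
    reason.foreach {
      _ match {
        case "fetch failed" => println("loss was due to a fetch failure")
        case other          => println("other reason: " + other)
      }
    }

    // After: the case clauses themselves form the function literal.
    reason.foreach {
      case "fetch failed" => println("loss was due to a fetch failure")
      case other          => println("other reason: " + other)
    }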