diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 83469c5ff0600ede6e59b943f7022495abb80492..18f04391d64c3f3f00df9d39aba944a364d8c5f7 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -432,7 +432,8 @@ private[spark] class Executor(
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))
 
-        case NonFatal(_) if task != null && task.reasonIfKilled.isDefined =>
+        case _: InterruptedException | NonFatal(_) if
+            task != null && task.reasonIfKilled.isDefined =>
           val killReason = task.reasonIfKilled.getOrElse("unknown reason")
           logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
           setTaskFinishedAndClearInterruptStatus()
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 735f4454e299edcfd2185cc3573fdb7f05a1e31c..7e26139a2beadb93deb8c6a5643f3f6ece1bfde9 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -540,10 +540,24 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
   }
 
-  // Launches one task that will run forever. Once the SparkListener detects the task has
+  testCancellingTasks("that raise interrupted exception on cancel") {
+    Thread.sleep(9999999)
+  }
+
+  // SPARK-20217 should not fail stage if task throws non-interrupted exception
+  testCancellingTasks("that raise runtime exception on cancel") {
+    try {
+      Thread.sleep(9999999)
+    } catch {
+      case t: Throwable =>
+        throw new RuntimeException("killed")
+    }
+  }
+
+  // Launches one task that will block forever. Once the SparkListener detects the task has
   // started, kill and re-schedule it. The second run of the task will complete immediately.
   // If this test times out, then the first version of the task wasn't killed successfully.
-  test("Killing tasks") {
+  def testCancellingTasks(desc: String)(blockFn: => Unit): Unit = test(s"Killing tasks $desc") {
     sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
 
     SparkContextSuite.isTaskStarted = false
@@ -572,13 +586,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
         // first attempt will hang
         if (!SparkContextSuite.isTaskStarted) {
           SparkContextSuite.isTaskStarted = true
-          try {
-            Thread.sleep(9999999)
-          } catch {
-            case t: Throwable =>
-              // SPARK-20217 should not fail stage if task throws non-interrupted exception
-              throw new RuntimeException("killed")
-          }
+          blockFn
         }
         // second attempt succeeds immediately
       }