diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..35d88e4b7249f5baa856ea75f62b2765ee09ddb9
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.Semaphore
+
+import scala.concurrent.future
+import scala.concurrent.ExecutionContext.Implicits.global
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.matchers.ShouldMatchers
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
+
+
+/**
+ * Test suite for cancelling running jobs. We cover both a single-job action (e.g. count)
+ * and a multi-job action (e.g. take), in every combination of:
+ * - FIFO vs. fair scheduler
+ * - local mode vs. local-cluster mode
+ */
+class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
+  with LocalSparkContext {
+
+  override def afterEach() {
+    super.afterEach()
+    System.clearProperty("spark.scheduler.mode")
+    System.clearProperty("spark.scheduler.allocation.file")
+  }
+
+  test("local mode, FIFO scheduler") {
+    System.setProperty("spark.scheduler.mode", "FIFO")
+    sc = new SparkContext("local[2]", "test")
+    testCount()
+    testTake()
+    resetSparkContext()
+  }
+
+  test("cluster mode, FIFO scheduler") {
+    System.setProperty("spark.scheduler.mode", "FIFO")
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    testCount()
+    testTake()
+    resetSparkContext()
+  }
+
+  test("local mode, fair scheduler") {
+    System.setProperty("spark.scheduler.mode", "FAIR")
+    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
+    System.setProperty("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local[2]", "test")
+    testCount()
+    testTake()
+    resetSparkContext()
+  }
+
+  test("cluster mode, fair scheduler") {
+    System.setProperty("spark.scheduler.mode", "FAIR")
+    val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
+    System.setProperty("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    testCount()
+    testTake()
+    resetSparkContext()
+  }
+
+  def testCount() {
+    // Cancel before launching any tasks
+    {
+      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
+      future { f.cancel() }
+      val e = intercept[SparkException] { f.get() }
+      assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
+    }
+
+    // Cancel after some tasks have been launched
+    {
+      // Add a listener to release the semaphore once any tasks are launched.
+      val sem = new Semaphore(0)
+      sc.dagScheduler.addSparkListener(new SparkListener {
+        override def onTaskStart(taskStart: SparkListenerTaskStart) {
+          sem.release()
+        }
+      })
+
+      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
+      future {
+        // Wait until some tasks have been launched before we cancel the job.
+        sem.acquire()
+        f.cancel()
+      }
+      val e = intercept[SparkException] { f.get() }
+      assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
+    }
+  }
+
+  def testTake() {
+    // Cancel before launching any tasks
+    {
+      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
+      future { f.cancel() }
+      val e = intercept[SparkException] { f.get() }
+      assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
+    }
+
+    // Cancel after some tasks have been launched
+    {
+      // Add a listener to release the semaphore once any tasks are launched.
+      val sem = new Semaphore(0)
+      sc.dagScheduler.addSparkListener(new SparkListener {
+        override def onTaskStart(taskStart: SparkListenerTaskStart) {
+          sem.release()
+        }
+      })
+      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
+      future {
+        sem.acquire()
+        f.cancel()
+      }
+      val e = intercept[SparkException] { f.get() }
+      assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index ac84640751d74b34eb0a56b3e5b9a7739d0d21a4..131e2466ac3805784984719e9248014da2a9af7b 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -45,56 +45,6 @@ class AsyncRDDActionsSuite extends FunSuite with BeforeAndAfterAll {
 
   lazy val zeroPartRdd = new EmptyRDD[Int](sc)
 
-  test("job cancellation before any tasks is launched") {
-    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
-    future { f.cancel() }
-    val e = intercept[SparkException] { f.get() }
-    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
-  }
-
-  test("job cancellation after some tasks have been launched") {
-    // Add a listener to release the semaphore once any tasks are launched.
-    val sem = new Semaphore(0)
-    sc.dagScheduler.addSparkListener(new SparkListener {
-      override def onTaskStart(taskStart: SparkListenerTaskStart) {
-        sem.release()
-      }
-    })
-
-    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
-    future {
-      // Wait until some tasks were launched before we cancel the job.
-      sem.acquire()
-      f.cancel()
-    }
-    val e = intercept[SparkException] { f.get() }
-    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
-  }
-
-  test("cancelling take action") {
-    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
-    future { f.cancel() }
-    val e = intercept[SparkException] { f.get() }
-    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
-  }
-
-  test("cancelling take action after some tasks have been launched") {
-    // Add a listener to release the semaphore once any tasks are launched.
-    val sem = new Semaphore(0)
-    sc.dagScheduler.addSparkListener(new SparkListener {
-      override def onTaskStart(taskStart: SparkListenerTaskStart) {
-        sem.release()
-      }
-    })
-    val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
-    future {
-      sem.acquire()
-      f.cancel()
-    }
-    val e = intercept[SparkException] { f.get() }
-    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
-  }
-
   test("countAsync") {
     assert(zeroPartRdd.countAsync().get() === 0)
     assert(sc.parallelize(1 to 10000, 5).countAsync().get() === 10000)
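
For orientation, here is a minimal standalone sketch (not part of the patch) of the pattern the new suite exercises: submit an async action, wait for the first task via a SparkListener, cancel from another thread, and observe the SparkException from get(). Every API it calls (countAsync, cancel, get, dagScheduler.addSparkListener) appears in the diff above; the object name, master string, and println are illustrative only.

// Sketch only. It lives in package org.apache.spark because
// sc.dagScheduler is package-private, just as the test suite does.
package org.apache.spark

import java.util.concurrent.Semaphore

import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

object CancellationSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "cancellation-sketch")
    try {
      // Release a permit when the first task starts, so the cancel below
      // targets a job that has actually begun running.
      val sem = new Semaphore(0)
      sc.dagScheduler.addSparkListener(new SparkListener {
        override def onTaskStart(taskStart: SparkListenerTaskStart) {
          sem.release()
        }
      })

      // countAsync submits the job and returns immediately with a handle.
      val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()

      // Cancel from another thread once a task has launched.
      future {
        sem.acquire()
        f.cancel()
      }

      // get() on a cancelled action throws a SparkException whose message
      // mentions the job being cancelled or its tasks being killed.
      try {
        f.get()
      } catch {
        case e: SparkException => println("cancelled as expected: " + e.getMessage)
      }
    } finally {
      sc.stop()
    }
  }
}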