Skip to content
Snippets Groups Projects
Commit 0497ea51 authored by Sean Owen's avatar Sean Owen Committed by Josh Rosen
Browse files

SPARK-960 [CORE] [TEST] JobCancellationSuite "two jobs sharing the same stage" is broken

This reenables and fixes this test, after addressing two issues:

- The Semaphore that was intended to be shared locally was being serialized and copied; it's now a static member in the companion object as in other tests
- Later changes to Spark means that cancelling the first task will not cancel the shared stage and therefore the second task should succeed

Author: Sean Owen <sowen@cloudera.com>

Closes #4180 from srowen/SPARK-960 and squashes the following commits:

43da66f [Sean Owen] Fix 'two jobs sharing the same stage' test and reenable it: truly share a Semaphore locally as intended, and update expectation of failure in non-cancelled task
parent b38034e8
No related branches found
No related tags found
No related merge requests found
......@@ -171,11 +171,11 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
assert(jobB.get() === 100)
}
ignore("two jobs sharing the same stage") {
test("two jobs sharing the same stage") {
// sem1: make sure cancel is issued after some tasks are launched
// sem2: make sure the first stage is not finished until cancel is issued
// twoJobsSharingStageSemaphore:
// make sure the first stage is not finished until cancel is issued
val sem1 = new Semaphore(0)
val sem2 = new Semaphore(0)
sc = new SparkContext("local[2]", "test")
sc.addSparkListener(new SparkListener {
......@@ -186,7 +186,7 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
// Create two actions that would share the some stages.
val rdd = sc.parallelize(1 to 10, 2).map { i =>
sem2.acquire()
JobCancellationSuite.twoJobsSharingStageSemaphore.acquire()
(i, i)
}.reduceByKey(_+_)
val f1 = rdd.collectAsync()
......@@ -196,13 +196,13 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
future {
sem1.acquire()
f1.cancel()
sem2.release(10)
JobCancellationSuite.twoJobsSharingStageSemaphore.release(10)
}
// Expect both to fail now.
// TODO: update this test when we change Spark so cancelling f1 wouldn't affect f2.
// Expect f1 to fail due to cancellation,
intercept[SparkException] { f1.get() }
intercept[SparkException] { f2.get() }
// but f2 should not be affected
f2.get()
}
def testCount() {
......@@ -268,4 +268,5 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
object JobCancellationSuite {
val taskStartedSemaphore = new Semaphore(0)
val taskCancelledSemaphore = new Semaphore(0)
val twoJobsSharingStageSemaphore = new Semaphore(0)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment