diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index bda9272b43393c564fef1c6dd6bd7290746d9148..8b95cda511643d9805693d5ec3146767e6cc27f0 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -570,6 +570,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * // In a separate thread:
    * sc.cancelJobGroup("some_job_to_cancel");
    * }}}
+   *
+   * If interruptOnCancel is set to true for the job group, then job cancellation will result
+   * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+   * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+   * where HDFS may respond to Thread.interrupt() by marking nodes as dead.
+   */
+  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean): Unit =
+    sc.setJobGroup(groupId, description, interruptOnCancel)
+
+  /**
+   * Assigns a group ID to all the jobs started by this thread until the group ID is set to a
+   * different value or cleared.
+   *
+   * @see `setJobGroup(groupId: String, description: String, interruptThread: Boolean)`.
+   * This method sets interruptOnCancel to false.
    */
   def setJobGroup(groupId: String, description: String): Unit = sc.setJobGroup(groupId, description)
 
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 7a39d1af9e2d51821e8fbfb57e3c147d268aa463..16cfdf11c4a385cb943620f45fcd801734e70c03 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.Semaphore
 
 import scala.concurrent.Await
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
 import scala.concurrent.future
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -101,18 +101,50 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
       sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
     }
 
+    // Block until both tasks of job A have started and cancel job A.
+    sem.acquire(2)
+
     sc.clearJobGroup()
     val jobB = sc.parallelize(1 to 100, 2).countAsync()
+    sc.cancelJobGroup("jobA")
+    val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
+    assert(e.getMessage contains "cancel")
+
+    // Once A is cancelled, job B should finish fairly quickly.
+    assert(jobB.get() === 100)
+  }
+
+
+  test("job group with interruption") {
+    sc = new SparkContext("local[2]", "test")
+
+    // Add a listener to release the semaphore once any tasks are launched.
+    val sem = new Semaphore(0)
+    sc.addSparkListener(new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart) {
+        sem.release()
+      }
+    })
+
+    // jobA is the one to be cancelled.
+    val jobA = future {
+      sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
+      sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100000); i }.count()
+    }
 
     // Block until both tasks of job A have started and cancel job A.
     sem.acquire(2)
+
+    sc.clearJobGroup()
+    val jobB = sc.parallelize(1 to 100, 2).countAsync()
     sc.cancelJobGroup("jobA")
-    val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
+    val e = intercept[SparkException] { Await.result(jobA, 5.seconds) }
     assert(e.getMessage contains "cancel")
 
     // Once A is cancelled, job B should finish fairly quickly.
     assert(jobB.get() === 100)
   }
 
+  /*
   test("two jobs sharing the same stage") {
     // sem1: make sure cancel is issued after some tasks are launched
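
For reference, a minimal driver-side sketch (not part of the patch) of how the setJobGroup/cancelJobGroup pair shown above might be used with interruptOnCancel enabled. The object name, group ID, master URL, and sleep durations are illustrative assumptions, not anything defined by this change.

import org.apache.spark.{SparkConf, SparkContext}

object InterruptOnCancelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))

    // Run the long job in a separate thread so the main thread is free to cancel it.
    val worker = new Thread {
      override def run(): Unit = {
        // interruptOnCancel = true: cancelling this group calls Thread.interrupt() on the
        // executor threads running its tasks (see the HDFS-1208 caveat in the doc comment above).
        sc.setJobGroup("demo-group", "long job we may cancel", interruptOnCancel = true)
        try {
          sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100); i }.count()
        } catch {
          case e: Exception => println(s"job ended early: ${e.getMessage}")
        }
      }
    }
    worker.start()

    Thread.sleep(2000)
    // Cancel every job registered under the group; the sleeping tasks are interrupted
    // rather than left to run to completion.
    sc.cancelJobGroup("demo-group")
    worker.join()
    sc.stop()
  }
}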