diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index fc0ee070897ddac1cfd601deace897b5d401cfc1..5ad00a1ed1e1070ec1f57237739758450c35bfa0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -629,7 +629,7 @@ private[spark] class TaskSetManager(
     }
     // Also re-enqueue any tasks that were running on the node
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
-      handleFailedTask(tid, TaskState.KILLED, None)
+      handleFailedTask(tid, TaskState.FAILED, None)
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index d9cb7fead5b883231c8e93bb19906e1ddd17ef4c..27c4b017997bc6213897454c74d8f17ec3cf0264 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -125,6 +125,21 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     assert(thrown.getMessage.contains("failed 4 times"))
   }
 
+  test("repeatedly failing task that crashes JVM") {
+    // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails rather
+    // than hanging.
+    sc = new SparkContext(clusterUrl, "test")
+    failAfter(Span(100000, Millis)) {
+      val thrown = intercept[SparkException] {
+        // One of the tasks always fails.
+        sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) }
+      }
+      assert(thrown.getClass === classOf[SparkException])
+      System.out.println(thrown.getMessage)
+      assert(thrown.getMessage.contains("failed 4 times"))
+    }
+  }
+
   test("caching") {
     sc = new SparkContext(clusterUrl, "test")
     val data = sc.parallelize(1 to 1000, 10).cache()