diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index b1addc128e69690dffba7cbde7c342c29182c61d..a284f7956cd317e5737c5367fa980b5edc18c9ff 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -143,8 +143,12 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul logError( "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) case ex: Exception => // No-op + } finally { + // If there's an error while deserializing the TaskEndReason, this Runnable + // will die. Still tell the scheduler about the task failure, to avoid a hang + // where the scheduler thinks the task is still running. + scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) } - scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) } }) } catch { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index c9e682f53c105d77806de9dd92b4ca6969a7292f..3e55d399e9df947bf96b485ec48d9f4364fb9cb2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import java.io.File +import java.io.{File, ObjectInputStream} import java.net.URL import java.nio.ByteBuffer @@ -248,5 +248,24 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local assert(resSizeAfter.exists(_.toString.toLong > 0L)) } + test("failed task is handled when error occurs deserializing the reason") { + sc = new SparkContext("local", "test", conf) + val rdd = sc.parallelize(Seq(1), 1).map { _ => + throw new UndeserializableException + } + val message = intercept[SparkException] { + rdd.collect() + }.getMessage + // Job failed, even though the failure reason is unknown. + val unknownFailure = """(?s).*Lost task.*: UnknownReason.*""".r + assert(unknownFailure.findFirstMatchIn(message).isDefined) + } + +} + +private class UndeserializableException extends Exception { + private def readObject(in: ObjectInputStream): Unit = { + throw new NoClassDefFoundError() + } }