From f5d18af6a8a0b9f8c2e9677f9d8ae1712eb701c6 Mon Sep 17 00:00:00 2001 From: Rui Li <rui.li@intel.com> Date: Thu, 5 Jan 2017 14:51:13 -0800 Subject: [PATCH] [SPARK-14958][CORE] Failed task not handled when there's error deserializing failure reason ## What changes were proposed in this pull request? TaskResultGetter tries to deserialize the TaskEndReason before handling the failed task. If an error is thrown during deserialization, the failed task won't be handled, which leaves the job hanging. The PR proposes to handle the failed task in a finally block. ## How was this patch tested? In my case I hit a NoClassDefFoundError and the job hangs. Manually verified the patch can fix it. Author: Rui Li <rui.li@intel.com> Author: Rui Li <lirui@apache.org> Author: Rui Li <shlr@cn.ibm.com> Closes #12775 from lirui-intel/SPARK-14958. --- .../spark/scheduler/TaskResultGetter.scala | 6 +++++- .../scheduler/TaskResultGetterSuite.scala | 21 ++++++++++++++++++- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index b1addc128e..a284f7956c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -143,8 +143,12 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul logError( "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) case ex: Exception => // No-op + } finally { + // If there's an error while deserializing the TaskEndReason, this Runnable + // will die. Still tell the scheduler about the task failure, to avoid a hang + // where the scheduler thinks the task is still running. + scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) } - scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) } }) } catch { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index c9e682f53c..3e55d399e9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import java.io.File +import java.io.{File, ObjectInputStream} import java.net.URL import java.nio.ByteBuffer @@ -248,5 +248,24 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local assert(resSizeAfter.exists(_.toString.toLong > 0L)) } + test("failed task is handled when error occurs deserializing the reason") { + sc = new SparkContext("local", "test", conf) + val rdd = sc.parallelize(Seq(1), 1).map { _ => + throw new UndeserializableException + } + val message = intercept[SparkException] { + rdd.collect() + }.getMessage + // Job failed, even though the failure reason is unknown. + val unknownFailure = """(?s).*Lost task.*: UnknownReason.*""".r + assert(unknownFailure.findFirstMatchIn(message).isDefined) + } + +} + +private class UndeserializableException extends Exception { + private def readObject(in: ObjectInputStream): Unit = { + throw new NoClassDefFoundError() + } } -- GitLab