Commit f5d18af6 authored by Rui Li, committed by Kay Ousterhout

[SPARK-14958][CORE] Failed task not handled when there's error deserializing failure reason

## What changes were proposed in this pull request?

`TaskResultGetter` tries to deserialize the `TaskEndReason` before handling the failed task. If an error is thrown during deserialization, the failed task is never handled, which leaves the job hanging.
This PR moves the call to `scheduler.handleFailedTask` into a `finally` block so the scheduler is notified even when deserialization fails.

## How was this patch tested?

In my case, I hit a `NoClassDefFoundError` and the job hung. Manually verified that the patch fixes it.
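The mechanics of the fix can be shown in a minimal standalone sketch (not Spark code; the `taskFailureHandled` flag is a hypothetical stand-in for `scheduler.handleFailedTask`). A `NoClassDefFoundError` is an `Error`, so it matches neither `catch` clause and would kill the Runnable; a `finally` block still runs before the error propagates:

```scala
object FinallySketch {
  // Returns true iff the stand-in failure handler ran despite the uncaught Error.
  def run(): Boolean = {
    var taskFailureHandled = false
    try {
      try {
        // Simulate deserialization of the TaskEndReason failing with an Error
        // (e.g. NoClassDefFoundError), which the catch clauses below do not match.
        throw new NoClassDefFoundError("user.UndeserializableClass")
      } catch {
        case _: ClassNotFoundException => // logged in the real code
        case _: Exception => // no-op, mirroring the original catch
      } finally {
        // The fix: notify the scheduler here, so it runs even when the
        // Runnable is about to die from an uncaught Error.
        taskFailureHandled = true
      }
    } catch {
      case _: NoClassDefFoundError => // the Runnable would die here
    }
    taskFailureHandled
  }
}
```

Before the patch, the equivalent of `taskFailureHandled = true` sat after the `catch` block instead of inside `finally`, so the uncaught `Error` skipped it and the scheduler kept waiting on the task.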

Author: Rui Li <rui.li@intel.com>
Author: Rui Li <lirui@apache.org>
Author: Rui Li <shlr@cn.ibm.com>

Closes #12775 from lirui-intel/SPARK-14958.
parent 30345c43
TaskResultGetter.scala:

```diff
@@ -143,8 +143,12 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
               logError(
                 "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
             case ex: Exception => // No-op
+          } finally {
+            // If there's an error while deserializing the TaskEndReason, this Runnable
+            // will die. Still tell the scheduler about the task failure, to avoid a hang
+            // where the scheduler thinks the task is still running.
+            scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
           }
-          scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
         }
       })
     } catch {
```
TaskResultGetterSuite.scala:

```diff
@@ -17,7 +17,7 @@
 package org.apache.spark.scheduler

-import java.io.File
+import java.io.{File, ObjectInputStream}
 import java.net.URL
 import java.nio.ByteBuffer
@@ -248,5 +248,24 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
     assert(resSizeAfter.exists(_.toString.toLong > 0L))
   }

+  test("failed task is handled when error occurs deserializing the reason") {
+    sc = new SparkContext("local", "test", conf)
+    val rdd = sc.parallelize(Seq(1), 1).map { _ =>
+      throw new UndeserializableException
+    }
+    val message = intercept[SparkException] {
+      rdd.collect()
+    }.getMessage
+    // Job failed, even though the failure reason is unknown.
+    val unknownFailure = """(?s).*Lost task.*: UnknownReason.*""".r
+    assert(unknownFailure.findFirstMatchIn(message).isDefined)
+  }
 }

+private class UndeserializableException extends Exception {
+  private def readObject(in: ObjectInputStream): Unit = {
+    throw new NoClassDefFoundError()
+  }
+}
```