Commit 5dbc9b20 authored by Matei Zaharia

Merge pull request #608 from pwendell/SPARK-738

SPARK-738: Spark should detect and wrap nonserializable exceptions
parents 63e1999f 7f083364
@@ -14,9 +14,17 @@ private[spark] case object Success extends TaskEndReason
 private[spark]
 case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
 
-private[spark]
-case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
+private[spark] case class FetchFailed(
+    bmAddress: BlockManagerId,
+    shuffleId: Int,
+    mapId: Int,
+    reduceId: Int)
+  extends TaskEndReason
 
-private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason
+private[spark] case class ExceptionFailure(
+    className: String,
+    description: String,
+    stackTrace: Array[StackTraceElement])
+  extends TaskEndReason
 
 private[spark] case class OtherFailure(message: String) extends TaskEndReason
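
The key change in this hunk is that ExceptionFailure no longer carries the Throwable itself: the TaskEndReason is serialized on the executor and shipped back to the scheduler, and a user exception may reference objects that cannot be serialized, which would make the failure report itself fail. Below is a minimal standalone sketch (not Spark code; the class and object names are illustrative) of that failure mode, and of why the (className, description, stackTrace) triple always serializes.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Illustrative user types, mirroring the test added below: an exception that
// holds a reference to a non-serializable object.
class NonSerializableThing
class WrappingExn(val payload: NonSerializableThing) extends Throwable("user failure")

object WhyWrapExceptions {
  // Plain Java serialization stands in for whatever serializer ships the task status update.
  def javaSerialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    buffer.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val exn: Throwable = new WrappingExn(new NonSerializableThing)

    // Shipping the Throwable itself fails: serialization walks the object graph
    // into the non-serializable payload field.
    try {
      javaSerialize(exn)
    } catch {
      case _: NotSerializableException => println("raw Throwable could not be serialized")
    }

    // Shipping only the metadata always works: strings and StackTraceElements are
    // serializable regardless of what the original exception references.
    javaSerialize((exn.getClass.getName, exn.toString, exn.getStackTrace))
    println("(className, description, stackTrace) serialized fine")
  }
}
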
@@ -122,7 +122,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
         }
 
         case t: Throwable => {
-          val reason = ExceptionFailure(t)
+          val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
           context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
           // TODO: Should we exit the whole executor here? On the one hand, the failed task may
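
In the executor's catch-all handler, the class name, toString, and stack trace are extracted from the Throwable before the status update is serialized. The following is a small round-trip sketch, assuming plain Java serialization and stand-in names (ExceptionFailureSketch is not Spark's class; it mimics the type introduced in the first hunk), showing that the wrapped reason crosses the executor-to-scheduler hop intact.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for the reason type introduced in the first hunk; not Spark's actual class.
case class ExceptionFailureSketch(
    className: String,
    description: String,
    stackTrace: Array[StackTraceElement])

object StatusUpdateRoundTrip {
  // Serialize and immediately deserialize, imitating the executor -> scheduler hop.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val reason =
      try {
        throw new IllegalStateException("task body blew up")
      } catch {
        // The same conversion the executor performs before reporting the failure.
        case t: Throwable =>
          ExceptionFailureSketch(t.getClass.getName, t.toString, t.getStackTrace)
      }

    val received = roundTrip(reason)
    println(received.description)   // java.lang.IllegalStateException: task body blew up
    received.stackTrace.take(3).foreach(frame => println("\tat " + frame))
  }
}
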
@@ -493,7 +493,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
           return
 
         case ef: ExceptionFailure =>
-          val key = ef.exception.toString
+          val key = ef.description
           val now = System.currentTimeMillis
           val (printFull, dupCount) = {
             if (recentExceptions.contains(key)) {
@@ -511,10 +511,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
            }
          }
          if (printFull) {
-           val locs = ef.exception.getStackTrace.map(loc => "\tat %s".format(loc.toString))
-           logInfo("Loss was due to %s\n%s".format(ef.exception.toString, locs.mkString("\n")))
+           val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
+           logInfo("Loss was due to %s\n%s\n%s".format(
+             ef.className, ef.description, locs.mkString("\n")))
          } else {
            logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
          }
 
        case _ => {}
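
In the TaskSetManager, the dedup key and the logged stack trace now come from the wrapped fields rather than from a live Throwable. The bookkeeping that produces printFull and dupCount is cut off in the hunk above; here is a self-contained sketch of that kind of time-windowed duplicate suppression, with the map and interval names assumed rather than taken from the real TaskSetManager.

import scala.collection.mutable.HashMap

// Suppress repeated full stack traces for the same failure: print the full
// trace at most once per interval, otherwise just count the duplicate.
object ExceptionDedupSketch {
  val printIntervalMs = 10000L                            // assumed window length
  val recentExceptions = HashMap[String, (Int, Long)]()   // key -> (dupCount, lastPrintTime)

  def shouldPrintFull(key: String, now: Long): (Boolean, Int) = {
    recentExceptions.get(key) match {
      case Some((dupCount, printTime)) if now - printTime < printIntervalMs =>
        // Seen recently: bump the duplicate count, keep the old print time.
        recentExceptions(key) = (dupCount + 1, printTime)
        (false, dupCount + 1)
      case _ =>
        // First occurrence, or the window has expired: print the full trace again.
        recentExceptions(key) = (0, now)
        (true, 0)
    }
  }
}

// Usage mirroring the logging above:
// val (printFull, dupCount) = ExceptionDedupSketch.shouldPrintFull(ef.description, System.currentTimeMillis)
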
@@ -101,8 +101,10 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
           submitTask(task, idInJob)
         } else {
           // TODO: Do something nicer here to return all the way to the user
-          if (!Thread.currentThread().isInterrupted)
-            listener.taskEnded(task, new ExceptionFailure(t), null, null, info, null)
+          if (!Thread.currentThread().isInterrupted) {
+            val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
+            listener.taskEnded(task, failure, null, null, info, null)
+          }
         }
       }
     }
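
The local scheduler follows the same pattern: retry the failing task while retries remain, and once they are exhausted report a serializable summary of the last exception instead of the raw Throwable, unless the thread was interrupted during shutdown. A compact sketch of that shape, with every name illustrative rather than LocalScheduler's API:

// Rerun a failing body up to maxFailures times; once retries are exhausted,
// hand the caller a serializable summary of the last failure instead of the
// raw Throwable. All names here are stand-ins.
def runWithRetries[T](maxFailures: Int)(body: => T)(
    report: (String, String, Array[StackTraceElement]) => Unit): Option[T] = {
  var attempts = 0
  var result: Option[T] = None
  var done = false
  while (!done) {
    try {
      result = Some(body)
      done = true
    } catch {
      case t: Throwable =>
        attempts += 1
        if (attempts > maxFailures) {
          if (!Thread.currentThread().isInterrupted) {
            report(t.getClass.getName, t.toString, t.getStackTrace)
          }
          done = true
        }
    }
  }
  result
}
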
@@ -18,6 +18,9 @@ import scala.collection.mutable.ArrayBuffer
 import SparkContext._
 import storage.{GetBlock, BlockManagerWorker, StorageLevel}
 
+class NotSerializableClass
+class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
+
 class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext {
 
   val clusterUrl = "local-cluster[2,1,512]"
@@ -27,6 +30,24 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     System.clearProperty("spark.storage.memoryFraction")
   }
 
+  test("task throws not serializable exception") {
+    // Ensures that executors do not crash when an exn is not serializable. If executors crash,
+    // this test will hang. Correct behavior is that executors don't crash but fail tasks
+    // and the scheduler throws a SparkException.
+
+    // numSlaves must be less than numPartitions
+    val numSlaves = 3
+    val numPartitions = 10
+
+    sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test")
+    val data = sc.parallelize(1 to 100, numPartitions).
+      map(x => throw new NotSerializableExn(new NotSerializableClass))
+    intercept[SparkException] {
+      data.count()
+    }
+    resetSparkContext()
+  }
+
   test("local-cluster format") {
     sc = new SparkContext("local-cluster[2,1,512]", "test")
     assert(sc.parallelize(1 to 2, 2).count() == 2)