diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala index 420c54bc9a0d4b56f0b8c466f77de0a15a0f8be9..ca793eb4021a64dccddfb73c6ada2d5feaeeed59 100644 --- a/core/src/main/scala/spark/TaskEndReason.scala +++ b/core/src/main/scala/spark/TaskEndReason.scala @@ -14,9 +14,17 @@ private[spark] case object Success extends TaskEndReason private[spark] case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it -private[spark] -case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason +private[spark] case class FetchFailed( + bmAddress: BlockManagerId, + shuffleId: Int, + mapId: Int, + reduceId: Int) + extends TaskEndReason -private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason +private[spark] case class ExceptionFailure( + className: String, + description: String, + stackTrace: Array[StackTraceElement]) + extends TaskEndReason private[spark] case class OtherFailure(message: String) extends TaskEndReason diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 344face5e698689c301dade2b026ed8f0186aa5f..da20b8454441c737c34cd6c561702bc45db367e2 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -122,7 +122,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert } case t: Throwable => { - val reason = ExceptionFailure(t) + val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace) context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) // TODO: Should we exit the whole executor here? On the one hand, the failed task may diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 27e713e2c43dd5d0629f37cf22cb46d47a012ed1..06de3c755eeeeea7e87ea9077031afbe6e810f0c 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -493,7 +493,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe return case ef: ExceptionFailure => - val key = ef.exception.toString + val key = ef.description val now = System.currentTimeMillis val (printFull, dupCount) = { if (recentExceptions.contains(key)) { @@ -511,10 +511,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe } } if (printFull) { - val locs = ef.exception.getStackTrace.map(loc => "\tat %s".format(loc.toString)) - logInfo("Loss was due to %s\n%s".format(ef.exception.toString, locs.mkString("\n"))) + val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString)) + logInfo("Loss was due to %s\n%s\n%s".format( + ef.className, ef.description, locs.mkString("\n"))) } else { - logInfo("Loss was due to %s [duplicate %d]".format(ef.exception.toString, dupCount)) + logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount)) } case _ => {} diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index f060a940a9269bc2e5dfd8a57111fcbe2e719c7a..ebe42685ad01628149df0ac05a54f2518765a2cd 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -101,8 +101,10 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon submitTask(task, idInJob) } else { // TODO: Do something nicer here to return all the way to the user - if (!Thread.currentThread().isInterrupted) - listener.taskEnded(task, new ExceptionFailure(t), null, null, info, null) + if (!Thread.currentThread().isInterrupted) { + val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace) + listener.taskEnded(task, failure, null, null, info, null) + } } } } diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index 4df3bb5b6793b10af3bdea91c9083b192e99e5c6..33c99471c69d788379050690c50408a3412ef988 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -18,6 +18,9 @@ import scala.collection.mutable.ArrayBuffer import SparkContext._ import storage.{GetBlock, BlockManagerWorker, StorageLevel} +class NotSerializableClass +class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {} + class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext { val clusterUrl = "local-cluster[2,1,512]" @@ -27,6 +30,24 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter System.clearProperty("spark.storage.memoryFraction") } + test("task throws not serializable exception") { + // Ensures that executors do not crash when an exn is not serializable. If executors crash, + // this test will hang. Correct behavior is that executors don't crash but fail tasks + // and the scheduler throws a SparkException. + + // numSlaves must be less than numPartitions + val numSlaves = 3 + val numPartitions = 10 + + sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test") + val data = sc.parallelize(1 to 100, numPartitions). + map(x => throw new NotSerializableExn(new NotSerializableClass)) + intercept[SparkException] { + data.count() + } + resetSparkContext() + } + test("local-cluster format") { sc = new SparkContext("local-cluster[2,1,512]", "test") assert(sc.parallelize(1 to 2, 2).count() == 2)