diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala index 420c54bc9a0d4b56f0b8c466f77de0a15a0f8be9..ce9bb498974e7360f4d2559fee83d502183be53b 100644 --- a/core/src/main/scala/spark/TaskEndReason.scala +++ b/core/src/main/scala/spark/TaskEndReason.scala @@ -14,9 +14,16 @@ private[spark] case object Success extends TaskEndReason private[spark] case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it -private[spark] -case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason +private[spark] case class FetchFailed( + bmAddress: BlockManagerId, + shuffleId: Int, + mapId: Int, + reduceId: Int) + extends TaskEndReason -private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason +private[spark] case class ExceptionFailure( + description: String, + stackTrace: Array[StackTraceElement]) + extends TaskEndReason private[spark] case class OtherFailure(message: String) extends TaskEndReason diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index f9061b1c715ddd3b4a51e2f3cf05944dcb93eff5..9084def9b2629d8bc70760303ddc6de98bb3be83 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -122,20 +122,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert } case t: Throwable => { - val reason = ExceptionFailure(t) - val serReason = - try { - ser.serialize(reason) - } - catch { - case e: NotSerializableException => { - val message = "Spark caught unserializable exn: " + t.toString - val throwable = new Exception(message) - throwable.setStackTrace(t.getStackTrace) - ser.serialize(new ExceptionFailure(throwable)) - } - } - context.statusUpdate(taskId, TaskState.FAILED, serReason) + val reason = ExceptionFailure(t.toString, t.getStackTrace) + context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) // TODO: Should we exit the whole executor here? On the one hand, the failed task may // have left some weird state around depending on when the exception was thrown, but on diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 27e713e2c43dd5d0629f37cf22cb46d47a012ed1..6d663de2f85717930f0fec85628457269529ca56 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -493,7 +493,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe return case ef: ExceptionFailure => - val key = ef.exception.toString + val key = ef.description val now = System.currentTimeMillis val (printFull, dupCount) = { if (recentExceptions.contains(key)) { @@ -511,10 +511,10 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe } } if (printFull) { - val locs = ef.exception.getStackTrace.map(loc => "\tat %s".format(loc.toString)) - logInfo("Loss was due to %s\n%s".format(ef.exception.toString, locs.mkString("\n"))) + val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString)) + logInfo("Loss was due to %s\n%s".format(ef.description, locs.mkString("\n"))) } else { - logInfo("Loss was due to %s [duplicate %d]".format(ef.exception.toString, dupCount)) + logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount)) } case _ => {} diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index f060a940a9269bc2e5dfd8a57111fcbe2e719c7a..42d5bc481306e0fc229a2462bc18156f8049aef5 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -102,7 +102,8 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon } else { // TODO: Do something nicer here to return all the way to the user if (!Thread.currentThread().isInterrupted) - listener.taskEnded(task, new ExceptionFailure(t), null, null, info, null) + listener.taskEnded( + task, new ExceptionFailure(t.getMessage, t.getStackTrace), null, null, info, null) } } }