Skip to content
Snippets Groups Projects
Commit 059ab887 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Changing technique to use same code path in all cases

parent a5c28bb8
No related branches found
No related tags found
No related merge requests found
......@@ -14,9 +14,16 @@ private[spark] case object Success extends TaskEndReason
private[spark]
case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
private[spark]
case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
private[spark] case class FetchFailed(
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
reduceId: Int)
extends TaskEndReason
private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason
private[spark] case class ExceptionFailure(
description: String,
stackTrace: Array[StackTraceElement])
extends TaskEndReason
private[spark] case class OtherFailure(message: String) extends TaskEndReason
......@@ -122,20 +122,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
}
case t: Throwable => {
val reason = ExceptionFailure(t)
val serReason =
try {
ser.serialize(reason)
}
catch {
case e: NotSerializableException => {
val message = "Spark caught unserializable exn: " + t.toString
val throwable = new Exception(message)
throwable.setStackTrace(t.getStackTrace)
ser.serialize(new ExceptionFailure(throwable))
}
}
context.statusUpdate(taskId, TaskState.FAILED, serReason)
val reason = ExceptionFailure(t.toString, t.getStackTrace)
context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
// TODO: Should we exit the whole executor here? On the one hand, the failed task may
// have left some weird state around depending on when the exception was thrown, but on
......
......@@ -493,7 +493,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
return
case ef: ExceptionFailure =>
val key = ef.exception.toString
val key = ef.description
val now = System.currentTimeMillis
val (printFull, dupCount) = {
if (recentExceptions.contains(key)) {
......@@ -511,10 +511,10 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
}
}
if (printFull) {
val locs = ef.exception.getStackTrace.map(loc => "\tat %s".format(loc.toString))
logInfo("Loss was due to %s\n%s".format(ef.exception.toString, locs.mkString("\n")))
val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
logInfo("Loss was due to %s\n%s".format(ef.description, locs.mkString("\n")))
} else {
logInfo("Loss was due to %s [duplicate %d]".format(ef.exception.toString, dupCount))
logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
}
case _ => {}
......
......@@ -102,7 +102,8 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
} else {
// TODO: Do something nicer here to return all the way to the user
if (!Thread.currentThread().isInterrupted)
listener.taskEnded(task, new ExceptionFailure(t), null, null, info, null)
listener.taskEnded(
task, new ExceptionFailure(t.getMessage, t.getStackTrace), null, null, info, null)
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment