Commit 3034fc0d authored by Matei Zaharia

Merge commit 'ad4ebff4'

parents 6a650cbb ad4ebff4
@@ -19,6 +19,7 @@ case class CompletionEvent(task: DAGTask[_], reason: TaskEndReason, result: Any,
sealed trait TaskEndReason
case object Success extends TaskEndReason
case class FetchFailed(serverUri: String, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
case class ExceptionFailure(exception: Throwable) extends TaskEndReason
case class OtherFailure(message: String) extends TaskEndReason

/**
...
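For readers skimming the diff: TaskEndReason is a sealed trait, so callers can match on it exhaustively. A minimal sketch, not part of this commit, of how scheduler code might branch on the new ExceptionFailure case alongside the existing reasons:

object TaskEndReasonDemo {
  // Illustrative only; the reasons are the case classes declared above.
  def describe(reason: TaskEndReason): String = reason match {
    case Success =>
      "task finished cleanly"
    case FetchFailed(serverUri, shuffleId, mapId, reduceId) =>
      "lost map output %d for shuffle %d on %s".format(mapId, shuffleId, serverUri)
    case ExceptionFailure(exception) =>
      "task threw " + exception
    case OtherFailure(message) =>
      message
  }
}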
@@ -87,6 +87,13 @@ class Executor extends org.apache.mesos.Executor with Logging {
          .build())
      }
      case t: Throwable => {
        val reason = ExceptionFailure(t)
        d.sendStatusUpdate(TaskStatus.newBuilder()
          .setTaskId(desc.getTaskId)
          .setState(TaskState.TASK_FAILED)
          .setData(ByteString.copyFrom(Utils.serialize(reason)))
          .build())

        // TODO: Handle errors in tasks less dramatically
        logError("Exception in task ID " + tid, t)
        System.exit(1)
...
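The executor above reports the failure by serializing the ExceptionFailure into the data field of the Mesos TaskStatus. A sketch of the receiving side, assuming a Utils.deserialize helper that inverts the Utils.serialize call in the diff (the scheduler-side decoding is not shown in this commit):

// Hypothetical decoding of the status update sent by the executor above.
def reasonOf(status: org.apache.mesos.Protos.TaskStatus): TaskEndReason =
  Utils.deserialize[TaskEndReason](status.getData.toByteArray)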
@@ -60,6 +60,15 @@ extends Job(jobId) with Logging
  var failed = false
  var causeOfFailure = ""

  // How frequently to reprint duplicate exceptions in full, in milliseconds
  val EXCEPTION_PRINT_INTERVAL = System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
  // Map of recent exceptions (identified by string representation and
  // top stack frame) to duplicate count (how many times the same
  // exception has appeared) and time the full exception was
  // printed. This should ideally be an LRU map that can drop old
  // exceptions automatically.
  val recentExceptions = HashMap[String, (Int, Long)]()

  // Add all our tasks to the pending lists. We do this in reverse order
  // of task index so that tasks with low indices get launched first.
  for (i <- (0 until numTasks).reverse) {
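The comment above notes that recentExceptions should ideally be an LRU map that drops old entries on its own. One way to get that behavior on the JVM, sketched with java.util.LinkedHashMap; the 100-entry cap is an illustrative choice, not something this commit specifies:

import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Access-ordered map that evicts its least-recently-used entry once it
// grows past maxEntries, so stale exceptions age out automatically.
def lruExceptionMap(maxEntries: Int = 100): JMap[String, (Int, Long)] =
  new JLinkedHashMap[String, (Int, Long)](16, 0.75f, true) {
    override def removeEldestEntry(eldest: JMap.Entry[String, (Int, Long)]): Boolean =
      size() > maxEntries
  }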
@@ -229,6 +238,34 @@ extends Job(jobId) with Logging
        if (tasksFinished == numTasks)
          sched.jobFinished(this)
        return
      case ef: ExceptionFailure =>
        val key = ef.exception.toString
        val now = System.currentTimeMillis
        val (printFull, dupCount) =
          if (recentExceptions.contains(key)) {
            val (dupCount, printTime) = recentExceptions(key)
            if (now - printTime > EXCEPTION_PRINT_INTERVAL) {
              recentExceptions(key) = (0, now)
              (true, 0)
            } else {
              recentExceptions(key) = (dupCount + 1, printTime)
              (false, dupCount + 1)
            }
          } else {
            recentExceptions(key) = (0, now)
            (true, 0)
          }

        if (printFull) {
          val stackTrace =
            for (elem <- ef.exception.getStackTrace)
              yield "\tat %s".format(elem.toString)
          logInfo("Loss was due to %s\n%s".format(
            ef.exception.toString, stackTrace.mkString("\n")))
        } else {
          logInfo("Loss was due to %s [duplicate %d]".format(
            ef.exception.toString, dupCount))
        }
      case _ => {}
    }
  }
...
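Net effect of the match arm above: the first sighting of an exception, keyed by its toString, logs the full stack trace; repeats within EXCEPTION_PRINT_INTERVAL log a one-line "[duplicate N]" message; once the interval has elapsed, the full trace is printed again and the counter resets. The same decision extracted as a pure function, purely for illustration (the commit keeps it inline):

// Returns (printFull, dupCount, newEntry) from the previous map entry, if any.
def dedup(prev: Option[(Int, Long)], now: Long, interval: Long): (Boolean, Int, (Int, Long)) =
  prev match {
    case Some((dupCount, printTime)) if now - printTime <= interval =>
      (false, dupCount + 1, (dupCount + 1, printTime))
    case _ =>
      (true, 0, (0, now)) // first sighting, or the print interval elapsed
  }

With the default 10000 ms interval, the same exception arriving at t = 0, 3000, and 12000 ms logs the full trace, then "[duplicate 1]", then the full trace again.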
package spark.examples

import spark.SparkContext

object ExceptionHandlingTest {
  def main(args: Array[String]) {
    if (args.length == 0) {
      System.err.println("Usage: ExceptionHandlingTest <host>")
      System.exit(1)
    }

    val sc = new SparkContext(args(0), "ExceptionHandlingTest")
    sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
      if (Math.random > 0.75)
        throw new Exception("Testing exception handling")
    }
  }
}
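Each task throws with probability 0.25, so a run with defaultParallelism = 8 hits at least one failure with probability 1 - 0.75^8 ≈ 0.90; most runs exercise the new error path. Launching it against a local master would look roughly like the line below (the run script and the "local" master URL are assumptions about the surrounding tree, not part of this diff):

./run spark.examples.ExceptionHandlingTest local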