diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index a3074916d13e7d40b6de0f4ef49b7dfa6b9d01d9..5e8bd8c8e533a55d637fdb706657a670c1dff9e9 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -30,27 +30,69 @@ import org.apache.spark.storage.BlockManagerId
 @DeveloperApi
 sealed trait TaskEndReason

+/**
+ * :: DeveloperApi ::
+ * Task succeeded.
+ */
 @DeveloperApi
 case object Success extends TaskEndReason

+/**
+ * :: DeveloperApi ::
+ * Various possible reasons why a task failed.
+ */
+@DeveloperApi
+sealed trait TaskFailedReason extends TaskEndReason {
+  /** Error message displayed in the web UI. */
+  def toErrorString: String
+}
+
+/**
+ * :: DeveloperApi ::
+ * A [[org.apache.spark.scheduler.ShuffleMapTask]] that completed successfully earlier, but we
+ * lost the executor before the stage completed. This means Spark needs to reschedule the task
+ * to be re-executed on a different executor.
+ */
 @DeveloperApi
-case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
+case object Resubmitted extends TaskFailedReason {
+  override def toErrorString: String = "Resubmitted (resubmitted due to lost executor)"
+}

+/**
+ * :: DeveloperApi ::
+ * Task failed to fetch shuffle data from a remote node. Probably means we have lost the remote
+ * executors the task is trying to fetch from, and thus need to rerun the previous stage.
+ */
 @DeveloperApi
 case class FetchFailed(
     bmAddress: BlockManagerId,
     shuffleId: Int,
     mapId: Int,
     reduceId: Int)
-  extends TaskEndReason
+  extends TaskFailedReason {
+  override def toErrorString: String = {
+    val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
+    s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId)"
+  }
+}

+/**
+ * :: DeveloperApi ::
+ * Task failed due to a runtime exception. This is the most common failure case and also captures
+ * user program exceptions.
+ */
 @DeveloperApi
 case class ExceptionFailure(
     className: String,
     description: String,
     stackTrace: Array[StackTraceElement],
     metrics: Option[TaskMetrics])
-  extends TaskEndReason
+  extends TaskFailedReason {
+  override def toErrorString: String = {
+    val stackTraceString = if (stackTrace == null) "null" else stackTrace.mkString("\n")
+    s"$className ($description)\n$stackTraceString"
+  }
+}

 /**
  * :: DeveloperApi ::
@@ -58,10 +100,18 @@ case class ExceptionFailure(
  * it was fetched.
  */
 @DeveloperApi
-case object TaskResultLost extends TaskEndReason
+case object TaskResultLost extends TaskFailedReason {
+  override def toErrorString: String = "TaskResultLost (result lost from block manager)"
+}

+/**
+ * :: DeveloperApi ::
+ * Task was killed intentionally and needs to be rescheduled.
+ */
 @DeveloperApi
-case object TaskKilled extends TaskEndReason
+case object TaskKilled extends TaskFailedReason {
+  override def toErrorString: String = "TaskKilled (killed intentionally)"
+}

 /**
  * :: DeveloperApi ::
@@ -69,7 +119,9 @@ case object TaskKilled extends TaskEndReason
  * the task crashed the JVM.
  */
 @DeveloperApi
-case object ExecutorLostFailure extends TaskEndReason
+case object ExecutorLostFailure extends TaskFailedReason {
+  override def toErrorString: String = "ExecutorLostFailure (executor lost)"
+}

 /**
  * :: DeveloperApi ::
@@ -77,4 +129,6 @@ case object ExecutorLostFailure extends TaskEndReason
  * deserializing the task result.
  */
 @DeveloperApi
-case object UnknownReason extends TaskEndReason
+case object UnknownReason extends TaskFailedReason {
+  override def toErrorString: String = "UnknownReason"
+}
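Downstream code consumes the new trait by pattern matching on the task-end reason, as JobProgressListener does further down. A minimal sketch of a custom listener built on the same pattern (illustrative only, not part of this patch; it assumes the existing SparkListener / SparkListenerTaskEnd API, and the listener name is made up):

    import org.apache.spark.{Success, TaskFailedReason}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Illustrative listener, not part of this patch: logs the new human-readable
    // error summary for every failed task, mirroring what JobProgressListener does.
    class FailureLoggingListener extends SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        taskEnd.reason match {
          case Success => // nothing to report for successful tasks
          case failure: TaskFailedReason =>
            // Every failure case (Resubmitted, FetchFailed, ExceptionFailure, ...)
            // exposes toErrorString, so callers no longer special-case each type.
            println(s"Task ${taskEnd.taskInfo.taskId} failed: ${failure.toErrorString}")
        }
      }
    }

Such a listener would be registered with SparkContext.addSparkListener.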
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b5bcdd7e99c583569f373aac8a2dc743a5208739..c0898f64fb0c99cef7f632046d046d000830c029 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -641,7 +641,9 @@ private[spark] class TaskSetManager(
       addPendingTask(index, readding=true)
     }

-    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
+    // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage.
+    // The reason is the next stage wouldn't be able to fetch the data from this dead executor
+    // so we would need to rerun these tasks on other executors.
     if (tasks(0).isInstanceOf[ShuffleMapTask]) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index bfefe4dbc408927f0b12c7fad70f1d1556fd3d2c..381a5443df8b5773d595d0b03c265d9bbf56ccd3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.jobs

 import scala.collection.mutable.{HashMap, ListBuffer}

-import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, Success}
+import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
@@ -51,6 +51,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   var totalShuffleRead = 0L
   var totalShuffleWrite = 0L

+  // TODO: Should probably consolidate all following into a single hash map.
   val stageIdToTime = HashMap[Int, Long]()
   val stageIdToShuffleRead = HashMap[Int, Long]()
   val stageIdToShuffleWrite = HashMap[Int, Long]()
@@ -183,17 +184,17 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
       // Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage
       tasksActive.remove(info.taskId)

-      val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+      val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
         taskEnd.reason match {
           case org.apache.spark.Success =>
             stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
             (None, Option(taskEnd.taskMetrics))
-          case e: ExceptionFailure =>
+          case e: ExceptionFailure =>  // Handle ExceptionFailure because we might have metrics
             stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
-            (Some(e), e.metrics)
-          case e: org.apache.spark.TaskEndReason =>
+            (Some(e.toErrorString), e.metrics)
+          case e: TaskFailedReason =>  // All other failure cases
             stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
-            (None, None)
+            (Some(e.toErrorString), None)
         }

       stageIdToTime.getOrElseUpdate(sid, 0L)
@@ -221,7 +222,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
         stageIdToDiskBytesSpilled(sid) += diskBytesSpilled

         val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
-        taskMap(info.taskId) = new TaskUIData(info, metrics, failureInfo)
+        taskMap(info.taskId) = new TaskUIData(info, metrics, errorMessage)
         stageIdToTaskData(sid) = taskMap
       }
     }
@@ -256,7 +257,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
 case class TaskUIData(
     taskInfo: TaskInfo,
     taskMetrics: Option[TaskMetrics] = None,
-    exception: Option[ExceptionFailure] = None)
+    errorMessage: Option[String] = None)

 private object JobProgressListener {
   val DEFAULT_POOL_NAME = "default"
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 4bce472036f7d38e71e1efbac437d2d2c605908d..8b65f0671bdb963125d15ffcb63b268166cc0bc8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -210,10 +210,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {

   def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean)
       (taskData: TaskUIData): Seq[Node] = {
-    def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
-      trace.map(e => <span style="display:block;">{e.toString}</span>)
-
-    taskData match { case TaskUIData(info, metrics, exception) =>
+    taskData match { case TaskUIData(info, metrics, errorMessage) =>
       val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
         else metrics.map(_.executorRunTime).getOrElse(1L)
       val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
@@ -283,12 +280,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         </td>
       }}
       <td>
-        {exception.map { e =>
-          <span>
-            {e.className} ({e.description})<br/>
-            {fmtStackTrace(e.stackTrace)}
-          </span>
-        }.getOrElse("")}
+        {errorMessage.map { e => <pre>{e}</pre> }.getOrElse("")}
       </td>
     </tr>
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index a3f824a4e1f5723feb0d243175854441cb3796d7..30971f769682ff4a8e997d5832ba3237524eeb8a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -91,13 +91,13 @@ private[ui] class StageTableBase(
         {s.name}
       </a>

-    val details = if (s.details.nonEmpty) (
+    val details = if (s.details.nonEmpty) {
       <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
             class="expand-details">
         +show details
       </span>
       <pre class="stage-details collapsed">{s.details}</pre>
-    )
+    }

     listener.stageIdToDescription.get(s.stageId)
       .map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)