Skip to content
Snippets Groups Projects
Commit 61d4c07f authored by Andrew Or's avatar Andrew Or
Browse files

[SPARK-10640] History server fails to parse TaskCommitDenied

... simply because the code is missing!

Author: Andrew Or <andrew@databricks.com>

Closes #8828 from andrewor14/task-end-reason-json.
parent a96ba40f
No related branches found
No related tags found
No related merge requests found
......@@ -17,13 +17,17 @@
package org.apache.spark
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils
// ==============================================================================================
// NOTE: new task end reasons MUST be accompanied with serialization logic in util.JsonProtocol!
// ==============================================================================================
/**
* :: DeveloperApi ::
* Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry
......
......@@ -362,6 +362,10 @@ private[spark] object JsonProtocol {
("Stack Trace" -> stackTrace) ~
("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
("Metrics" -> metrics)
case taskCommitDenied: TaskCommitDenied =>
("Job ID" -> taskCommitDenied.jobID) ~
("Partition ID" -> taskCommitDenied.partitionID) ~
("Attempt Number" -> taskCommitDenied.attemptNumber)
case ExecutorLostFailure(executorId, isNormalExit) =>
("Executor ID" -> executorId) ~
("Normal Exit" -> isNormalExit)
......@@ -770,6 +774,7 @@ private[spark] object JsonProtocol {
val exceptionFailure = Utils.getFormattedClassName(ExceptionFailure)
val taskResultLost = Utils.getFormattedClassName(TaskResultLost)
val taskKilled = Utils.getFormattedClassName(TaskKilled)
val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied)
val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
val unknownReason = Utils.getFormattedClassName(UnknownReason)
......@@ -794,6 +799,14 @@ private[spark] object JsonProtocol {
ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics, None)
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
case `taskCommitDenied` =>
// Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON
// de/serialization logic was not added until 1.5.1. To provide backward compatibility
// for reading those logs, we need to provide default values for all the fields.
val jobId = Utils.jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1)
val partitionId = Utils.jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
TaskCommitDenied(jobId, partitionId, attemptNo)
case `executorLostFailure` =>
val isNormalExit = Utils.jsonOption(json \ "Normal Exit").
map(_.extract[Boolean])
......
......@@ -151,6 +151,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testTaskEndReason(exceptionFailure)
testTaskEndReason(TaskResultLost)
testTaskEndReason(TaskKilled)
testTaskEndReason(TaskCommitDenied(2, 3, 4))
testTaskEndReason(ExecutorLostFailure("100", true))
testTaskEndReason(UnknownReason)
......@@ -352,6 +353,17 @@ class JsonProtocolSuite extends SparkFunSuite {
assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo))
}
// `TaskCommitDenied` was added in 1.3.0 but JSON de/serialization logic was added in 1.5.1
test("TaskCommitDenied backward compatibility") {
val denied = TaskCommitDenied(1, 2, 3)
val oldDenied = JsonProtocol.taskEndReasonToJson(denied)
.removeField({ _._1 == "Job ID" })
.removeField({ _._1 == "Partition ID" })
.removeField({ _._1 == "Attempt Number" })
val expectedDenied = TaskCommitDenied(-1, -1, -1)
assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
}
/** -------------------------- *
| Helper test running methods |
* --------------------------- */
......@@ -577,6 +589,11 @@ class JsonProtocolSuite extends SparkFunSuite {
assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
case (TaskResultLost, TaskResultLost) =>
case (TaskKilled, TaskKilled) =>
case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1),
TaskCommitDenied(jobId2, partitionId2, attemptNumber2)) =>
assert(jobId1 === jobId2)
assert(partitionId1 === partitionId2)
assert(attemptNumber1 === attemptNumber2)
case (ExecutorLostFailure(execId1, isNormalExit1),
ExecutorLostFailure(execId2, isNormalExit2)) =>
assert(execId1 === execId2)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment