Skip to content
Snippets Groups Projects
Commit d45a0d3c authored by Carson Wang's avatar Carson Wang Committed by Reynold Xin
Browse files

[SPARK-11047] Internal accumulators miss the internal flag when replaying...

[SPARK-11047] Internal accumulators miss the internal flag when replaying events in the history server

Internal accumulators do not write the internal flag to the event log, so on the history server web UI every accumulator appears as non-internal. This causes an incorrect peak execution memory value and an unwanted accumulator table to be displayed on the stage page.
To fix this, the "internal" property of AccumulableInfo is now included when writing the event log.

Author: Carson Wang <carson.wang@intel.com>

Closes #9061 from carsonwang/accumulableBug.
parent 523adc24
No related branches found
No related tags found
No related merge requests found
......@@ -46,6 +46,15 @@ class AccumulableInfo private[spark] (
}
object AccumulableInfo {
/**
 * Creates an [[AccumulableInfo]] with an explicitly specified internal flag.
 *
 * @param id unique accumulator id
 * @param name display name of the accumulator
 * @param update partial value accumulated in the most recent update, if any
 * @param value current total value of the accumulator
 * @param internal whether this accumulator was created by Spark itself rather
 *                 than by user code
 */
def apply(
    id: Long,
    name: String,
    update: Option[String],
    value: String,
    internal: Boolean): AccumulableInfo =
  new AccumulableInfo(id, name, update, value, internal)
/**
 * Creates a non-internal [[AccumulableInfo]] (the common case for
 * user-defined accumulators). Delegates to the five-argument factory
 * with `internal = false`.
 */
def apply(id: Long, name: String, update: Option[String], value: String): AccumulableInfo =
  apply(id, name, update, value, internal = false)
......
......@@ -282,7 +282,8 @@ private[spark] object JsonProtocol {
("ID" -> accumulableInfo.id) ~
("Name" -> accumulableInfo.name) ~
("Update" -> accumulableInfo.update.map(new JString(_)).getOrElse(JNothing)) ~
("Value" -> accumulableInfo.value)
("Value" -> accumulableInfo.value) ~
("Internal" -> accumulableInfo.internal)
}
def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
......@@ -696,7 +697,8 @@ private[spark] object JsonProtocol {
val name = (json \ "Name").extract[String]
val update = Utils.jsonOption(json \ "Update").map(_.extract[String])
val value = (json \ "Value").extract[String]
AccumulableInfo(id, name, update, value)
val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
AccumulableInfo(id, name, update, value, internal)
}
def taskMetricsFromJson(json: JValue): TaskMetrics = {
......
......@@ -364,6 +364,15 @@ class JsonProtocolSuite extends SparkFunSuite {
assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
}
test("AccumulableInfo backward compatibility") {
  // The "Internal" property of AccumulableInfo was added after 1.5.1, so event
  // logs written by older versions lack the field; deserialization must default
  // the flag to false in that case.
  val info = makeAccumulableInfo(1)
  val jsonWithoutInternal = JsonProtocol.accumulableInfoToJson(info)
    .removeField { case (field, _) => field == "Internal" }
  val reconstructed = JsonProtocol.accumulableInfoFromJson(jsonWithoutInternal)
  assert(reconstructed.internal === false)
}
/** -------------------------- *
| Helper test running methods |
* --------------------------- */
......@@ -723,15 +732,15 @@ class JsonProtocolSuite extends SparkFunSuite {
val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL,
speculative)
val (acc1, acc2, acc3) =
(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3))
(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true))
taskInfo.accumulables += acc1
taskInfo.accumulables += acc2
taskInfo.accumulables += acc3
taskInfo
}
private def makeAccumulableInfo(id: Int): AccumulableInfo =
AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id)
/**
 * Builds a test [[AccumulableInfo]] with deterministic field values derived from `id`.
 *
 * NOTE(review): the leading space in " Accumulable " is intentional — the expected
 * JSON fixtures in this suite reproduce it verbatim.
 *
 * @param id numeric id used to derive the name, update, and value fields
 * @param internal whether the accumulator should be flagged as Spark-internal
 *                 (defaults to false, matching the previous single-parameter form)
 */
private def makeAccumulableInfo(id: Int, internal: Boolean = false): AccumulableInfo =
  AccumulableInfo(id, s" Accumulable $id", Some(s"delta$id"), s"val$id", internal)
/**
* Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
......@@ -812,13 +821,15 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2"
| "Value": "val2",
| "Internal": false
| },
| {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1"
| "Value": "val1",
| "Internal": false
| }
| ]
| },
......@@ -866,13 +877,15 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2"
| "Value": "val2",
| "Internal": false
| },
| {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1"
| "Value": "val1",
| "Internal": false
| }
| ]
| }
......@@ -902,19 +915,22 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1"
| "Value": "val1",
| "Internal": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2"
| "Value": "val2",
| "Internal": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3"
| "Value": "val3",
| "Internal": true
| }
| ]
| }
......@@ -942,19 +958,22 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1"
| "Value": "val1",
| "Internal": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2"
| "Value": "val2",
| "Internal": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3"
| "Value": "val3",
| "Internal": true
| }
| ]
| }
......@@ -988,19 +1007,22 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1"
| "Value": "val1",
| "Internal": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2"
| "Value": "val2",
| "Internal": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3"
| "Value": "val3",
| "Internal": true
| }
| ]
| },
......@@ -1074,19 +1096,22 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1"
| "Value": "val1",
| "Internal": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2"
| "Value": "val2",
| "Internal": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3"
| "Value": "val3",
| "Internal": true
| }
| ]
| },
......@@ -1157,19 +1182,22 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 1,
| "Name": "Accumulable1",
| "Update": "delta1",
| "Value": "val1"
| "Value": "val1",
| "Internal": false
| },
| {
| "ID": 2,
| "Name": "Accumulable2",
| "Update": "delta2",
| "Value": "val2"
| "Value": "val2",
| "Internal": false
| },
| {
| "ID": 3,
| "Name": "Accumulable3",
| "Update": "delta3",
| "Value": "val3"
| "Value": "val3",
| "Internal": true
| }
| ]
| },
......@@ -1251,13 +1279,15 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 2,
| "Name": " Accumulable 2",
| "Update": "delta2",
| "Value": "val2"
| "Value": "val2",
| "Internal": false
| },
| {
| "ID": 1,
| "Name": " Accumulable 1",
| "Update": "delta1",
| "Value": "val1"
| "Value": "val1",
| "Internal": false
| }
| ]
| },
......@@ -1309,13 +1339,15 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 2,
| "Name": " Accumulable 2",
| "Update": "delta2",
| "Value": "val2"
| "Value": "val2",
| "Internal": false
| },
| {
| "ID": 1,
| "Name": " Accumulable 1",
| "Update": "delta1",
| "Value": "val1"
| "Value": "val1",
| "Internal": false
| }
| ]
| },
......@@ -1384,13 +1416,15 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 2,
| "Name": " Accumulable 2",
| "Update": "delta2",
| "Value": "val2"
| "Value": "val2",
| "Internal": false
| },
| {
| "ID": 1,
| "Name": " Accumulable 1",
| "Update": "delta1",
| "Value": "val1"
| "Value": "val1",
| "Internal": false
| }
| ]
| },
......@@ -1476,13 +1510,15 @@ class JsonProtocolSuite extends SparkFunSuite {
| "ID": 2,
| "Name": " Accumulable 2",
| "Update": "delta2",
| "Value": "val2"
| "Value": "val2",
| "Internal": false
| },
| {
| "ID": 1,
| "Name": " Accumulable 1",
| "Update": "delta1",
| "Value": "val1"
| "Value": "val1",
| "Internal": false
| }
| ]
| }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment