Skip to content
Snippets Groups Projects
Commit 30e0557d authored by Josh Rosen's avatar Josh Rosen Committed by Wenchen Fan
Browse files

[SPARK-20776] Fix perf. problems in JobProgressListener caused by TaskMetrics construction

## What changes were proposed in this pull request?

In

```
./bin/spark-shell --master=local[64]
```

I ran

```
sc.parallelize(1 to 100000, 100000).count()
```
and profiled the time spent in the LiveListenerBus event processing thread. I discovered that the majority of the time was being spent in `TaskMetrics.empty` calls in `JobProgressListener.onTaskStart`. It turns out that we can slightly refactor to remove the need to construct one empty instance per call, greatly improving the performance of this code.

The performance gains here help to avoid an issue where listener events would be dropped because the JobProgressListener couldn't keep up with the throughput.

**Before:**

![image](https://cloud.githubusercontent.com/assets/50748/26133095/95bcd42a-3a59-11e7-8051-a50550e447b8.png)

**After:**

![image](https://cloud.githubusercontent.com/assets/50748/26133070/7935e148-3a59-11e7-8c2d-73d5aa5a2397.png)

## How was this patch tested?

Benchmarks described above.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #18008 from JoshRosen/nametoaccums-improvements.
parent 7463a88b
No related branches found
No related tags found
No related merge requests found
......@@ -329,13 +329,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
val metrics = TaskMetrics.empty
val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
logWarning("Task start for unknown stage " + taskStart.stageId)
new StageUIData
})
stageData.numActiveTasks += 1
stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo, Some(metrics)))
stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo))
}
for (
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId);
......@@ -405,7 +404,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
}
val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info, None))
val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info))
taskData.updateTaskInfo(info)
taskData.updateTaskMetrics(taskMetrics)
taskData.errorMessage = errorMessage
......
......@@ -112,9 +112,9 @@ private[spark] object UIData {
/**
* These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation.
*/
class TaskUIData private(
private var _taskInfo: TaskInfo,
private var _metrics: Option[TaskMetricsUIData]) {
class TaskUIData private(private var _taskInfo: TaskInfo) {
private[this] var _metrics: Option[TaskMetricsUIData] = Some(TaskMetricsUIData.EMPTY)
var errorMessage: Option[String] = None
......@@ -127,7 +127,7 @@ private[spark] object UIData {
}
def updateTaskMetrics(metrics: Option[TaskMetrics]): Unit = {
_metrics = TaskUIData.toTaskMetricsUIData(metrics)
_metrics = metrics.map(TaskMetricsUIData.fromTaskMetrics)
}
def taskDuration: Option[Long] = {
......@@ -140,28 +140,8 @@ private[spark] object UIData {
}
object TaskUIData {
def apply(taskInfo: TaskInfo, metrics: Option[TaskMetrics]): TaskUIData = {
new TaskUIData(dropInternalAndSQLAccumulables(taskInfo), toTaskMetricsUIData(metrics))
}
private def toTaskMetricsUIData(metrics: Option[TaskMetrics]): Option[TaskMetricsUIData] = {
metrics.map { m =>
TaskMetricsUIData(
executorDeserializeTime = m.executorDeserializeTime,
executorDeserializeCpuTime = m.executorDeserializeCpuTime,
executorRunTime = m.executorRunTime,
executorCpuTime = m.executorCpuTime,
resultSize = m.resultSize,
jvmGCTime = m.jvmGCTime,
resultSerializationTime = m.resultSerializationTime,
memoryBytesSpilled = m.memoryBytesSpilled,
diskBytesSpilled = m.diskBytesSpilled,
peakExecutionMemory = m.peakExecutionMemory,
inputMetrics = InputMetricsUIData(m.inputMetrics),
outputMetrics = OutputMetricsUIData(m.outputMetrics),
shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
}
def apply(taskInfo: TaskInfo): TaskUIData = {
new TaskUIData(dropInternalAndSQLAccumulables(taskInfo))
}
/**
......@@ -206,6 +186,28 @@ private[spark] object UIData {
shuffleReadMetrics: ShuffleReadMetricsUIData,
shuffleWriteMetrics: ShuffleWriteMetricsUIData)
/**
 * Companion helpers for building [[TaskMetricsUIData]] values from [[TaskMetrics]].
 */
object TaskMetricsUIData {
/**
 * Builds a [[TaskMetricsUIData]] from the given [[TaskMetrics]].
 *
 * Each scalar metric is read from `m` and passed through under the same name; the input,
 * output, shuffle-read and shuffle-write metric groups are wrapped in their corresponding
 * `*UIData` types.
 *
 * @param m the task metrics to convert
 * @return the UI-data representation of `m`
 */
def fromTaskMetrics(m: TaskMetrics): TaskMetricsUIData = {
TaskMetricsUIData(
executorDeserializeTime = m.executorDeserializeTime,
executorDeserializeCpuTime = m.executorDeserializeCpuTime,
executorRunTime = m.executorRunTime,
executorCpuTime = m.executorCpuTime,
resultSize = m.resultSize,
jvmGCTime = m.jvmGCTime,
resultSerializationTime = m.resultSerializationTime,
memoryBytesSpilled = m.memoryBytesSpilled,
diskBytesSpilled = m.diskBytesSpilled,
peakExecutionMemory = m.peakExecutionMemory,
inputMetrics = InputMetricsUIData(m.inputMetrics),
outputMetrics = OutputMetricsUIData(m.outputMetrics),
shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
}
/**
 * The UI-data form of `TaskMetrics.empty`.
 *
 * Being a `val` on this object, it is computed once when the object is initialized, so
 * users of this value share one instance instead of constructing a fresh empty
 * [[TaskMetrics]] each time.
 */
val EMPTY: TaskMetricsUIData = fromTaskMetrics(TaskMetrics.empty)
}
case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
object InputMetricsUIData {
def apply(metrics: InputMetrics): InputMetricsUIData = {
......
......@@ -31,7 +31,7 @@ class AllStagesResourceSuite extends SparkFunSuite {
val tasks = new LinkedHashMap[Long, TaskUIData]
taskLaunchTimes.zipWithIndex.foreach { case (time, idx) =>
tasks(idx.toLong) = TaskUIData(
new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None)
new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false))
}
val stageUiData = new StageUIData()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment