Commit 16186cdc authored by Shixiong Zhu

[SPARK-20955][CORE] Intern "executorId" to reduce the memory usage

## What changes were proposed in this pull request?

In [this line](https://github.com/apache/spark/blob/f7cf2096fdecb8edab61c8973c07c6fc877ee32d/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L128), the `executorId` string received from executors is used as-is and eventually ends up in `TaskUIData`. Since deserializing the `executorId` string always creates a new instance, we accumulate many duplicate string instances.

This PR interns the strings stored in `TaskUIData` to reduce memory usage.
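As an aside (not part of the patch), here is a minimal, self-contained sketch of the interning idea, using the same Guava `Interners` utility the patch relies on. The object name `InternSketch` and the example strings are illustrative only:

```scala
import com.google.common.collect.Interners

object InternSketch {
  // A weak interner keeps a single canonical copy of each distinct string.
  // "Weak" means the canonical copy can still be garbage-collected once
  // nothing else references it, so the interner does not pin strings in
  // memory the way an ever-growing cache map would.
  private val interner = Interners.newWeakInterner[String]()

  def main(args: Array[String]): Unit = {
    // Deserialization creates a fresh instance each time, so equal strings
    // occupy memory twice:
    val a = new String("executor-1")
    val b = new String("executor-1")
    println(a eq b) // false: two distinct objects with equal contents

    // After interning, both references point at one shared copy:
    println(interner.intern(a) eq interner.intern(b)) // true
  }
}
```

The patch applies exactly this call (wrapped as `weakIntern`) to `executorId` and `host` when copying a `TaskInfo` into `TaskUIData`, as shown in the diff below.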

## How was this patch tested?

Manually tested using `bin/spark-shell --master local-cluster[6,1,1024]`. Test code:
```scala
for (_ <- 1 to 10) { sc.makeRDD(1 to 1000, 1000).count() }
Thread.sleep(2000)
val l = sc.getClass.getMethod("jobProgressListener").invoke(sc).asInstanceOf[org.apache.spark.ui.jobs.JobProgressListener]
org.apache.spark.util.SizeEstimator.estimate(l.stageIdToData)
```
This PR reduces the estimated size of `stageIdToData` from 3,487,280 bytes to 3,009,744 bytes (86.3% of the original) in the above case.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #18177 from zsxwing/SPARK-20955.
Parent commit: e11d90bf
```diff
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -20,6 +20,8 @@ package org.apache.spark.ui.jobs
 import scala.collection.mutable
 import scala.collection.mutable.{HashMap, LinkedHashMap}
 
+import com.google.common.collect.Interners
+
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor._
 import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
@@ -141,6 +143,14 @@ private[spark] object UIData {
   }
 
   object TaskUIData {
+
+    private val stringInterner = Interners.newWeakInterner[String]()
+
+    /** String interning to reduce the memory usage. */
+    private def weakIntern(s: String): String = {
+      stringInterner.intern(s)
+    }
+
     def apply(taskInfo: TaskInfo): TaskUIData = {
       new TaskUIData(dropInternalAndSQLAccumulables(taskInfo))
     }
@@ -155,8 +165,8 @@ private[spark] object UIData {
       index = taskInfo.index,
       attemptNumber = taskInfo.attemptNumber,
       launchTime = taskInfo.launchTime,
-      executorId = taskInfo.executorId,
-      host = taskInfo.host,
+      executorId = weakIntern(taskInfo.executorId),
+      host = weakIntern(taskInfo.host),
       taskLocality = taskInfo.taskLocality,
       speculative = taskInfo.speculative
     )
```