Commit bd2c12fb authored by Rajesh Balamohan, committed by Marcelo Vanzin

[SPARK-12920][CORE] Honor "spark.ui.retainedStages" to reduce mem-pressure

When a large number of jobs run concurrently against the Spark Thrift Server, the server's CPU usage climbs due to GC pressure: the UI's retention of job and stage data builds up memory pressure under large workloads. A profiler snapshot is attached at https://issues.apache.org/jira/secure/attachment/12783302/SPARK-12920.profiler_job_progress_listner.png. This PR honors `spark.ui.retainedStages` strictly to reduce memory pressure.
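To make the behavioral change concrete, here is a minimal sketch contrasting the two eviction policies; the buffer size is hypothetical, and in Spark the actual buffers are `ListBuffer`s trimmed inside `JobProgressListener`:

    // Illustrative only: compare the old chunked eviction with the new strict trim.
    val retainedStages = 100
    val bufferSize = 150  // hypothetical: the buffer has grown past the configured limit

    // Old policy: evict a fixed chunk, 10% of the limit but at least one entry.
    // One pass removes 10 entries here, leaving 140, still above the limit.
    val oldToRemove = math.max(retainedStages / 10, 1)

    // New policy: evict exactly the current overflow, so a single pass always
    // brings the buffer back down to retainedStages (50 removed, 100 left).
    val newToRemove = bufferSize - retainedStages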

Tested manually and with unit tests.

Author: Rajesh Balamohan <rbalamohan@apache.org>

Closes #10846 from rajeshbalamohan/SPARK-12920.
parent bf5cb8af
@@ -140,7 +140,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   /** If stages is too large, remove and garbage collect old stages */
   private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
-      val toRemove = math.max(retainedStages / 10, 1)
+      val toRemove = (stages.size - retainedStages)
       stages.take(toRemove).foreach { s =>
         stageIdToData.remove((s.stageId, s.attemptId))
         stageIdToInfo.remove(s.stageId)
@@ -152,7 +152,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   /** If jobs is too large, remove and garbage collect old jobs */
   private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
     if (jobs.size > retainedJobs) {
-      val toRemove = math.max(retainedJobs / 10, 1)
+      val toRemove = (jobs.size - retainedJobs)
       jobs.take(toRemove).foreach { job =>
         // Remove the job's UI data, if it exists
         jobIdToData.remove(job.jobId).foreach { removedJob =>
@@ -84,18 +84,27 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
   }

   test("test LRU eviction of stages") {
+    def runWithListener(listener: JobProgressListener) : Unit = {
+      for (i <- 1 to 50) {
+        listener.onStageSubmitted(createStageStartEvent(i))
+        listener.onStageCompleted(createStageEndEvent(i))
+      }
+      assertActiveJobsStateIsEmpty(listener)
+    }
     val conf = new SparkConf()
     conf.set("spark.ui.retainedStages", 5.toString)
-    val listener = new JobProgressListener(conf)
-    for (i <- 1 to 50) {
-      listener.onStageSubmitted(createStageStartEvent(i))
-      listener.onStageCompleted(createStageEndEvent(i))
-    }
-    assertActiveJobsStateIsEmpty(listener)
+    var listener = new JobProgressListener(conf)
+
+    // Test with 5 retainedStages
+    runWithListener(listener)
     listener.completedStages.size should be (5)
     listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
+
+    // Test with 0 retainedStages
+    conf.set("spark.ui.retainedStages", 0.toString)
+    listener = new JobProgressListener(conf)
+    runWithListener(listener)
+    listener.completedStages.size should be (0)
   }

   test("test clearing of stageIdToActiveJobs") {
@@ -121,20 +130,29 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
   }

   test("test clearing of jobGroupToJobIds") {
+    def runWithListener(listener: JobProgressListener): Unit = {
+      // Run 50 jobs, each with one stage
+      for (jobId <- 0 to 50) {
+        listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
+        listener.onStageSubmitted(createStageStartEvent(0))
+        listener.onStageCompleted(createStageEndEvent(0, failed = false))
+        listener.onJobEnd(createJobEndEvent(jobId, false))
+      }
+      assertActiveJobsStateIsEmpty(listener)
+    }
     val conf = new SparkConf()
     conf.set("spark.ui.retainedJobs", 5.toString)
-    val listener = new JobProgressListener(conf)
-    // Run 50 jobs, each with one stage
-    for (jobId <- 0 to 50) {
-      listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
-      listener.onStageSubmitted(createStageStartEvent(0))
-      listener.onStageCompleted(createStageEndEvent(0, failed = false))
-      listener.onJobEnd(createJobEndEvent(jobId, false))
-    }
-    assertActiveJobsStateIsEmpty(listener)
+    var listener = new JobProgressListener(conf)
+    runWithListener(listener)
+
     // This collection won't become empty, but it should be bounded by spark.ui.retainedJobs
     listener.jobGroupToJobIds.size should be (5)
+
+    // Test with 0 jobs
+    conf.set("spark.ui.retainedJobs", 0.toString)
+    listener = new JobProgressListener(conf)
+    runWithListener(listener)
+    listener.jobGroupToJobIds.size should be (0)
   }

   test("test LRU eviction of jobs") {
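For reference, a minimal sketch of how the two retention limits exercised by this change are set on a `SparkConf`; the values are illustrative, not recommendations:

    import org.apache.spark.SparkConf

    // Both limits cap how much completed job/stage data the UI keeps in memory.
    val conf = new SparkConf()
      .set("spark.ui.retainedStages", "100")
      .set("spark.ui.retainedJobs", "100")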