Skip to content
Snippets Groups Projects
Commit f6e18388 authored by Andrew Or's avatar Andrew Or
Browse files

[SPARK-7608] Clean up old state in RDDOperationGraphListener

This is necessary for streaming and long-running Spark applications. zsxwing tdas

Author: Andrew Or <andrew@databricks.com>

Closes #6125 from andrewor14/viz-listener-leak and squashes the following commits:

8660949 [Andrew Or] Fix thing + add tests
33c0843 [Andrew Or] Clean up old job state
parent e683182c
No related branches found
No related tags found
No related merge requests found
...@@ -27,11 +27,16 @@ import org.apache.spark.ui.SparkUI ...@@ -27,11 +27,16 @@ import org.apache.spark.ui.SparkUI
* A SparkListener that constructs a DAG of RDD operations. * A SparkListener that constructs a DAG of RDD operations.
*/ */
private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener { private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]] private[ui] val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
private val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph] private[ui] val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
private val stageIds = new mutable.ArrayBuffer[Int]
// Keep track of the order in which these are inserted so we can remove old ones
private[ui] val jobIds = new mutable.ArrayBuffer[Int]
private[ui] val stageIds = new mutable.ArrayBuffer[Int]
// How many jobs or stages to retain graph metadata for // How many jobs or stages to retain graph metadata for
private val retainedJobs =
conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
private val retainedStages = private val retainedStages =
conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES) conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
...@@ -50,15 +55,22 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen ...@@ -50,15 +55,22 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
/** On job start, construct a RDDOperationGraph for each stage in the job for display later. */ /** On job start, construct a RDDOperationGraph for each stage in the job for display later. */
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized { override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
val jobId = jobStart.jobId val jobId = jobStart.jobId
val stageInfos = jobStart.stageInfos jobIds += jobId
jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted
stageInfos.foreach { stageInfo => // Remove state for old jobs
stageIds += stageInfo.stageId if (jobIds.size >= retainedJobs) {
stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo) val toRemove = math.max(retainedJobs / 10, 1)
jobIds.take(toRemove).foreach { id => jobIdToStageIds.remove(id) }
jobIds.trimStart(toRemove)
} }
jobIdToStageIds(jobId) = stageInfos.map(_.stageId).sorted }
// Remove graph metadata for old stages /** Remove graph metadata for old stages */
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
val stageInfo = stageSubmitted.stageInfo
stageIds += stageInfo.stageId
stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
if (stageIds.size >= retainedStages) { if (stageIds.size >= retainedStages) {
val toRemove = math.max(retainedStages / 10, 1) val toRemove = math.max(retainedStages / 10, 1)
stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) } stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui.scope
import org.scalatest.FunSuite
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListenerStageSubmitted, StageInfo}
class RDDOperationGraphListenerSuite extends FunSuite {
private var jobIdCounter = 0
private var stageIdCounter = 0
/** Run a job with the specified number of stages. */
private def runOneJob(numStages: Int, listener: RDDOperationGraphListener): Unit = {
assert(numStages > 0, "I will not run a job with 0 stages for you.")
val stageInfos = (0 until numStages).map { _ =>
val stageInfo = new StageInfo(stageIdCounter, 0, "s", 0, Seq.empty, Seq.empty, "d")
listener.onStageSubmitted(new SparkListenerStageSubmitted(stageInfo))
stageIdCounter += 1
stageInfo
}
listener.onJobStart(new SparkListenerJobStart(jobIdCounter, 0, stageInfos))
jobIdCounter += 1
}
test("listener cleans up metadata") {
val conf = new SparkConf()
.set("spark.ui.retainedStages", "10")
.set("spark.ui.retainedJobs", "10")
val listener = new RDDOperationGraphListener(conf)
assert(listener.jobIdToStageIds.isEmpty)
assert(listener.stageIdToGraph.isEmpty)
assert(listener.jobIds.isEmpty)
assert(listener.stageIds.isEmpty)
// Run a few jobs, but not enough for clean up yet
runOneJob(1, listener)
runOneJob(2, listener)
runOneJob(3, listener)
assert(listener.jobIdToStageIds.size === 3)
assert(listener.stageIdToGraph.size === 6)
assert(listener.jobIds.size === 3)
assert(listener.stageIds.size === 6)
// Run a few more, but this time the stages should be cleaned up, but not the jobs
runOneJob(5, listener)
runOneJob(100, listener)
assert(listener.jobIdToStageIds.size === 5)
assert(listener.stageIdToGraph.size === 9)
assert(listener.jobIds.size === 5)
assert(listener.stageIds.size === 9)
// Run a few more, but this time both jobs and stages should be cleaned up
(1 to 100).foreach { _ =>
runOneJob(1, listener)
}
assert(listener.jobIdToStageIds.size === 9)
assert(listener.stageIdToGraph.size === 9)
assert(listener.jobIds.size === 9)
assert(listener.stageIds.size === 9)
// Ensure we clean up old jobs and stages, not arbitrary ones
assert(!listener.jobIdToStageIds.contains(0))
assert(!listener.stageIdToGraph.contains(0))
assert(!listener.stageIds.contains(0))
assert(!listener.jobIds.contains(0))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment