Commit cc87280f authored by cenyuhai, committed by Sean Owen

[SPARK-17171][WEB UI] DAG will list all partitions in the graph

## What changes were proposed in this pull request?
The DAG visualization currently lists every partition in the graph, which is slow to render and makes the whole graph hard to read. Usually we do not want to see all partitions; we just want to see the relationships in the DAG. So this change retains only a limited number of root RDD nodes, controlled by the new `spark.ui.dagGraph.retainedRootRDDs` setting (the screenshots below use 2).

Before this PR, the DAG graph looked like [dag1.png](https://issues.apache.org/jira/secure/attachment/12824702/dag1.png) and [dag3.png](https://issues.apache.org/jira/secure/attachment/12825456/dag3.png); after this PR, it looks like [dag2.png](https://issues.apache.org/jira/secure/attachment/12824703/dag2.png) and [dag4.png](https://issues.apache.org/jira/secure/attachment/12825457/dag4.png).
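For example, a minimal sketch of enabling the cap (the app name and master are placeholders; `spark.ui.dagGraph.retainedRootRDDs` is the setting this patch introduces, defaulting to `Int.MaxValue`, i.e. the old show-everything behavior):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Cap the web UI DAG at 2 root RDD nodes per stage, matching the
// screenshots above. Leaving the setting unset keeps the old behavior.
val conf = new SparkConf()
  .setAppName("dag-graph-demo") // placeholder name
  .setMaster("local[*]")        // placeholder master
  .set("spark.ui.dagGraph.retainedRootRDDs", "2")

val sc = new SparkContext(conf)
```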

Author: cenyuhai <cenyuhai@didichuxing.com>
Author: 岑玉海 <261810726@qq.com>

Closes #14737 from cenyuhai/SPARK-17171.
parent 72eec70b
core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.{RDDInfo, StorageLevel}
 
 /**
  * A representation of a generic cluster graph used for storing information on RDD operations.
@@ -107,7 +107,7 @@ private[ui] object RDDOperationGraph extends Logging {
    * supporting in the future if we decide to group certain stages within the same job under
    * a common scope (e.g. part of a SQL query).
    */
-  def makeOperationGraph(stage: StageInfo): RDDOperationGraph = {
+  def makeOperationGraph(stage: StageInfo, retainedNodes: Int): RDDOperationGraph = {
     val edges = new ListBuffer[RDDOperationEdge]
     val nodes = new mutable.HashMap[Int, RDDOperationNode]
     val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID
@@ -119,18 +119,37 @@ private[ui] object RDDOperationGraph extends Logging {
       { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
     val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
 
+    var rootNodeCount = 0
+    val addRDDIds = new mutable.HashSet[Int]()
+    val dropRDDIds = new mutable.HashSet[Int]()
+
     // Find nodes, edges, and operation scopes that belong to this stage
-    stage.rddInfos.foreach { rdd =>
-      edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
+    stage.rddInfos.sortBy(_.id).foreach { rdd =>
+      val parentIds = rdd.parentIds
+      val isAllowed =
+        if (parentIds.isEmpty) {
+          rootNodeCount += 1
+          rootNodeCount <= retainedNodes
+        } else {
+          parentIds.exists(id => addRDDIds.contains(id) || !dropRDDIds.contains(id))
+        }
+
+      if (isAllowed) {
+        addRDDIds += rdd.id
+        edges ++= parentIds.filter(id => !dropRDDIds.contains(id)).map(RDDOperationEdge(_, rdd.id))
+      } else {
+        dropRDDIds += rdd.id
+      }
 
       // TODO: differentiate between the intention to cache an RDD and whether it's actually cached
       val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(
         rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite))
-
       if (rdd.scope.isEmpty) {
         // This RDD has no encompassing scope, so we put it directly in the root cluster
         // This should happen only if an RDD is instantiated outside of a public RDD API
-        rootCluster.attachChildNode(node)
+        if (isAllowed) {
+          rootCluster.attachChildNode(node)
+        }
       } else {
         // Otherwise, this RDD belongs to an inner cluster,
         // which may be nested inside of other clusters
@@ -154,7 +173,9 @@ private[ui] object RDDOperationGraph extends Logging {
           rootCluster.attachChildCluster(cluster)
         }
       }
-      rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
+      if (isAllowed) {
+        rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
+      }
     }
   }
 
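For intuition, here is the pruning rule from `makeOperationGraph` extracted into a self-contained sketch. The `RddLite` case class, `prune` helper, and sample data are hypothetical stand-ins for `RDDInfo`; the filtering logic mirrors the patch: a root RDD is kept only while the per-stage quota lasts, and a non-root RDD is kept unless all of its parents were dropped, so entire pruned subtrees disappear.

```scala
import scala.collection.mutable

object PruneDemo {
  // Hypothetical, simplified stand-in for RDDInfo: an id plus parent ids.
  case class RddLite(id: Int, parentIds: Seq[Int])

  // Returns the ids that survive pruning when at most `retainedNodes`
  // root RDDs are kept per stage.
  def prune(rdds: Seq[RddLite], retainedNodes: Int): Set[Int] = {
    var rootNodeCount = 0
    val added = new mutable.HashSet[Int]()
    val dropped = new mutable.HashSet[Int]()
    rdds.sortBy(_.id).foreach { rdd =>
      val isAllowed =
        if (rdd.parentIds.isEmpty) {
          // Root RDD: keep it only while the quota lasts.
          rootNodeCount += 1
          rootNodeCount <= retainedNodes
        } else {
          // Non-root RDD: keep it unless every parent was dropped.
          rdd.parentIds.exists(id => added.contains(id) || !dropped.contains(id))
        }
      if (isAllowed) added += rdd.id else dropped += rdd.id
    }
    added.toSet
  }

  def main(args: Array[String]): Unit = {
    // Three root RDDs (0, 1, 2), each with one child (3, 4, 5).
    val rdds = Seq(
      RddLite(0, Nil), RddLite(1, Nil), RddLite(2, Nil),
      RddLite(3, Seq(0)), RddLite(4, Seq(1)), RddLite(5, Seq(2)))
    // With retainedNodes = 2, root 2 is pruned and its child 5 goes with it.
    println(prune(rdds, retainedNodes = 2)) // Set(0, 1, 3, 4)
  }
}
```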
core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
@@ -41,6 +41,10 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener
   private[ui] val jobIds = new mutable.ArrayBuffer[Int]
   private[ui] val stageIds = new mutable.ArrayBuffer[Int]
 
+  // How many root nodes to retain in DAG Graph
+  private[ui] val retainedNodes =
+    conf.getInt("spark.ui.dagGraph.retainedRootRDDs", Int.MaxValue)
+
   // How many jobs or stages to retain graph metadata for
   private val retainedJobs =
     conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
@@ -82,7 +86,7 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener
     val stageId = stageInfo.stageId
     stageIds += stageId
     stageIdToJobId(stageId) = jobId
-    stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
+    stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo, retainedNodes)
     trimStagesIfNecessary()
   }
 