Skip to content
Snippets Groups Projects
Commit b120fba6 authored by Lukasz's avatar Lukasz Committed by Shixiong Zhu
Browse files

[SPARK-9044] Fix "Storage" tab in UI so that it reflects RDD name change.

## What changes were proposed in this pull request?

1. Making 'name' field of RDDInfo mutable.
2. In StorageListener: catching the fact that RDD's name was changed and updating it in RDDInfo.

## How was this patch tested?

1. Manual verification - the 'Storage' tab now behaves as expected.
2. The commit also contains a new unit test which verifies this.

Author: Lukasz <lgieron@gmail.com>

Closes #13264 from lgieron/SPARK-9044.
parent 4f27b8dd
No related branches found
No related tags found
No related merge requests found
......@@ -24,7 +24,7 @@ import org.apache.spark.util.Utils
@DeveloperApi
class RDDInfo(
val id: Int,
val name: String,
var name: String,
val numPartitions: Int,
var storageLevel: StorageLevel,
val parentIds: Seq[Int],
......
......@@ -59,7 +59,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
val rddInfos = stageSubmitted.stageInfo.rddInfos
rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info) }
rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info).name = info.name }
}
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
......
......@@ -179,6 +179,23 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
assert(storageListener.rddInfoList.size === 2)
}
test("verify StorageTab still contains a renamed RDD") {
val rddInfo = new RDDInfo(0, "original_name", 1, memOnly, Seq(4))
val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo), Seq.empty, "details")
bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
val blockUpdateInfos1 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(0, 1), memOnly, 100L, 0L))
postUpdateBlocks(bus, blockUpdateInfos1)
assert(storageListener.rddInfoList.size == 1)
val newName = "new_name"
val rddInfoRenamed = new RDDInfo(0, newName, 1, memOnly, Seq(4))
val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfoRenamed), Seq.empty, "details")
bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
assert(storageListener.rddInfoList.size == 1)
assert(storageListener.rddInfoList.head.name == newName)
}
private def postUpdateBlocks(
bus: SparkListenerBus, blockUpdateInfos: Seq[BlockUpdatedInfo]): Unit = {
blockUpdateInfos.foreach { blockUpdateInfo =>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment