Commit ad9e3d50 authored by jeanlyn's avatar jeanlyn Committed by Andrew Or

[SPARK-13845][CORE] Using onBlockUpdated to replace onTaskEnd, avoiding driver OOM

## What changes were proposed in this pull request?

We have a streaming job using `FlumePollInputStream` whose driver always OOMs after a few days. Here is a driver heap dump taken before one of the OOMs:
```
 num     #instances         #bytes  class name
----------------------------------------------
   1:      13845916      553836640  org.apache.spark.storage.BlockStatus
   2:      14020324      336487776  org.apache.spark.storage.StreamBlockId
   3:      13883881      333213144  scala.collection.mutable.DefaultEntry
   4:          8907       89043952  [Lscala.collection.mutable.HashEntry;
   5:         62360       65107352  [B
   6:        163368       24453904  [Ljava.lang.Object;
   7:        293651       20342664  [C
...
```
`BlockStatus` and `StreamBlockId` instances keep growing until the driver eventually OOMs.
After investigating, I found that `executorIdToStorageStatus` in `StorageStatusListener` never seems to remove blocks from its `StorageStatus` entries.
To fix the issue, I replaced `onTaskEnd` with `onBlockUpdated`, so block information (blocks added, blocks dropped from memory to disk, and blocks deleted) is updated in time.
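For orientation before the diff, here is a minimal, self-contained sketch of the pattern the fix relies on. The class name `BlockTrackingListener` is hypothetical and this is not code from the patch; it only assumes the listener API shown in the diff below (`onBlockUpdated`, `BlockUpdatedInfo`) plus `StorageLevel.isValid`, which is false once a block is neither in memory nor on disk:

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdated}
import org.apache.spark.storage.{BlockId, BlockStatus}

// Hypothetical sketch: track live blocks per executor from block-update events.
// Task-end metrics only carry updates reported by finished tasks, so changes made
// outside tasks (e.g. receiver-written stream blocks being added or removed) were
// missed; block-update events cover them, so stale entries can be dropped here.
class BlockTrackingListener extends SparkListener {
  private val blocksPerExecutor =
    mutable.HashMap.empty[String, mutable.HashMap[BlockId, BlockStatus]]

  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = synchronized {
    val info = blockUpdated.blockUpdatedInfo
    val blocks = blocksPerExecutor.getOrElseUpdate(
      info.blockManagerId.executorId, mutable.HashMap.empty)
    if (info.storageLevel.isValid) {
      // Block added, or moved between memory and disk: record its latest status.
      blocks(info.blockId) = BlockStatus(info.storageLevel, info.memSize, info.diskSize)
    } else {
      // Block removed: forget it instead of letting the entry accumulate.
      blocks.remove(info.blockId)
    }
  }
}
```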

## How was this patch tested?

Existing unit tests and manual tests
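
The updated suites drive the listeners by posting synthetic `SparkListenerBlockUpdated` events through small `postUpdateBlock`/`postUpdateBlocks` helpers (see the diffs below). A condensed, hypothetical standalone version of that pattern, reusing the suite fixtures; it has to live in the `org.apache.spark.storage` package because `executorIdToStorageStatus` is package-private:

```scala
package org.apache.spark.storage

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListenerBlockManagerAdded, SparkListenerBlockUpdated}

// Hypothetical smoke test: add a block, then remove it, and check that the
// listener's per-executor state follows. The removal is what used to leak.
object StorageStatusListenerSmokeTest {
  def main(args: Array[String]): Unit = {
    val bm1 = BlockManagerId("big", "dog", 1) // same fixture as the suite
    val listener = new StorageStatusListener(new SparkConf)
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))

    // A disk-only RDD block appears on executor "big".
    listener.onBlockUpdated(SparkListenerBlockUpdated(
      BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0L, 100L)))
    assert(listener.executorIdToStorageStatus("big").numBlocks == 1)

    // The same block is removed (StorageLevel.NONE); the listener must drop it.
    listener.onBlockUpdated(SparkListenerBlockUpdated(
      BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.NONE, 0L, 0L)))
    assert(listener.executorIdToStorageStatus("big").numBlocks == 0)
  }
}
```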

Author: jeanlyn <jeanlyn92@gmail.com>

Closes #11779 from jeanlyn/fix_driver_oom.
parent a916d2a4
@@ -66,17 +66,6 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
     }
   }

-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
-    val info = taskEnd.taskInfo
-    val metrics = taskEnd.taskMetrics
-    if (info != null && metrics != null) {
-      val updatedBlocks = metrics.updatedBlockStatuses
-      if (updatedBlocks.length > 0) {
-        updateStorageStatus(info.executorId, updatedBlocks)
-      }
-    }
-  }

   override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
     updateStorageStatus(unpersistRDD.rddId)
   }
@@ -102,4 +91,14 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
       }
     }
   }
+
+  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+    val executorId = blockUpdated.blockUpdatedInfo.blockManagerId.executorId
+    val blockId = blockUpdated.blockUpdatedInfo.blockId
+    val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
+    val memSize = blockUpdated.blockUpdatedInfo.memSize
+    val diskSize = blockUpdated.blockUpdatedInfo.diskSize
+    val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
+    updateStorageStatus(executorId, Seq((blockId, blockStatus)))
+  }
 }
@@ -57,17 +57,6 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
     StorageUtils.updateRddInfo(rddInfosToUpdate, activeStorageStatusList)
   }

-  /**
-   * Assumes the storage status list is fully up-to-date. This implies the corresponding
-   * StorageStatusSparkListener must process the SparkListenerTaskEnd event before this listener.
-   */
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
-    val metrics = taskEnd.taskMetrics
-    if (metrics != null && metrics.updatedBlockStatuses.nonEmpty) {
-      updateRDDInfo(metrics.updatedBlockStatuses)
-    }
-  }

   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
     val rddInfos = stageSubmitted.stageInfo.rddInfos
     rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info) }
@@ -84,4 +73,14 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
   override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
     _rddInfoMap.remove(unpersistRDD.rddId)
   }
+
+  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+    super.onBlockUpdated(blockUpdated)
+    val blockId = blockUpdated.blockUpdatedInfo.blockId
+    val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
+    val memSize = blockUpdated.blockUpdatedInfo.memSize
+    val diskSize = blockUpdated.blockUpdatedInfo.diskSize
+    val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
+    updateRDDInfo(Seq((blockId, blockStatus)))
+  }
 }
@@ -2,8 +2,8 @@
   "id" : "<driver>",
   "hostPort" : "localhost:57971",
   "isActive" : true,
-  "rddBlocks" : 8,
-  "memoryUsed" : 28000128,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
   "diskUsed" : 0,
   "totalCores" : 0,
   "maxTasks" : 0,
......
-[ {
-  "id" : 0,
-  "name" : "0",
-  "numPartitions" : 8,
-  "numCachedPartitions" : 8,
-  "storageLevel" : "Memory Deserialized 1x Replicated",
-  "memoryUsed" : 28000128,
-  "diskUsed" : 0
-} ]
\ No newline at end of file
+[ ]
\ No newline at end of file
@@ -140,8 +140,9 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     "stage task list from multi-attempt app json(2)" ->
       "applications/local-1426533911241/2/stages/0/0/taskList",
-    "rdd list storage json" -> "applications/local-1422981780767/storage/rdd",
-    "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
+    "rdd list storage json" -> "applications/local-1422981780767/storage/rdd"
+    // TODO: enable this test when logging the event of onBlockUpdated. See: SPARK-13845
+    // "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
   )
   // run a bunch of characterization tests -- just verify the behavior is the same as what is saved
......
@@ -82,48 +82,51 @@ class StorageStatusListenerSuite extends SparkFunSuite {
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
   }

-  test("task end with updated blocks") {
+  test("updated blocks") {
     val listener = new StorageStatusListener(conf)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
-    val taskMetrics1 = new TaskMetrics
-    val taskMetrics2 = new TaskMetrics
-    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L))
-    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L))
-    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L))
-    taskMetrics1.setUpdatedBlockStatuses(Seq(block1, block2))
-    taskMetrics2.setUpdatedBlockStatuses(Seq(block3))
-    // Task end with new blocks
+    val blockUpdateInfos1 = Seq(
+      BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0L, 100L),
+      BlockUpdatedInfo(bm1, RDDBlockId(1, 2), StorageLevel.DISK_ONLY, 0L, 200L)
+    )
+    val blockUpdateInfos2 =
+      Seq(BlockUpdatedInfo(bm2, RDDBlockId(4, 0), StorageLevel.DISK_ONLY, 0L, 300L))
+    // Add some new blocks
     assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
-    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
+    postUpdateBlock(listener, blockUpdateInfos1)
     assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
     assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
-    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2))
+    postUpdateBlock(listener, blockUpdateInfos2)
     assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
     assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
     assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
     assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
-    // Task end with dropped blocks
-    val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L))
-    val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L))
-    val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L))
-    taskMetrics1.setUpdatedBlockStatuses(Seq(droppedBlock1, droppedBlock3))
-    taskMetrics2.setUpdatedBlockStatuses(Seq(droppedBlock2, droppedBlock3))
+    // Dropped the blocks
+    val droppedBlockInfo1 = Seq(
+      BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.NONE, 0L, 0L),
+      BlockUpdatedInfo(bm1, RDDBlockId(4, 0), StorageLevel.NONE, 0L, 0L)
+    )
+    val droppedBlockInfo2 = Seq(
+      BlockUpdatedInfo(bm2, RDDBlockId(1, 2), StorageLevel.NONE, 0L, 0L),
+      BlockUpdatedInfo(bm2, RDDBlockId(4, 0), StorageLevel.NONE, 0L, 0L)
+    )
-    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
+    postUpdateBlock(listener, droppedBlockInfo1)
     assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
     assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
     assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
     assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
-    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2))
+    postUpdateBlock(listener, droppedBlockInfo2)
     assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
@@ -134,15 +137,14 @@ class StorageStatusListenerSuite extends SparkFunSuite {
   test("unpersist RDD") {
     val listener = new StorageStatusListener(conf)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
-    val taskMetrics1 = new TaskMetrics
-    val taskMetrics2 = new TaskMetrics
-    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L))
-    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L))
-    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L))
-    taskMetrics1.setUpdatedBlockStatuses(Seq(block1, block2))
-    taskMetrics2.setUpdatedBlockStatuses(Seq(block3))
-    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
-    listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics2))
+    val blockUpdateInfos1 = Seq(
+      BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0L, 100L),
+      BlockUpdatedInfo(bm1, RDDBlockId(1, 2), StorageLevel.DISK_ONLY, 0L, 200L)
+    )
+    val blockUpdateInfos2 =
+      Seq(BlockUpdatedInfo(bm1, RDDBlockId(4, 0), StorageLevel.DISK_ONLY, 0L, 300L))
+    postUpdateBlock(listener, blockUpdateInfos1)
+    postUpdateBlock(listener, blockUpdateInfos2)
     assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
     // Unpersist RDD
@@ -155,4 +157,11 @@ class StorageStatusListenerSuite extends SparkFunSuite {
     listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
     assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
   }
+
+  private def postUpdateBlock(
+      listener: StorageStatusListener, updateBlockInfos: Seq[BlockUpdatedInfo]): Unit = {
+    updateBlockInfos.foreach { updateBlockInfo =>
+      listener.onBlockUpdated(SparkListenerBlockUpdated(updateBlockInfo))
+    }
+  }
 }
@@ -106,7 +106,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     assert(storageListener.rddInfoList.size === 0)
   }

-  test("task end") {
+  test("block update") {
     val myRddInfo0 = rddInfo0
     val myRddInfo1 = rddInfo1
     val myRddInfo2 = rddInfo2
@@ -120,19 +120,13 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!storageListener._rddInfoMap(1).isCached)
     assert(!storageListener._rddInfoMap(2).isCached)
-    // Task end with no updated blocks. This should not change anything.
-    bus.postToAll(SparkListenerTaskEnd(0, 0, "obliteration", Success, taskInfo, new TaskMetrics))
-    assert(storageListener._rddInfoMap.size === 3)
-    assert(storageListener.rddInfoList.size === 0)
-    // Task end with a few new persisted blocks, some from the same RDD
-    val metrics1 = new TaskMetrics
-    metrics1.setUpdatedBlockStatuses(Seq(
-      (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L)),
-      (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L)),
-      (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L))
-    ))
-    bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1))
+    // Some blocks updated
+    val blockUpdateInfos = Seq(
+      BlockUpdatedInfo(bm1, RDDBlockId(0, 100), memAndDisk, 400L, 0L),
+      BlockUpdatedInfo(bm1, RDDBlockId(0, 101), memAndDisk, 0L, 400L),
+      BlockUpdatedInfo(bm1, RDDBlockId(1, 20), memAndDisk, 0L, 240L)
+    )
+    postUpdateBlocks(bus, blockUpdateInfos)
     assert(storageListener._rddInfoMap(0).memSize === 400L)
     assert(storageListener._rddInfoMap(0).diskSize === 400L)
     assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
@@ -144,15 +138,14 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!storageListener._rddInfoMap(2).isCached)
     assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
-    // Task end with a few dropped blocks
-    val metrics2 = new TaskMetrics
-    metrics2.setUpdatedBlockStatuses(Seq(
-      (RDDBlockId(0, 100), BlockStatus(none, 0L, 0L)),
-      (RDDBlockId(1, 20), BlockStatus(none, 0L, 0L)),
-      (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L)), // doesn't actually exist
-      (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L)) // doesn't actually exist
-    ))
-    bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2))
+    // Drop some blocks
+    val blockUpdateInfos2 = Seq(
+      BlockUpdatedInfo(bm1, RDDBlockId(0, 100), none, 0L, 0L),
+      BlockUpdatedInfo(bm1, RDDBlockId(1, 20), none, 0L, 0L),
+      BlockUpdatedInfo(bm1, RDDBlockId(2, 40), none, 0L, 0L), // doesn't actually exist
+      BlockUpdatedInfo(bm1, RDDBlockId(4, 80), none, 0L, 0L) // doesn't actually exist
+    )
+    postUpdateBlocks(bus, blockUpdateInfos2)
     assert(storageListener._rddInfoMap(0).memSize === 0L)
     assert(storageListener._rddInfoMap(0).diskSize === 400L)
     assert(storageListener._rddInfoMap(0).numCachedPartitions === 1)
@@ -169,24 +162,27 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
     val rddInfo1 = new RDDInfo(1, "rdd1", 1, memOnly, Seq(4))
     val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), Seq.empty, "details")
     val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details")
-    val taskMetrics0 = new TaskMetrics
-    val taskMetrics1 = new TaskMetrics
-    val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L))
-    val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L))
-    taskMetrics0.setUpdatedBlockStatuses(Seq(block0))
-    taskMetrics1.setUpdatedBlockStatuses(Seq(block1))
+    val blockUpdateInfos1 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(0, 1), memOnly, 100L, 0L))
+    val blockUpdateInfos2 = Seq(BlockUpdatedInfo(bm1, RDDBlockId(1, 1), memOnly, 200L, 0L))
     bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener.rddInfoList.size === 0)
-    bus.postToAll(SparkListenerTaskEnd(0, 0, "big", Success, taskInfo, taskMetrics0))
+    postUpdateBlocks(bus, blockUpdateInfos1)
    assert(storageListener.rddInfoList.size === 1)
     bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
     assert(storageListener.rddInfoList.size === 1)
     bus.postToAll(SparkListenerStageCompleted(stageInfo0))
     assert(storageListener.rddInfoList.size === 1)
-    bus.postToAll(SparkListenerTaskEnd(1, 0, "small", Success, taskInfo1, taskMetrics1))
+    postUpdateBlocks(bus, blockUpdateInfos2)
     assert(storageListener.rddInfoList.size === 2)
     bus.postToAll(SparkListenerStageCompleted(stageInfo1))
     assert(storageListener.rddInfoList.size === 2)
   }
+
+  private def postUpdateBlocks(
+      bus: SparkListenerBus, blockUpdateInfos: Seq[BlockUpdatedInfo]): Unit = {
+    blockUpdateInfos.foreach { blockUpdateInfo =>
+      bus.postToAll(SparkListenerBlockUpdated(blockUpdateInfo))
+    }
+  }
 }