Skip to content
Snippets Groups Projects
Commit 6a8abe29 authored by zhoukang's avatar zhoukang Committed by Wenchen Fan
Browse files

[SPARK-23508][CORE] Fix BlockmanagerId in case blockManagerIdCache cause oom

… cause oom

## What changes were proposed in this pull request?
blockManagerIdCache in BlockManagerId will not remove old values which may cause oom

`val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()`
Since whenever we apply a new BlockManagerId, it will put into this map.

This patch will use guava cahce for  blockManagerIdCache instead.

A heap dump show in [SPARK-23508](https://issues.apache.org/jira/browse/SPARK-23508)

## How was this patch tested?
Exist tests.

Author: zhoukang <zhoukang199191@gmail.com>

Closes #20667 from caneGuy/zhoukang/fix-history.
parent b14993e1
No related branches found
No related tags found
No related merge requests found
...@@ -18,7 +18,8 @@ ...@@ -18,7 +18,8 @@
package org.apache.spark.storage package org.apache.spark.storage
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.util.concurrent.ConcurrentHashMap
import com.google.common.cache.{CacheBuilder, CacheLoader}
import org.apache.spark.SparkContext import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi import org.apache.spark.annotation.DeveloperApi
...@@ -132,10 +133,17 @@ private[spark] object BlockManagerId { ...@@ -132,10 +133,17 @@ private[spark] object BlockManagerId {
getCachedBlockManagerId(obj) getCachedBlockManagerId(obj)
} }
val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]() /**
* The max cache size is hardcoded to 10000, since the size of a BlockManagerId
* object is about 48B, the total memory cost should be below 1MB which is feasible.
*/
val blockManagerIdCache = CacheBuilder.newBuilder()
.maximumSize(10000)
.build(new CacheLoader[BlockManagerId, BlockManagerId]() {
override def load(id: BlockManagerId) = id
})
def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = { def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
blockManagerIdCache.putIfAbsent(id, id)
blockManagerIdCache.get(id) blockManagerIdCache.get(id)
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment