diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 2c3da0ee85e067050a0e66f9d0eaf66f57e28e06..d4a59c33b974c4fe2c331777e9e20fec7d21b2f1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.storage
 
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
-import java.util.concurrent.ConcurrentHashMap
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
@@ -132,10 +133,17 @@ private[spark] object BlockManagerId {
     getCachedBlockManagerId(obj)
   }
 
-  val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
+  /**
+   * The max cache size is hardcoded to 10000, since the size of a BlockManagerId
+   * object is about 48B, the total memory cost should be below 1MB which is feasible.
+   */
+  val blockManagerIdCache = CacheBuilder.newBuilder()
+    .maximumSize(10000)
+    .build(new CacheLoader[BlockManagerId, BlockManagerId]() {
+      override def load(id: BlockManagerId) = id
+    })
 
   def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
-    blockManagerIdCache.putIfAbsent(id, id)
     blockManagerIdCache.get(id)
   }
 }