From 7be1c7b3318a4927e67b1edd5e7438845a530046 Mon Sep 17 00:00:00 2001 From: Ankur Dave <ankurdave@gmail.com> Date: Fri, 6 Apr 2012 15:48:36 -0700 Subject: [PATCH] Report entry dropping in BoundedMemoryCache --- core/src/main/scala/spark/BoundedMemoryCache.scala | 1 + core/src/main/scala/spark/CacheTracker.scala | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala index ccf55d66af..e8e50ac360 100644 --- a/core/src/main/scala/spark/BoundedMemoryCache.scala +++ b/core/src/main/scala/spark/BoundedMemoryCache.scala @@ -70,5 +70,6 @@ class BoundedMemoryCache extends Cache with Logging { protected def dropEntry(key: Any, entry: Entry) { logInfo("Dropping key %s of size %d to make space".format(key, entry.size)) + SparkEnv.get.cacheTracker.dropEntry(key) } } diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index 5e9a70cc7e..5b6eed743f 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -143,6 +143,17 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging { } } + // Reports that an entry has been dropped from the cache + def dropEntry(key: Any) { + key match { + case (keySpaceId: Long, (rddId: Int, partition: Int)) => + val host = System.getProperty("spark.hostname", Utils.localHostName) + trackerActor !! DroppedFromCache(rddId, partition, host) + case _ => + logWarning("Unknown key format: %s".format(key)) + } + } + def stop() { trackerActor !? StopCacheTracker registeredRddIds.clear() -- GitLab