diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala index ccf55d66af1b8fe4236812e6581f7235d4f3979b..e8e50ac36020def80ec6b5c9f52cb38f6710c9fc 100644 --- a/core/src/main/scala/spark/BoundedMemoryCache.scala +++ b/core/src/main/scala/spark/BoundedMemoryCache.scala @@ -70,5 +70,6 @@ class BoundedMemoryCache extends Cache with Logging { protected def dropEntry(key: Any, entry: Entry) { logInfo("Dropping key %s of size %d to make space".format(key, entry.size)) + SparkEnv.get.cacheTracker.dropEntry(key) } } diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index 5e9a70cc7e03813cd31fd4d24043c77add1aceb7..5b6eed743f53a2a236a5f2ed18cc49c62994f60b 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -143,6 +143,17 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging { } } + // Reports that an entry has been dropped from the cache + def dropEntry(key: Any) { + key match { + case (keySpaceId: Long, (rddId: Int, partition: Int)) => + val host = System.getProperty("spark.hostname", Utils.localHostName) + trackerActor !! DroppedFromCache(rddId, partition, host) + case _ => + logWarning("Unknown key format: %s".format(key)) + } + } + def stop() { trackerActor !? StopCacheTracker registeredRddIds.clear()