Commit 3a1bcd40 authored by Richard Benkovsky

Added tests for CacheTrackerActor

parent 8f2f736d
@@ -82,7 +82,8 @@ class CacheTrackerActor extends DaemonActor with Logging {
           logInfo("Cache entry removed: (%s, %s) on %s".format(rddId, partition, host))
         }
         locs(rddId)(partition) = locs(rddId)(partition).filterNot(_ == host)
+        reply('OK)
 
       case MemoryCacheLost(host) =>
         logInfo("Memory cache lost on " + host)
         // TODO: Drop host from the memory locations list of all RDDs
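
(In scala.actors, the synchronous `!?` operator blocks until the actor calls `reply(...)`; the `reply('OK)` added here is what lets the new tests below drive DroppedFromCache with `!?` without hanging.)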
@@ -225,6 +226,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
   // Called by the Cache to report that an entry has been dropped from it
   def dropEntry(datasetId: Any, partition: Int) {
     datasetId match {
+      // TODO: do we really want to use '!!' when nobody checks the returned future? '!' seems to be enough here.
       case (cache.keySpaceId, rddId: Int) => trackerActor !! DroppedFromCache(rddId, partition, Utils.getHost)
     }
   }
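
For context on that TODO: scala.actors has three send operators that differ only in how the reply is consumed. A minimal sketch, using a hypothetical echo actor (not part of this commit):

import scala.actors.Actor._

// Hypothetical echo actor: answers every message with itself.
val echo = actor {
  loop {
    react {
      case msg => reply(msg)
    }
  }
}

echo ! "fire and forget"              // '!' returns immediately; any reply is discarded
val fut = echo !! "returns a future"  // '!!' returns a Future[Any]; blocks only if fut() is forced
val ans = echo !? "blocks for reply"  // '!?' blocks until the actor calls reply(...)

Since dropEntry never forces the returned future, '!' would give the same fire-and-forget behavior without allocating one.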
CacheTrackerSuite.scala (new file):
package spark

import org.scalatest.FunSuite
import collection.mutable.HashMap

class CacheTrackerSuite extends FunSuite {

  test("CacheTrackerActor slave initialization & cache status") {
    System.setProperty("spark.master.port", "1345")
    val initialSize = 2L << 20  // 2097152 bytes of slave cache

    val tracker = new CacheTrackerActor
    tracker.start()

    tracker !? SlaveCacheStarted("host001", initialSize)

    assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 0L)))

    tracker !? StopCacheTracker
  }

  test("RegisterRDD") {
    System.setProperty("spark.master.port", "1345")
    val initialSize = 2L << 20

    val tracker = new CacheTrackerActor
    tracker.start()

    tracker !? SlaveCacheStarted("host001", initialSize)
    tracker !? RegisterRDD(1, 3)
    tracker !? RegisterRDD(2, 1)

    // Registered but not yet cached: every partition has an empty location list.
    assert(getCacheLocations(tracker) == Map(1 -> List(List(), List(), List()), 2 -> List(List())))

    tracker !? StopCacheTracker
  }

  test("AddedToCache") {
    System.setProperty("spark.master.port", "1345")
    val initialSize = 2L << 20

    val tracker = new CacheTrackerActor
    tracker.start()

    tracker !? SlaveCacheStarted("host001", initialSize)
    tracker !? RegisterRDD(1, 2)
    tracker !? RegisterRDD(2, 1)

    tracker !? AddedToCache(1, 0, "host001", 2L << 15)  // 65536 bytes
    tracker !? AddedToCache(1, 1, "host001", 2L << 11)  //  4096 bytes
    tracker !? AddedToCache(2, 0, "host001", 3L << 10)  //  3072 bytes

    // 65536 + 4096 + 3072 = 72704 bytes in use out of 2097152.
    assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 72704L)))
    assert(getCacheLocations(tracker) ==
      Map(1 -> List(List("host001"), List("host001")), 2 -> List(List("host001"))))

    tracker !? StopCacheTracker
  }

  test("DroppedFromCache") {
    System.setProperty("spark.master.port", "1345")
    val initialSize = 2L << 20

    val tracker = new CacheTrackerActor
    tracker.start()

    tracker !? SlaveCacheStarted("host001", initialSize)
    tracker !? RegisterRDD(1, 2)
    tracker !? RegisterRDD(2, 1)

    tracker !? AddedToCache(1, 0, "host001", 2L << 15)
    tracker !? AddedToCache(1, 1, "host001", 2L << 11)
    tracker !? AddedToCache(2, 0, "host001", 3L << 10)

    assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 72704L)))
    assert(getCacheLocations(tracker) ==
      Map(1 -> List(List("host001"), List("host001")), 2 -> List(List("host001"))))

    // Dropping the 4096-byte partition leaves 72704 - 4096 = 68608 bytes in use.
    tracker !? DroppedFromCache(1, 1, "host001", 2L << 11)

    assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 68608L)))
    assert(getCacheLocations(tracker) ==
      Map(1 -> List(List("host001"), List()), 2 -> List(List("host001"))))

    tracker !? StopCacheTracker
  }

  /**
   * Helper function to get cacheLocations from CacheTracker. The cast is
   * unchecked (type arguments are erased at runtime), but the actor only ever
   * replies to GetCacheLocations with a HashMap[Int, Array[List[String]]].
   */
  def getCacheLocations(tracker: CacheTrackerActor) = tracker !? GetCacheLocations match {
    case h: HashMap[_, _] =>
      h.asInstanceOf[HashMap[Int, Array[List[String]]]].map {
        case (i, arr) => (i -> arr.toList)
      }
  }
}
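
One caveat with these tests: because `!?` blocks indefinitely, a missing `reply(...)` in the actor (as with DroppedFromCache before this commit) hangs the suite rather than failing it. scala.actors also provides a timed variant, `!?(msec: Long, msg: Any): Option[Any]`, so a defensive helper along these lines (hypothetical, not part of the commit) would turn a hang into a test failure:

// Hypothetical helper: fail fast instead of hanging when the actor never replies.
def ask(tracker: CacheTrackerActor, msg: Any): Any =
  tracker.!?(1000, msg).getOrElse(sys.error("no reply within 1s to: " + msg))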