diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala index c49be803e432b0f8d1033565d71106c98cf9db25..c25d0a62dfe126dca3bf8f6f85533fc0060245dc 100644 --- a/core/src/main/scala/spark/BoundedMemoryCache.scala +++ b/core/src/main/scala/spark/BoundedMemoryCache.scala @@ -30,7 +30,7 @@ class BoundedMemoryCache extends Cache with Logging { } } - override def put(datasetId: Any, partition: Int, value: Any): Boolean = { + override def put(datasetId: Any, partition: Int, value: Any): Long = { val key = (datasetId, partition) logInfo("Asked to add key " + key) val startTime = System.currentTimeMillis @@ -44,14 +44,16 @@ class BoundedMemoryCache extends Cache with Logging { map.put(key, new Entry(value, size)) currentBytes += size logInfo("Number of entries is now " + map.size) - return true + return size } else { logInfo("Didn't add key " + key + " because we would have evicted part of same dataset") - return false + return -1L } } } + override def getCapacity: Long = maxBytes + private def getMaxBytes(): Long = { val memoryFractionToUse = System.getProperty( "spark.boundedMemoryCache.memoryFraction", "0.66").toDouble diff --git a/core/src/main/scala/spark/Cache.scala b/core/src/main/scala/spark/Cache.scala index 263761bb951c62e29ba071062b9ea2708c74f5f1..a65d3b478de5b78319f4aeda8b73b7065b362dd5 100644 --- a/core/src/main/scala/spark/Cache.scala +++ b/core/src/main/scala/spark/Cache.scala @@ -24,12 +24,25 @@ abstract class Cache { def newKeySpace() = new KeySpace(this, newKeySpaceId()) - // Get the value for a given (datasetId, partition), or null if it is not found. + /** + * Get the value for a given (datasetId, partition), or null if it is not + * found. + */ def get(datasetId: Any, partition: Int): Any - // Attempt to put a value in the cache; returns false if this was not successful (e.g. because - // the cache replacement policy forbids it). - def put(datasetId: Any, partition: Int, value: Any): Boolean + /** + * Attempt to put a value in the cache; returns a negative number if this was + * not successful (e.g. because the cache replacement policy forbids it). If + * size estimation is available, the cache implementation should return the + * estimated size of the partition if the partition is successfully cached. + */ + def put(datasetId: Any, partition: Int, value: Any): Long + + /** + * Report the capacity of the cache partition. By default this just reports + * zero. Specific implementations can choose to provide the capacity number. 
+ */ + def getCapacity: Long = 0L } /** @@ -39,6 +52,8 @@ class KeySpace(cache: Cache, val keySpaceId: Int) { def get(datasetId: Any, partition: Int): Any = cache.get((keySpaceId, datasetId), partition) - def put(datasetId: Any, partition: Int, value: Any): Boolean = + def put(datasetId: Any, partition: Int, value: Any): Long = cache.put((keySpaceId, datasetId), partition, value) + + def getCapacity: Long = cache.getCapacity } diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index c399748af36e42c74ffa8d0e2b1d1bb8d7aacd6d..b472dc8070c6964fb08d2af69b4a85bed6f5deda 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -7,15 +7,27 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet sealed trait CacheTrackerMessage -case class AddedToCache(rddId: Int, partition: Int, host: String) extends CacheTrackerMessage -case class DroppedFromCache(rddId: Int, partition: Int, host: String) extends CacheTrackerMessage +case class AddedToCache(rddId: Int, partition: Int, host: String, size: Long = 0L) + extends CacheTrackerMessage +case class DroppedFromCache(rddId: Int, partition: Int, host: String, size: Long = 0L) + extends CacheTrackerMessage case class MemoryCacheLost(host: String) extends CacheTrackerMessage case class RegisterRDD(rddId: Int, numPartitions: Int) extends CacheTrackerMessage +case class SlaveCacheStarted(host: String, size: Long) extends CacheTrackerMessage +case object GetCacheStatus extends CacheTrackerMessage case object GetCacheLocations extends CacheTrackerMessage case object StopCacheTracker extends CacheTrackerMessage + class CacheTrackerActor extends DaemonActor with Logging { val locs = new HashMap[Int, Array[List[String]]] + + /** + * A map from the slave's host name to its cache size. + */ + val slaveCapacity = new HashMap[String, Long] + val slaveUsage = new HashMap[String, Long] + // TODO: Should probably store (String, CacheType) tuples def act() { @@ -26,18 +38,42 @@ class CacheTrackerActor extends DaemonActor with Logging { loop { react { + case SlaveCacheStarted(host: String, size: Long) => + logInfo("Started slave cache (size %s) on %s".format(Utils.sizeWithSuffix(size), host)) + slaveCapacity.put(host, size) + slaveUsage.put(host, 0) + reply('OK) + case RegisterRDD(rddId: Int, numPartitions: Int) => logInfo("Registering RDD " + rddId + " with " + numPartitions + " partitions") locs(rddId) = Array.fill[List[String]](numPartitions)(Nil) reply('OK) - case AddedToCache(rddId, partition, host) => - logInfo("Cache entry added: (%s, %s) on %s".format(rddId, partition, host)) + case AddedToCache(rddId, partition, host, size) => + if (size > 0) { + logInfo("Cache entry added: (%s, %s) on %s, size: %s".format( + rddId, partition, host, Utils.sizeWithSuffix(size))) + slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) + size) + } else { + logInfo("Cache entry added: (%s, %s) on %s".format(rddId, partition, host)) + } locs(rddId)(partition) = host :: locs(rddId)(partition) reply('OK) - case DroppedFromCache(rddId, partition, host) => - logInfo("Cache entry removed: (%s, %s) on %s".format(rddId, partition, host)) + case DroppedFromCache(rddId, partition, host, size) => + if (size > 0) { + logInfo("Cache entry removed: (%s, %s) on %s, size: %s".format( + rddId, partition, host, Utils.sizeWithSuffix(size))) + slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) - size) + + // Do a sanity check to make sure usage is greater than 0. 
+ val usage = slaveUsage.getOrElse(host, 0L) + if (usage < 0) { + logError("Cache usage on %s is negative (%d)".format(host, usage)) + } + } else { + logInfo("Cache entry removed: (%s, %s) on %s".format(rddId, partition, host)) + } locs(rddId)(partition) = locs(rddId)(partition).filterNot(_ == host) case MemoryCacheLost(host) => @@ -52,6 +88,12 @@ class CacheTrackerActor extends DaemonActor with Logging { } reply(locsCopy) + case GetCacheStatus => + val status: Seq[Tuple3[String, Long, Long]] = slaveCapacity.keys.map { key => + (key, slaveCapacity.getOrElse(key, 0L), slaveUsage.getOrElse(key, 0L)) + }.toSeq + reply(status) + case StopCacheTracker => reply('OK) exit() @@ -60,10 +102,16 @@ class CacheTrackerActor extends DaemonActor with Logging { } } + class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging { // Tracker actor on the master, or remote reference to it on workers var trackerActor: AbstractActor = null - + + val registeredRddIds = new HashSet[Int] + + // Stores map results for various splits locally + val cache = theCache.newKeySpace() + if (isMaster) { val tracker = new CacheTrackerActor tracker.start() @@ -74,10 +122,10 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging { trackerActor = RemoteActor.select(Node(host, port), 'CacheTracker) } - val registeredRddIds = new HashSet[Int] - - // Stores map results for various splits locally - val cache = theCache.newKeySpace() + // Report the cache being started. + trackerActor !? SlaveCacheStarted( + System.getProperty("spark.hostname", Utils.localHostName), + cache.getCapacity) // Remembers which splits are currently being loaded (on worker nodes) val loading = new HashSet[(Int, Int)] @@ -92,17 +140,30 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging { } } } - + // Get a snapshot of the currently known locations def getLocationsSnapshot(): HashMap[Int, Array[List[String]]] = { (trackerActor !? GetCacheLocations) match { case h: HashMap[_, _] => h.asInstanceOf[HashMap[Int, Array[List[String]]]] - - case _ => + + case _ => throw new SparkException("Internal error: CacheTrackerActor did not reply with a HashMap") } } + + // Get the usage status of slave caches. Each tuple in the returned sequence + // is in the form of (host name, capacity, usage). + def getCacheStatus(): Seq[Tuple3[String, Long, Long]] = { + (trackerActor !? 
GetCacheStatus) match { + case h: Seq[Tuple3[String, Long, Long]] => + h.asInstanceOf[Seq[Tuple3[String, Long, Long]]] + + case _ => + throw new SparkException( + "Internal error: CacheTrackerActor did not reply with a Seq[Tuple3[String, Long, Long]]") + } + } // Gets or computes an RDD split def getOrCompute[T](rdd: RDD[T], split: Split)(implicit m: ClassManifest[T]): Iterator[T] = { @@ -138,10 +199,10 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging { // TODO: fetch any remote copy of the split that may be available logInfo("Computing partition " + split) var array: Array[T] = null - var putSuccessful: Boolean = false + var putRetval: Long = -1L try { array = rdd.compute(split).toArray(m) - putSuccessful = cache.put(rdd.id, split.index, array) + putRetval = cache.put(rdd.id, split.index, array) } finally { // Tell other threads that we've finished our attempt to load the key (whether or not // we've actually succeeded to put it in the map) @@ -150,10 +211,10 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging { loading.notifyAll() } } - if (putSuccessful) { + if (putRetval >= 0) { // Tell the master that we added the entry. Don't return until it replies so it can // properly schedule future tasks that use this RDD. - trackerActor !? AddedToCache(rdd.id, split.index, host) + trackerActor !? AddedToCache(rdd.id, split.index, host, putRetval) } return array.iterator } diff --git a/core/src/main/scala/spark/DiskSpillingCache.scala b/core/src/main/scala/spark/DiskSpillingCache.scala index e4d0f991aa3a4cf122a6f9900e23758450e9ad26..037ed786887c6d22aecd090c21fd8ef56a384cfb 100644 --- a/core/src/main/scala/spark/DiskSpillingCache.scala +++ b/core/src/main/scala/spark/DiskSpillingCache.scala @@ -44,7 +44,7 @@ class DiskSpillingCache extends BoundedMemoryCache { } } - override def put(datasetId: Any, partition: Int, value: Any): Boolean = { + override def put(datasetId: Any, partition: Int, value: Any): Long = { var ser = SparkEnv.get.serializer.newInstance() super.put(datasetId, partition, ser.serialize(value)) } diff --git a/core/src/main/scala/spark/SerializingCache.scala b/core/src/main/scala/spark/SerializingCache.scala index f6964905c76f8226fa50b22760d56f076c16790a..17dc735d5e02447f3781124271139d72d79a256b 100644 --- a/core/src/main/scala/spark/SerializingCache.scala +++ b/core/src/main/scala/spark/SerializingCache.scala @@ -9,7 +9,7 @@ class SerializingCache extends Cache with Logging { val bmc = new BoundedMemoryCache - override def put(datasetId: Any, partition: Int, value: Any): Boolean = { + override def put(datasetId: Any, partition: Int, value: Any): Long = { val ser = SparkEnv.get.serializer.newInstance() bmc.put(datasetId, partition, ser.serialize(value)) } diff --git a/core/src/main/scala/spark/SoftReferenceCache.scala b/core/src/main/scala/spark/SoftReferenceCache.scala index c507df928b95b8c32b96155e0ab3bafbdc81cb66..cd2386eb838d64739ebfb23fbca8e2df631c280a 100644 --- a/core/src/main/scala/spark/SoftReferenceCache.scala +++ b/core/src/main/scala/spark/SoftReferenceCache.scala @@ -11,8 +11,8 @@ class SoftReferenceCache extends Cache { override def get(datasetId: Any, partition: Int): Any = map.get((datasetId, partition)) - override def put(datasetId: Any, partition: Int, value: Any): Boolean = { + override def put(datasetId: Any, partition: Int, value: Any): Long = { map.put((datasetId, partition), value) - return true + return 0 } } diff --git a/core/src/main/scala/spark/Utils.scala
b/core/src/main/scala/spark/Utils.scala index 58b5fa6bbd77fbad5a814989468a935f1320b616..5aecbdde7dea2ab5d85e21ff8a8006735a4cecda 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -174,4 +174,28 @@ object Utils { throw new IOException("Failed to delete: " + file) } } + + /** + * Use unit suffixes (Byte, Kilobyte, Megabyte and Gigabyte) in order to + * keep the number of digits small. For example, 4,000,000 is returned as + * 3.8MB. + */ + def sizeWithSuffix(size: Long): String = { + val GB = 1L << 30 + val MB = 1L << 20 + val KB = 1L << 10 + val B = 1L + val (value, unit) = { + if (size >= 2*GB) { + (size.asInstanceOf[Double] / GB, "GB") + } else if (size >= 2*MB) { + (size.asInstanceOf[Double] / MB, "MB") + } else if (size >= 2*KB) { + (size.asInstanceOf[Double] / KB, "KB") + } else { + (size.asInstanceOf[Double], "B") + } + } + "%.1f%s".format(value, unit) + } } diff --git a/core/src/test/scala/spark/Utils.scala b/core/src/test/scala/spark/Utils.scala new file mode 100644 index 0000000000000000000000000000000000000000..b78b638bb1688be317c130199f7e0c3180111e1a --- /dev/null +++ b/core/src/test/scala/spark/Utils.scala @@ -0,0 +1,18 @@ +package spark + +import org.scalatest.FunSuite + + +class UtilsSuite extends FunSuite { + + test("sizeWithSuffix") { + assert(Utils.sizeWithSuffix(10) === "10.0B") + assert(Utils.sizeWithSuffix(1500) === "1500.0B") + assert(Utils.sizeWithSuffix(2000000) === "1953.1KB") + assert(Utils.sizeWithSuffix(2097152) === "2.0MB") + assert(Utils.sizeWithSuffix(2306867) === "2.2MB") + assert(Utils.sizeWithSuffix(5368709120L) === "5.0GB") + } + +} +
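For reference, a minimal sketch of the new Cache contract (hypothetical code, not part of this change; the NaiveCache name is made up): put now returns the estimated size of the cached partition in bytes on success, 0L when the entry is stored but no size estimate is available (as SoftReferenceCache now does), and a negative value when the entry is rejected, while getCapacity keeps its default of 0L for caches that do not track a byte budget.

package spark

import java.util.concurrent.ConcurrentHashMap

// Hypothetical example only: an unbounded cache written against the new
// put()/getCapacity contract. It stores every entry and returns 0L because it
// does not estimate sizes; a bounded implementation would return the estimated
// byte size on success and -1L on rejection, as BoundedMemoryCache does in
// this change.
class NaiveCache extends Cache {
  private val map = new ConcurrentHashMap[(Any, Int), Any]

  override def get(datasetId: Any, partition: Int): Any =
    map.get((datasetId, partition))

  override def put(datasetId: Any, partition: Int, value: Any): Long = {
    map.put((datasetId, partition), value)
    0L  // cached successfully, but size unknown
  }

  // No byte budget to report; the inherited default of 0L would also do.
  override def getCapacity: Long = 0L
}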
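And a rough usage sketch for the new status reporting (again hypothetical; tracker is assumed to be the master-side CacheTracker instance): getCacheStatus() returns one (host, capacity, usage) tuple per slave, and Utils.sizeWithSuffix formats the byte counts for logging.

package spark

// Hypothetical usage sketch: print per-slave cache utilization on the master.
object CacheStatusReport {
  def report(tracker: CacheTracker) {
    for ((host, capacity, usage) <- tracker.getCacheStatus()) {
      println("%s: %s used of %s".format(
        host, Utils.sizeWithSuffix(usage), Utils.sizeWithSuffix(capacity)))
    }
  }
}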