diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala
index f778d8cc17d0c69b8a17de1e1c38d9b56a82a030..1162e34ab03340c763e943b696a611ba9cb5d8d8 100644
--- a/core/src/main/scala/spark/BoundedMemoryCache.scala
+++ b/core/src/main/scala/spark/BoundedMemoryCache.scala
@@ -9,16 +9,16 @@ import java.util.LinkedHashMap
  * some cache entries have pointers to a shared object. Nonetheless, this Cache should work well
  * when most of the space is used by arrays of primitives or of simple classes.
  */
-class BoundedMemoryCache extends Cache with Logging {
-  private val maxBytes: Long = getMaxBytes()
+class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
   logInfo("BoundedMemoryCache.maxBytes = " + maxBytes)
 
+  def this() {
+    this(BoundedMemoryCache.getMaxBytes)
+  }
+
   private var currentBytes = 0L
   private val map = new LinkedHashMap[(Any, Int), Entry](32, 0.75f, true)
 
-  // An entry in our map; stores a cached object and its size in bytes
-  class Entry(val value: Any, val size: Long) {}
-
   override def get(datasetId: Any, partition: Int): Any = {
     synchronized {
       val entry = map.get((datasetId, partition))
@@ -33,13 +33,11 @@ class BoundedMemoryCache extends Cache with Logging {
   override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
     val key = (datasetId, partition)
     logInfo("Asked to add key " + key)
-    val startTime = System.currentTimeMillis
-    val size = SizeEstimator.estimate(value.asInstanceOf[AnyRef])
-    val timeTaken = System.currentTimeMillis - startTime
-    logInfo("Estimated size for key %s is %d".format(key, size))
-    logInfo("Size estimation for key %s took %d ms".format(key, timeTaken))
+    val size = estimateValueSize(key, value)
     synchronized {
-      if (ensureFreeSpace(datasetId, size)) {
+      if (size > getCapacity) {
+        return CachePutFailure()
+      } else if (ensureFreeSpace(datasetId, size)) {
         logInfo("Adding key " + key)
         map.put(key, new Entry(value, size))
         currentBytes += size
@@ -54,10 +52,16 @@ class BoundedMemoryCache extends Cache with Logging {
   }
 
   override def getCapacity: Long = maxBytes
-  private def getMaxBytes(): Long = {
-    val memoryFractionToUse = System.getProperty(
-      "spark.boundedMemoryCache.memoryFraction", "0.66").toDouble
-    (Runtime.getRuntime.maxMemory * memoryFractionToUse).toLong
+  /**
+   * Estimate the size of 'value'
+   */
+  private def estimateValueSize(key: (Any, Int), value: Any) = {
+    val startTime = System.currentTimeMillis
+    val size = SizeEstimator.estimate(value.asInstanceOf[AnyRef])
+    val timeTaken = System.currentTimeMillis - startTime
+    logInfo("Estimated size for key %s is %d".format(key, size))
+    logInfo("Size estimation for key %s took %d ms".format(key, timeTaken))
+    size
   }
 
   /**
@@ -85,8 +89,21 @@ class BoundedMemoryCache extends Cache with Logging {
   }
 
   protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
-    logInfo("Dropping key (%s, %d) of size %d to make space".format(
-      datasetId, partition, entry.size))
+    logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
     SparkEnv.get.cacheTracker.dropEntry(datasetId, partition)
   }
 }
+
+// An entry in our map; stores a cached object and its size in bytes
+case class Entry(value: Any, size: Long)
+
+object BoundedMemoryCache {
+  /**
+   * Get maximum cache capacity from system configuration
+   */
+  def getMaxBytes: Long = {
+    val memoryFractionToUse = System.getProperty("spark.boundedMemoryCache.memoryFraction", "0.66").toDouble
+    (Runtime.getRuntime.maxMemory * memoryFractionToUse).toLong
+  }
+}
+
diff --git a/core/src/main/scala/spark/Cache.scala b/core/src/main/scala/spark/Cache.scala
index aeff205884df96ba0710a1f22ac7840d44dc77e1..150fe14e2c8d502630b551a93157337654c32342 100644
--- a/core/src/main/scala/spark/Cache.scala
+++ b/core/src/main/scala/spark/Cache.scala
@@ -4,7 +4,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 sealed trait CachePutResponse
 case class CachePutSuccess(size: Long) extends CachePutResponse
-case class CachePutFailure extends CachePutResponse
+case class CachePutFailure() extends CachePutResponse
 
 /**
  * An interface for caches in Spark, to allow for multiple implementations. Caches are used to store
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 5b5831b2defdfd0ca038c4480eaa301cf9d72cea..4867829c17ac6519dba55aed2a47af78f54fe85f 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -56,7 +56,7 @@ class CacheTrackerActor extends DaemonActor with Logging {
 
         case AddedToCache(rddId, partition, host, size) =>
           if (size > 0) {
-            slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) + size)
+            slaveUsage.put(host, getCacheUsage(host) + size)
             logInfo("Cache entry added: (%s, %s) on %s (size added: %s, available: %s)".format(
               rddId, partition, host, Utils.memoryBytesToString(size),
               Utils.memoryBytesToString(getCacheAvailable(host))))
@@ -71,10 +71,10 @@
             logInfo("Cache entry removed: (%s, %s) on %s (size dropped: %s, available: %s)".format(
               rddId, partition, host, Utils.memoryBytesToString(size),
               Utils.memoryBytesToString(getCacheAvailable(host))))
-            slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) - size)
+            slaveUsage.put(host, getCacheUsage(host) - size)
 
             // Do a sanity check to make sure usage is greater than 0.
-            val usage = slaveUsage.getOrElse(host, 0L)
+            val usage = getCacheUsage(host)
             if (usage < 0) {
               logError("Cache usage on %s is negative (%d)".format(host, usage))
             }
@@ -82,22 +82,19 @@
             logInfo("Cache entry removed: (%s, %s) on %s".format(rddId, partition, host))
           }
           locs(rddId)(partition) = locs(rddId)(partition).filterNot(_ == host)
-
+          reply('OK)
+
         case MemoryCacheLost(host) =>
           logInfo("Memory cache lost on " + host)
           // TODO: Drop host from the memory locations list of all RDDs
 
        case GetCacheLocations =>
           logInfo("Asked for current cache locations")
-          val locsCopy = new HashMap[Int, Array[List[String]]]
-          for ((rddId, array) <- locs) {
-            locsCopy(rddId) = array.clone()
-          }
-          reply(locsCopy)
+          reply(locs.map{case (rrdId, array) => (rrdId -> array.clone())})
 
         case GetCacheStatus =>
-          val status: Seq[Tuple3[String, Long, Long]] = slaveCapacity.keys.map { key =>
-            (key, slaveCapacity.getOrElse(key, 0L), slaveUsage.getOrElse(key, 0L))
+          val status = slaveCapacity.map { case (host, capacity) =>
+            (host, capacity, getCacheUsage(host))
           }.toSeq
           reply(status)
 
@@ -130,9 +127,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
   }
 
   // Report the cache being started.
-  trackerActor !? SlaveCacheStarted(
-    System.getProperty("spark.hostname", Utils.localHostName),
-    cache.getCapacity)
+  trackerActor !? SlaveCacheStarted(Utils.getHost, cache.getCapacity)
 
   // Remembers which splits are currently being loaded (on worker nodes)
   val loading = new HashSet[(Int, Int)]
@@ -151,20 +146,17 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
   // Get a snapshot of the currently known locations
   def getLocationsSnapshot(): HashMap[Int, Array[List[String]]] = {
     (trackerActor !? GetCacheLocations) match {
-      case h: HashMap[_, _] =>
-        h.asInstanceOf[HashMap[Int, Array[List[String]]]]
+      case h: HashMap[_, _] => h.asInstanceOf[HashMap[Int, Array[List[String]]]]
 
-      case _ =>
-        throw new SparkException("Internal error: CacheTrackerActor did not reply with a HashMap")
+      case _ => throw new SparkException("Internal error: CacheTrackerActor did not reply with a HashMap")
     }
   }
 
   // Get the usage status of slave caches. Each tuple in the returned sequence
   // is in the form of (host name, capacity, usage).
-  def getCacheStatus(): Seq[Tuple3[String, Long, Long]] = {
+  def getCacheStatus(): Seq[(String, Long, Long)] = {
     (trackerActor !? GetCacheStatus) match {
-      case h: Seq[Tuple3[String, Long, Long]] =>
-        h.asInstanceOf[Seq[Tuple3[String, Long, Long]]]
+      case h: Seq[(String, Long, Long)] => h.asInstanceOf[Seq[(String, Long, Long)]]
 
       case _ =>
         throw new SparkException(
@@ -202,7 +194,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
       }
       // If we got here, we have to load the split
       // Tell the master that we're doing so
-      val host = System.getProperty("spark.hostname", Utils.localHostName)
+
      // TODO: fetch any remote copy of the split that may be available
       logInfo("Computing partition " + split)
       var array: Array[T] = null
@@ -223,7 +215,7 @@
         case CachePutSuccess(size) => {
           // Tell the master that we added the entry. Don't return until it
           // replies so it can properly schedule future tasks that use this RDD.
-          trackerActor !? AddedToCache(rdd.id, split.index, host, size)
+          trackerActor !? AddedToCache(rdd.id, split.index, Utils.getHost, size)
         }
         case _ => null
       }
@@ -234,9 +226,8 @@
   // Called by the Cache to report that an entry has been dropped from it
   def dropEntry(datasetId: Any, partition: Int) {
     datasetId match {
-      case (cache.keySpaceId, rddId: Int) =>
-        val host = System.getProperty("spark.hostname", Utils.localHostName)
-        trackerActor !! DroppedFromCache(rddId, partition, host)
+      //TODO - do we really want to use '!!' when nobody checks the returned future? '!' seems to be enough here.
+      case (cache.keySpaceId, rddId: Int) => trackerActor !! DroppedFromCache(rddId, partition, Utils.getHost)
     }
   }
 
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index b95f40b877c8fc07d1ea7d6debab9bfa88d0e37a..755e001106e4a3120f0152b0dec9f6fad18d82e5 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -42,7 +42,7 @@ private class MesosScheduler(
   // Memory used by each executor (in megabytes)
   val EXECUTOR_MEMORY = {
     if (System.getenv("SPARK_MEM") != null) {
-      memoryStringToMb(System.getenv("SPARK_MEM"))
+      MesosScheduler.memoryStringToMb(System.getenv("SPARK_MEM"))
       // TODO: Might need to add some extra memory for the non-heap parts of the JVM
     } else {
       512
@@ -78,9 +78,7 @@ private class MesosScheduler(
 
   // Sorts jobs in reverse order of run ID for use in our priority queue (so lower IDs run first)
   private val jobOrdering = new Ordering[Job] {
-    override def compare(j1: Job, j2: Job): Int = {
-      return j2.runId - j1.runId
-    }
+    override def compare(j1: Job, j2: Job): Int = j2.runId - j1.runId
   }
 
   def newJobId(): Int = this.synchronized {
@@ -156,7 +154,7 @@ private class MesosScheduler(
       activeJobs(jobId) = myJob
       activeJobsQueue += myJob
       logInfo("Adding job with ID " + jobId)
-      jobTasks(jobId) = new HashSet()
+      jobTasks(jobId) = HashSet.empty[String]
     }
     driver.reviveOffers();
   }
@@ -376,24 +374,27 @@ private class MesosScheduler(
   }
 
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+}
 
+object MesosScheduler {
   /**
-   * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
-   * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
+   * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. 
+   * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM 
    * environment variable.
    */
   def memoryStringToMb(str: String): Int = {
     val lower = str.toLowerCase
     if (lower.endsWith("k")) {
-      (lower.substring(0, lower.length-1).toLong / 1024).toInt
+      (lower.substring(0, lower.length - 1).toLong / 1024).toInt
     } else if (lower.endsWith("m")) {
-      lower.substring(0, lower.length-1).toInt
+      lower.substring(0, lower.length - 1).toInt
     } else if (lower.endsWith("g")) {
-      lower.substring(0, lower.length-1).toInt * 1024
+      lower.substring(0, lower.length - 1).toInt * 1024
     } else if (lower.endsWith("t")) {
-      lower.substring(0, lower.length-1).toInt * 1024 * 1024
-    } else {// no suffix, so it's just a number in bytes
+      lower.substring(0, lower.length - 1).toInt * 1024 * 1024
+    } else {
+      // no suffix, so it's just a number in bytes
       (lower.toLong / 1024 / 1024).toInt
     }
   }
-}
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index d108c14f6be30a3668b5e68ca6c2344d2a653281..cfd6dc8b2aa3550e0f47dfdfbcc85732a72cd050 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -2,11 +2,11 @@ package spark
 
 import java.io._
 import java.net.InetAddress
-import java.util.UUID
 import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
+import java.util.{Locale, UUID}
 
 /**
  * Various utility methods used by Spark.
@@ -157,9 +157,12 @@ object Utils {
   /**
    * Get the local machine's hostname.
    */
-  def localHostName(): String = {
-    return InetAddress.getLocalHost().getHostName
-  }
+  def localHostName(): String = InetAddress.getLocalHost.getHostName
+
+  /**
+   * Get the current host: the 'spark.hostname' property if set, otherwise the local hostname
+   */
+  def getHost = System.getProperty("spark.hostname", localHostName())
 
   /**
    * Delete a file or directory and its contents recursively.
@@ -184,7 +187,7 @@ object Utils {
     val GB = 1L << 30
     val MB = 1L << 20
     val KB = 1L << 10
-    val B = 1L
+
     val (value, unit) = {
       if (size >= 2*GB) {
         (size.asInstanceOf[Double] / GB, "GB")
@@ -196,6 +199,6 @@ object Utils {
         (size.asInstanceOf[Double], "B")
       }
     }
-    "%.1f%s".format(value, unit)
+    "%.1f%s".formatLocal(Locale.US, value, unit)
   }
 }
diff --git a/core/src/test/scala/spark/BoundedMemoryCacheTest.scala b/core/src/test/scala/spark/BoundedMemoryCacheTest.scala
new file mode 100644
index 0000000000000000000000000000000000000000..344a733ab3f9a329887d538f064c7a45325331de
--- /dev/null
+++ b/core/src/test/scala/spark/BoundedMemoryCacheTest.scala
@@ -0,0 +1,31 @@
+package spark
+
+import org.scalatest.FunSuite
+
+class BoundedMemoryCacheTest extends FunSuite {
+  test("constructor test") {
+    val cache = new BoundedMemoryCache(40)
+    expect(40)(cache.getCapacity)
+  }
+
+  test("caching") {
+    val cache = new BoundedMemoryCache(40) {
+      //TODO sorry about this, but there is no better way to skip 'cacheTracker.dropEntry'
+      override protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
+        logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
+      }
+    }
+    //should be OK
+    expect(CachePutSuccess(30))(cache.put("1", 0, "Meh"))
+
+    //we cannot add this to the cache (there is not enough space) and we cannot evict the only value from
+    //the cache because it belongs to the same dataset
+    expect(CachePutFailure())(cache.put("1", 1, "Meh"))
+
+    //should be OK, dataset '1' can be evicted from the cache
+    expect(CachePutSuccess(30))(cache.put("2", 0, "Meh"))
+
+    //should fail, the cache should obey its capacity
+    expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string"))
+  }
+}
diff --git a/core/src/test/scala/spark/CacheTrackerSuite.scala b/core/src/test/scala/spark/CacheTrackerSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..60290d14cab69427a771004f5e1270f00708eaa4
--- /dev/null
+++ b/core/src/test/scala/spark/CacheTrackerSuite.scala
@@ -0,0 +1,97 @@
+package spark
+
+import org.scalatest.FunSuite
+import collection.mutable.HashMap
+
+class CacheTrackerSuite extends FunSuite {
+
+  test("CacheTrackerActor slave initialization & cache status") {
+    System.setProperty("spark.master.port", "1345")
+    val initialSize = 2L << 20
+
+    val tracker = new CacheTrackerActor
+    tracker.start()
+
+    tracker !? SlaveCacheStarted("host001", initialSize)
+
+    assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 0L)))
+
+    tracker !? StopCacheTracker
+  }
+
+  test("RegisterRDD") {
+    System.setProperty("spark.master.port", "1345")
+    val initialSize = 2L << 20
+
+    val tracker = new CacheTrackerActor
+    tracker.start()
+
+    tracker !? SlaveCacheStarted("host001", initialSize)
+
+    tracker !? RegisterRDD(1, 3)
+    tracker !? RegisterRDD(2, 1)
+
+    assert(getCacheLocations(tracker) == Map(1 -> List(List(), List(), List()), 2 -> List(List())))
+
+    tracker !? StopCacheTracker
+  }
+
+  test("AddedToCache") {
+    System.setProperty("spark.master.port", "1345")
+    val initialSize = 2L << 20
+
+    val tracker = new CacheTrackerActor
+    tracker.start()
+
+    tracker !? SlaveCacheStarted("host001", initialSize)
+
+    tracker !? RegisterRDD(1, 2)
+    tracker !? RegisterRDD(2, 1)
+
+    tracker !? AddedToCache(1, 0, "host001", 2L << 15)
+    tracker !? AddedToCache(1, 1, "host001", 2L << 11)
+    tracker !? AddedToCache(2, 0, "host001", 3L << 10)
+
+    assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 72704L)))
+
+    assert(getCacheLocations(tracker) == Map(1 -> List(List("host001"), List("host001")), 2 -> List(List("host001"))))
+
+    tracker !? StopCacheTracker
+  }
+
+  test("DroppedFromCache") {
+    System.setProperty("spark.master.port", "1345")
+    val initialSize = 2L << 20
+
+    val tracker = new CacheTrackerActor
+    tracker.start()
+
+    tracker !? SlaveCacheStarted("host001", initialSize)
+
+    tracker !? RegisterRDD(1, 2)
+    tracker !? RegisterRDD(2, 1)
+
+    tracker !? AddedToCache(1, 0, "host001", 2L << 15)
+    tracker !? AddedToCache(1, 1, "host001", 2L << 11)
+    tracker !? AddedToCache(2, 0, "host001", 3L << 10)
+
+    assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 72704L)))
+    assert(getCacheLocations(tracker) == Map(1 -> List(List("host001"), List("host001")), 2 -> List(List("host001"))))
+
+    tracker !? DroppedFromCache(1, 1, "host001", 2L << 11)
+
+    assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 68608L)))
+    assert(getCacheLocations(tracker) == Map(1 -> List(List("host001"), List()), 2 -> List(List("host001"))))
+
+    tracker !? StopCacheTracker
+  }
+
+  /**
+   * Helper function to get cacheLocations from CacheTracker
+   */
+  def getCacheLocations(tracker: CacheTrackerActor) = tracker !? GetCacheLocations match {
+    case h: HashMap[_, _] => h.asInstanceOf[HashMap[Int, Array[List[String]]]].map {
+      case (i, arr) => (i -> arr.toList)
+    }
+  }
+}
diff --git a/core/src/test/scala/spark/MesosSchedulerSuite.scala b/core/src/test/scala/spark/MesosSchedulerSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0e6820cbdcf31b0135d57283ef6b2b78681a5569
--- /dev/null
+++ b/core/src/test/scala/spark/MesosSchedulerSuite.scala
@@ -0,0 +1,28 @@
+package spark
+
+import org.scalatest.FunSuite
+
+class MesosSchedulerSuite extends FunSuite {
+  test("memoryStringToMb") {
+
+    assert(MesosScheduler.memoryStringToMb("1") == 0)
+    assert(MesosScheduler.memoryStringToMb("1048575") == 0)
+    assert(MesosScheduler.memoryStringToMb("3145728") == 3)
+
+    assert(MesosScheduler.memoryStringToMb("1024k") == 1)
+    assert(MesosScheduler.memoryStringToMb("5000k") == 4)
+    assert(MesosScheduler.memoryStringToMb("4024k") == MesosScheduler.memoryStringToMb("4024K"))
+
+    assert(MesosScheduler.memoryStringToMb("1024m") == 1024)
+    assert(MesosScheduler.memoryStringToMb("5000m") == 5000)
+    assert(MesosScheduler.memoryStringToMb("4024m") == MesosScheduler.memoryStringToMb("4024M"))
+
+    assert(MesosScheduler.memoryStringToMb("2g") == 2048)
+    assert(MesosScheduler.memoryStringToMb("3g") == MesosScheduler.memoryStringToMb("3G"))
+
+    assert(MesosScheduler.memoryStringToMb("2t") == 2097152)
+    assert(MesosScheduler.memoryStringToMb("3t") == MesosScheduler.memoryStringToMb("3T"))
+
+
+  }
+}
diff --git a/core/src/test/scala/spark/Utils.scala b/core/src/test/scala/spark/UtilsSuite.scala
similarity index 56%
rename from core/src/test/scala/spark/Utils.scala
rename to core/src/test/scala/spark/UtilsSuite.scala
index 4e852903be663423e2686806e0a82dec2bf55ba9..f31251e509a9c14460a573f7584f42d206362e4e 100644
--- a/core/src/test/scala/spark/Utils.scala
+++ b/core/src/test/scala/spark/UtilsSuite.scala
@@ -1,7 +1,8 @@
 package spark
 
 import org.scalatest.FunSuite
-
+import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
+import util.Random
 
 class UtilsSuite extends FunSuite {
 
@@ -14,5 +15,15 @@
     assert(Utils.memoryBytesToString(5368709120L) === "5.0GB")
   }
 
+  test("copyStream") {
+    //input array initialization
+    val bytes = Array.ofDim[Byte](9000)
+    Random.nextBytes(bytes)
+
+    val os = new ByteArrayOutputStream()
+    Utils.copyStream(new ByteArrayInputStream(bytes), os)
+
+    assert(os.toByteArray.toList.equals(bytes.toList))
+  }
 }