Skip to content
Snippets Groups Projects
Commit 69372b48 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #133 from Benky/56524587

BoundedMemoryCache.put should fail when estimated size of 'value' is larger than cache capacity
parents 10716b17 56524587
No related branches found
No related tags found
No related merge requests found
......@@ -9,16 +9,16 @@ import java.util.LinkedHashMap
* some cache entries have pointers to a shared object. Nonetheless, this Cache should work well
* when most of the space is used by arrays of primitives or of simple classes.
*/
class BoundedMemoryCache extends Cache with Logging {
private val maxBytes: Long = getMaxBytes()
class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
logInfo("BoundedMemoryCache.maxBytes = " + maxBytes)
def this() {
this(BoundedMemoryCache.getMaxBytes)
}
private var currentBytes = 0L
private val map = new LinkedHashMap[(Any, Int), Entry](32, 0.75f, true)
// An entry in our map; stores a cached object and its size in bytes
class Entry(val value: Any, val size: Long) {}
override def get(datasetId: Any, partition: Int): Any = {
synchronized {
val entry = map.get((datasetId, partition))
......@@ -33,13 +33,11 @@ class BoundedMemoryCache extends Cache with Logging {
override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
val key = (datasetId, partition)
logInfo("Asked to add key " + key)
val startTime = System.currentTimeMillis
val size = SizeEstimator.estimate(value.asInstanceOf[AnyRef])
val timeTaken = System.currentTimeMillis - startTime
logInfo("Estimated size for key %s is %d".format(key, size))
logInfo("Size estimation for key %s took %d ms".format(key, timeTaken))
val size = estimateValueSize(key, value)
synchronized {
if (ensureFreeSpace(datasetId, size)) {
if (size > getCapacity) {
return CachePutFailure()
} else if (ensureFreeSpace(datasetId, size)) {
logInfo("Adding key " + key)
map.put(key, new Entry(value, size))
currentBytes += size
......@@ -54,10 +52,16 @@ class BoundedMemoryCache extends Cache with Logging {
override def getCapacity: Long = maxBytes
private def getMaxBytes(): Long = {
val memoryFractionToUse = System.getProperty(
"spark.boundedMemoryCache.memoryFraction", "0.66").toDouble
(Runtime.getRuntime.maxMemory * memoryFractionToUse).toLong
/**
* Estimate sizeOf 'value'
*/
private def estimateValueSize(key: (Any, Int), value: Any) = {
val startTime = System.currentTimeMillis
val size = SizeEstimator.estimate(value.asInstanceOf[AnyRef])
val timeTaken = System.currentTimeMillis - startTime
logInfo("Estimated size for key %s is %d".format(key, size))
logInfo("Size estimation for key %s took %d ms".format(key, timeTaken))
size
}
/**
......@@ -85,8 +89,21 @@ class BoundedMemoryCache extends Cache with Logging {
}
protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
logInfo("Dropping key (%s, %d) of size %d to make space".format(
datasetId, partition, entry.size))
logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
SparkEnv.get.cacheTracker.dropEntry(datasetId, partition)
}
}
// An entry in our map; stores a cached object and its size in bytes
case class Entry(value: Any, size: Long)
object BoundedMemoryCache {
/**
* Get maximum cache capacity from system configuration
*/
def getMaxBytes: Long = {
val memoryFractionToUse = System.getProperty("spark.boundedMemoryCache.memoryFraction", "0.66").toDouble
(Runtime.getRuntime.maxMemory * memoryFractionToUse).toLong
}
}
package spark
import org.scalatest.FunSuite
class BoundedMemoryCacheTest extends FunSuite {
test("constructor test") {
val cache = new BoundedMemoryCache(40)
expect(40)(cache.getCapacity)
}
test("caching") {
val cache = new BoundedMemoryCache(40) {
//TODO sorry about this, but there is not better way how to skip 'cacheTracker.dropEntry'
override protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
}
}
//should be OK
expect(CachePutSuccess(30))(cache.put("1", 0, "Meh"))
//we cannot add this to cache (there is not enough space in cache) & we cannot evict the only value from
//cache because it's from the same dataset
expect(CachePutFailure())(cache.put("1", 1, "Meh"))
//should be OK, dataset '1' can be evicted from cache
expect(CachePutSuccess(30))(cache.put("2", 0, "Meh"))
//should fail, cache should obey it's capacity
expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string"))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment