diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala
index c25d0a62dfe126dca3bf8f6f85533fc0060245dc..f778d8cc17d0c69b8a17de1e1c38d9b56a82a030 100644
--- a/core/src/main/scala/spark/BoundedMemoryCache.scala
+++ b/core/src/main/scala/spark/BoundedMemoryCache.scala
@@ -30,7 +30,7 @@ class BoundedMemoryCache extends Cache with Logging {
     }
   }
 
-  override def put(datasetId: Any, partition: Int, value: Any): Long = {
+  override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
     val key = (datasetId, partition)
     logInfo("Asked to add key " + key)
     val startTime = System.currentTimeMillis
@@ -44,10 +44,10 @@ class BoundedMemoryCache extends Cache with Logging {
         map.put(key, new Entry(value, size))
         currentBytes += size
         logInfo("Number of entries is now " + map.size)
-        return size
+        return CachePutSuccess(size)
       } else {
         logInfo("Didn't add key " + key + " because we would have evicted part of same dataset")
-        return -1L
+        return CachePutFailure()
       }
     }
   }
diff --git a/core/src/main/scala/spark/Cache.scala b/core/src/main/scala/spark/Cache.scala
index a65d3b478de5b78319f4aeda8b73b7065b362dd5..aeff205884df96ba0710a1f22ac7840d44dc77e1 100644
--- a/core/src/main/scala/spark/Cache.scala
+++ b/core/src/main/scala/spark/Cache.scala
@@ -2,6 +2,10 @@ package spark
 
 import java.util.concurrent.atomic.AtomicInteger
 
+sealed trait CachePutResponse
+case class CachePutSuccess(size: Long) extends CachePutResponse
+case class CachePutFailure() extends CachePutResponse
+
 /**
  * An interface for caches in Spark, to allow for multiple implementations. Caches are used to store
  * both partitions of cached RDDs and broadcast variables on Spark executors. Caches are also aware
@@ -31,12 +35,12 @@ abstract class Cache {
   def get(datasetId: Any, partition: Int): Any
 
   /**
-   * Attempt to put a value in the cache; returns a negative number if this was
-   * not successful (e.g. because the cache replacement policy forbids it). If
-   * size estimation is available, the cache implementation should return the
-   * estimated size of the partition if the partition is successfully cached.
+   * Attempt to put a value in the cache; returns CachePutSuccess if this was
+   * successful and CachePutFailure if not (e.g. because the cache replacement
+   * policy forbids it). If size estimation is available, the implementation
+   * should set the size field of CachePutSuccess to the estimated partition size.
    */
-  def put(datasetId: Any, partition: Int, value: Any): Long
+  def put(datasetId: Any, partition: Int, value: Any): CachePutResponse
 
   /**
    * Report the capacity of the cache partition. By default this just reports
@@ -52,7 +56,7 @@ class KeySpace(cache: Cache, val keySpaceId: Int) {
   def get(datasetId: Any, partition: Int): Any =
     cache.get((keySpaceId, datasetId), partition)
 
-  def put(datasetId: Any, partition: Int, value: Any): Long =
+  def put(datasetId: Any, partition: Int, value: Any): CachePutResponse =
     cache.put((keySpaceId, datasetId), partition, value)
 
   def getCapacity: Long = cache.getCapacity
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index b472dc8070c6964fb08d2af69b4a85bed6f5deda..5b5831b2defdfd0ca038c4480eaa301cf9d72cea 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -20,15 +20,19 @@ case object StopCacheTracker extends CacheTrackerMessage
 
 
 class CacheTrackerActor extends DaemonActor with Logging {
-  val locs = new HashMap[Int, Array[List[String]]]
+  private val locs = new HashMap[Int, Array[List[String]]]
 
   /**
    * A map from the slave's host name to its cache size.
    */
-  val slaveCapacity = new HashMap[String, Long]
-  val slaveUsage = new HashMap[String, Long]
+  private val slaveCapacity = new HashMap[String, Long]
+  private val slaveUsage = new HashMap[String, Long]
 
   // TODO: Should probably store (String, CacheType) tuples
+
+  private def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L)
+  private def getCacheCapacity(host: String): Long = slaveCapacity.getOrElse(host, 0L)
+  private def getCacheAvailable(host: String): Long = getCacheCapacity(host) - getCacheUsage(host)
   
   def act() {
     val port = System.getProperty("spark.master.port").toInt
@@ -39,7 +43,8 @@ class CacheTrackerActor extends DaemonActor with Logging {
     loop {
       react {
         case SlaveCacheStarted(host: String, size: Long) =>
-          logInfo("Started slave cache (size %s) on %s".format(Utils.sizeWithSuffix(size), host))
+          logInfo("Started slave cache (size %s) on %s".format(
+            Utils.memoryBytesToString(size), host))
           slaveCapacity.put(host, size)
           slaveUsage.put(host, 0)
           reply('OK)
@@ -51,9 +56,10 @@ class CacheTrackerActor extends DaemonActor with Logging {
         
         case AddedToCache(rddId, partition, host, size) =>
           if (size > 0) {
-            logInfo("Cache entry added: (%s, %s) on %s, size: %s".format(
-              rddId, partition, host, Utils.sizeWithSuffix(size)))
             slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) + size)
+            logInfo("Cache entry added: (%s, %s) on %s (size added: %s, available: %s)".format(
+              rddId, partition, host, Utils.memoryBytesToString(size),
+              Utils.memoryBytesToString(getCacheAvailable(host))))
           } else {
             logInfo("Cache entry added: (%s, %s) on %s".format(rddId, partition, host))
           }
@@ -62,8 +68,9 @@ class CacheTrackerActor extends DaemonActor with Logging {
           
         case DroppedFromCache(rddId, partition, host, size) =>
           if (size > 0) {
-            logInfo("Cache entry removed: (%s, %s) on %s, size: %s".format(
-              rddId, partition, host, Utils.sizeWithSuffix(size)))
+            logInfo("Cache entry removed: (%s, %s) on %s (size dropped: %s, available: %s)".format(
+              rddId, partition, host, Utils.memoryBytesToString(size),
+              Utils.memoryBytesToString(getCacheAvailable(host))))
             slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) - size)
 
             // Do a sanity check to make sure usage is greater than 0.
@@ -199,10 +206,10 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
       // TODO: fetch any remote copy of the split that may be available
       logInfo("Computing partition " + split)
       var array: Array[T] = null
-      var putRetval: Long = -1L
+      var putResponse: CachePutResponse = CachePutFailure()
       try {
         array = rdd.compute(split).toArray(m)
-        putRetval = cache.put(rdd.id, split.index, array)
+        putResponse = cache.put(rdd.id, split.index, array)
       } finally {
         // Tell other threads that we've finished our attempt to load the key (whether or not
         // we've actually succeeded to put it in the map)
@@ -211,10 +218,14 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
           loading.notifyAll()
         }
       }
-      if (putRetval >= 0) {
-        // Tell the master that we added the entry. Don't return until it replies so it can
-        // properly schedule future tasks that use this RDD.
-        trackerActor !? AddedToCache(rdd.id, split.index, host, putRetval)
+
+      putResponse match {
+        case CachePutSuccess(size) => {
+          // Tell the master that we added the entry. Don't return until it
+          // replies so it can properly schedule future tasks that use this RDD.
+          trackerActor !? AddedToCache(rdd.id, split.index, host, size)
+        }
+        case _ => // put failed; nothing to report to the master
       }
       return array.iterator
     }
diff --git a/core/src/main/scala/spark/DiskSpillingCache.scala b/core/src/main/scala/spark/DiskSpillingCache.scala
index 037ed786887c6d22aecd090c21fd8ef56a384cfb..e11466eb64eec01e923bde295867653d88bb7706 100644
--- a/core/src/main/scala/spark/DiskSpillingCache.scala
+++ b/core/src/main/scala/spark/DiskSpillingCache.scala
@@ -44,7 +44,7 @@ class DiskSpillingCache extends BoundedMemoryCache {
     }
   }
 
-  override def put(datasetId: Any, partition: Int, value: Any): Long = {
+  override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
     var ser = SparkEnv.get.serializer.newInstance()
     super.put(datasetId, partition, ser.serialize(value))
   }
diff --git a/core/src/main/scala/spark/SerializingCache.scala b/core/src/main/scala/spark/SerializingCache.scala
index 17dc735d5e02447f3781124271139d72d79a256b..3d192f24034a0f5a59a7247bf2850ba29efbbc80 100644
--- a/core/src/main/scala/spark/SerializingCache.scala
+++ b/core/src/main/scala/spark/SerializingCache.scala
@@ -9,7 +9,7 @@ import java.io._
 class SerializingCache extends Cache with Logging {
   val bmc = new BoundedMemoryCache
 
-  override def put(datasetId: Any, partition: Int, value: Any): Long = {
+  override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
     val ser = SparkEnv.get.serializer.newInstance()
     bmc.put(datasetId, partition, ser.serialize(value))
   }
diff --git a/core/src/main/scala/spark/SoftReferenceCache.scala b/core/src/main/scala/spark/SoftReferenceCache.scala
index cd2386eb838d64739ebfb23fbca8e2df631c280a..ce9370c5d72137e801a868ec24fce3e19b388346 100644
--- a/core/src/main/scala/spark/SoftReferenceCache.scala
+++ b/core/src/main/scala/spark/SoftReferenceCache.scala
@@ -11,8 +11,8 @@ class SoftReferenceCache extends Cache {
   override def get(datasetId: Any, partition: Int): Any =
     map.get((datasetId, partition))
 
-  override def put(datasetId: Any, partition: Int, value: Any): Long = {
+  override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
     map.put((datasetId, partition), value)
-    return 0
+    return CachePutSuccess(0)
   }
 }
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 5aecbdde7dea2ab5d85e21ff8a8006735a4cecda..d108c14f6be30a3668b5e68ca6c2344d2a653281 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -180,7 +180,7 @@ object Utils {
    * Petabyte) in order to reduce the number of digits to four or less. For
    * example, 4,000,000 is returned as 4MB.
    */
-  def sizeWithSuffix(size: Long): String = {
+  def memoryBytesToString(size: Long): String = {
     val GB = 1L << 30
     val MB = 1L << 20
     val KB = 1L << 10
diff --git a/core/src/test/scala/spark/Utils.scala b/core/src/test/scala/spark/Utils.scala
index b78b638bb1688be317c130199f7e0c3180111e1a..4e852903be663423e2686806e0a82dec2bf55ba9 100644
--- a/core/src/test/scala/spark/Utils.scala
+++ b/core/src/test/scala/spark/Utils.scala
@@ -5,13 +5,13 @@ import org.scalatest.FunSuite
 
 class UtilsSuite extends FunSuite {
 
-  test("sizeWithSuffix") {
-    assert(Utils.sizeWithSuffix(10) === "10.0B")
-    assert(Utils.sizeWithSuffix(1500) === "1500.0B")
-    assert(Utils.sizeWithSuffix(2000000) === "1953.1KB")
-    assert(Utils.sizeWithSuffix(2097152) === "2.0MB")
-    assert(Utils.sizeWithSuffix(2306867) === "2.2MB")
-    assert(Utils.sizeWithSuffix(5368709120L) === "5.0GB")
+  test("memoryBytesToString") {
+    assert(Utils.memoryBytesToString(10) === "10.0B")
+    assert(Utils.memoryBytesToString(1500) === "1500.0B")
+    assert(Utils.memoryBytesToString(2000000) === "1953.1KB")
+    assert(Utils.memoryBytesToString(2097152) === "2.0MB")
+    assert(Utils.memoryBytesToString(2306867) === "2.2MB")
+    assert(Utils.memoryBytesToString(5368709120L) === "5.0GB")
   }
 
 }