diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 22110832f89c33fd690eb6e6d58e6a97980d7f8d..356637825e88837892bc60c25e11b9861138d593 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -43,8 +43,6 @@ class CacheTrackerActor extends Actor with Logging {
   
   def receive = {
     case SlaveCacheStarted(host: String, size: Long) =>
-      logInfo("Started slave cache (size %s) on %s".format(
-        Utils.memoryBytesToString(size), host))
       slaveCapacity.put(host, size)
       slaveUsage.put(host, 0)
       sender ! true
@@ -56,22 +54,12 @@ class CacheTrackerActor extends Actor with Logging {
 
     case AddedToCache(rddId, partition, host, size) =>
       slaveUsage.put(host, getCacheUsage(host) + size)
-      logInfo("Cache entry added: (%s, %s) on %s (size added: %s, available: %s)".format(
-        rddId, partition, host, Utils.memoryBytesToString(size),
-        Utils.memoryBytesToString(getCacheAvailable(host))))
       locs(rddId)(partition) = host :: locs(rddId)(partition)
       sender ! true
 
     case DroppedFromCache(rddId, partition, host, size) =>
-      logInfo("Cache entry removed: (%s, %s) on %s (size dropped: %s, available: %s)".format(
-        rddId, partition, host, Utils.memoryBytesToString(size),
-        Utils.memoryBytesToString(getCacheAvailable(host))))
       slaveUsage.put(host, getCacheUsage(host) - size)
       // Do a sanity check to make sure usage is greater than 0.
-      val usage = getCacheUsage(host)
-      if (usage < 0) {
-        logError("Cache usage on %s is negative (%d)".format(host, usage))
-      }
       locs(rddId)(partition) = locs(rddId)(partition).filterNot(_ == host)
       sender ! true
 
@@ -223,7 +211,7 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
         logInfo("Computing partition " + split)
         try {
           // BlockManager will iterate over results from compute to create RDD
-          blockManager.put(key, rdd.compute(split), storageLevel, false)
+          blockManager.put(key, rdd.compute(split), storageLevel, true)
           //future.apply() // Wait for the reply from the cache tracker
           blockManager.get(key) match {
             case Some(values) => 
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 1211f0f2c226c52d5c703388c22a7f6b9594394d..11a1cbb73e9924e5cb3204baae1a371e4cec536a 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1,29 +1,21 @@
 package spark.storage
 
-import java.io._
-import java.nio._
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.LinkedBlockingQueue
-import java.util.Collections
-
 import akka.dispatch.{Await, Future}
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.{HashMap, HashSet}
-import scala.collection.mutable.Queue
-import scala.collection.JavaConversions._
+import akka.util.Duration
 
-import it.unimi.dsi.fastutil.io._
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
 
-import spark.CacheTracker
-import spark.Logging
-import spark.Serializer
-import spark.SizeEstimator
-import spark.SparkEnv
-import spark.SparkException
-import spark.Utils
-import spark.util.ByteBufferInputStream
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.nio.ByteBuffer
+import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+import scala.collection.JavaConversions._
+
+import spark.{CacheTracker, Logging, Serializer, SizeEstimator, SparkException, Utils}
 import spark.network._
-import akka.util.Duration
+import spark.util.ByteBufferInputStream
+
 
 class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
   def this() = this(null, 0)
@@ -49,7 +41,8 @@ class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
 }
 
 
-case class BlockException(blockId: String, message: String, ex: Exception = null) extends Exception(message)
+case class BlockException(blockId: String, message: String, ex: Exception = null)
+extends Exception(message)
 
 
 class BlockLocker(numLockers: Int) {
@@ -115,10 +108,14 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   }
 
   /**
-   * Change storage level for a local block and tell master is necesary. 
-   * If new level is invalid, then block info (if it exists) will be silently removed.
+   * Change the storage level for a local block in the block info meta data, and
+   * tell the master if necessary. Note that this is only a meta data change and
+   * does NOT actually change the storage of the block. If the new level is
+   * invalid, then block info (if exists) will be silently removed.
    */
-  def setLevel(blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
+  private[spark] def setLevelAndTellMaster(
+    blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
+
     if (level == null) {
       throw new IllegalArgumentException("Storage level is null")
     }
@@ -141,8 +138,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
    
     // Tell master if necessary
     if (newTellMaster) {
+      master.mustHeartBeat(HeartBeat(
+        blockManagerId,
+        blockId,
+        level,
+        if (level.isValid && level.useMemory) memoryStore.getSize(blockId) else 0,
+        if (level.isValid && level.useDisk) diskStore.getSize(blockId) else 0))
       logDebug("Told master about block " + blockId)
-      notifyMaster(HeartBeat(blockManagerId, blockId, level, 0, 0)) 
     } else {
       logDebug("Did not tell master about block " + blockId)
     }
@@ -431,9 +433,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
           case _ => throw new Exception("Unexpected return value")
         }
       }
-        
+
       // Store the storage level
-      setLevel(blockId, level, tellMaster)
+      setLevelAndTellMaster(blockId, level, tellMaster)
     }
     logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
 
@@ -461,7 +463,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   /**
    * Put a new block of serialized bytes to the block manager.
    */
-  def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
+  def putBytes(
+    blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
+
     if (blockId == null) {
       throw new IllegalArgumentException("Block Id is null")
     }
@@ -500,7 +504,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       }
 
       // Store the storage level
-      setLevel(blockId, level, tellMaster)
+      setLevelAndTellMaster(blockId, level, tellMaster)
     }
 
     // TODO: This code will be removed when CacheTracker is gone.
@@ -587,7 +591,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       }
       memoryStore.remove(blockId)  
       val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
-      setLevel(blockId, newLevel)
+      setLevelAndTellMaster(blockId, newLevel)
     }
   }
 
@@ -606,10 +610,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     return ser.deserializeStream(new ByteBufferInputStream(bytes)).toIterator
   }
 
-  private def notifyMaster(heartBeat: HeartBeat) {
-    master.mustHeartBeat(heartBeat)
-  }
-
   def stop() {
     connectionManager.stop()
     blockInfo.clear()
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 9f03c5a32c2929da2fff2ec33202824a72ea5683..2f14db4e2861385b243b794a31d884eae514d9cc 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -3,22 +3,18 @@ package spark.storage
 import java.io._
 import java.util.{HashMap => JHashMap}
 
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.Random
 
 import akka.actor._
 import akka.dispatch._
 import akka.pattern.ask
 import akka.remote._
-import akka.util.Duration
-import akka.util.Timeout
+import akka.util.{Duration, Timeout}
 import akka.util.duration._
 
-import spark.Logging
-import spark.SparkException
-import spark.Utils
+import spark.{Logging, SparkException, Utils}
+
 
 sealed trait ToBlockManagerMaster
 
@@ -27,13 +23,13 @@ case class RegisterBlockManager(
     maxMemSize: Long,
     maxDiskSize: Long)
   extends ToBlockManagerMaster
-  
+
 class HeartBeat(
     var blockManagerId: BlockManagerId,
     var blockId: String,
     var storageLevel: StorageLevel,
-    var deserializedSize: Long,
-    var size: Long)
+    var memSize: Long,
+    var diskSize: Long)
   extends ToBlockManagerMaster
   with Externalizable {
 
@@ -43,8 +39,8 @@ class HeartBeat(
     blockManagerId.writeExternal(out)
     out.writeUTF(blockId)
     storageLevel.writeExternal(out)
-    out.writeInt(deserializedSize.toInt)
-    out.writeInt(size.toInt)
+    out.writeInt(memSize.toInt)
+    out.writeInt(diskSize.toInt)
   }
 
   override def readExternal(in: ObjectInput) {
@@ -53,8 +49,8 @@ class HeartBeat(
     blockId = in.readUTF()
     storageLevel = new StorageLevel()
     storageLevel.readExternal(in)
-    deserializedSize = in.readInt()
-    size = in.readInt()
+    memSize = in.readInt()
+    diskSize = in.readInt()
   }
 }
 
@@ -62,15 +58,14 @@ object HeartBeat {
   def apply(blockManagerId: BlockManagerId,
       blockId: String,
       storageLevel: StorageLevel,
-      deserializedSize: Long,
-      size: Long): HeartBeat = {
-    new HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size)
+      memSize: Long,
+      diskSize: Long): HeartBeat = {
+    new HeartBeat(blockManagerId, blockId, storageLevel, memSize, diskSize)
   }
 
- 
   // For pattern-matching
   def unapply(h: HeartBeat): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
-    Some((h.blockManagerId, h.blockId, h.storageLevel, h.deserializedSize, h.size))
+    Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
   }
 }
   
@@ -88,49 +83,64 @@ case object StopBlockManagerMaster extends ToBlockManagerMaster
 class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
   
   class BlockManagerInfo(
+      val blockManagerId: BlockManagerId,
       timeMs: Long,
-      maxMem: Long,
-      maxDisk: Long) {
+      val maxMem: Long,
+      val maxDisk: Long) {
     private var lastSeenMs = timeMs
     private var remainedMem = maxMem
     private var remainedDisk = maxDisk
     private val blocks = new JHashMap[String, StorageLevel]
+
+    logInfo("Registering block manager (%s:%d, ram: %d, disk: %d)".format(
+      blockManagerId.ip, blockManagerId.port, maxMem, maxDisk))
     
     def updateLastSeenMs() {
       lastSeenMs = System.currentTimeMillis() / 1000
     }
     
-    def addBlock(blockId: String, storageLevel: StorageLevel, deserializedSize: Long, size: Long) =
-        synchronized {
+    def updateBlockInfo(
+      blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) = synchronized {
+
       updateLastSeenMs()
       
       if (blocks.containsKey(blockId)) {
-        val oriLevel: StorageLevel = blocks.get(blockId)
+        // The block exists on the slave already.
+        val originalLevel: StorageLevel = blocks.get(blockId)
         
-        if (oriLevel.deserialized) {
-          remainedMem += deserializedSize
+        if (originalLevel.useMemory) {
+          remainedMem += memSize
         }
-        if (oriLevel.useMemory) {
-          remainedMem += size
-        }
-        if (oriLevel.useDisk) {
-          remainedDisk += size
+        if (originalLevel.useDisk) {
+          remainedDisk += diskSize
         }
       }
       
-      if (storageLevel.isValid) { 
+      if (storageLevel.isValid) {
+        // isValid means it is either stored in-memory or on-disk.
         blocks.put(blockId, storageLevel)
-        if (storageLevel.deserialized) {
-          remainedMem -= deserializedSize
-        }
         if (storageLevel.useMemory) {
-          remainedMem -= size
+          remainedMem -= memSize
+          logInfo("Added %s in memory on %s:%d (size: %d, free: %d)".format(
+            blockId, blockManagerId.ip, blockManagerId.port, memSize, remainedMem))
         }
         if (storageLevel.useDisk) {
-          remainedDisk -= size
+          remainedDisk -= diskSize
+          logInfo("Added %s on disk on %s:%d (size: %d, free: %d)".format(
+            blockId, blockManagerId.ip, blockManagerId.port, diskSize, remainedDisk))
         }
-      } else {
+      } else if (blocks.containsKey(blockId)) {
+        // If isValid is not true, drop the block.
+        val originalLevel: StorageLevel = blocks.get(blockId)
         blocks.remove(blockId)
+        if (originalLevel.useMemory) {
+          logInfo("Removed %s on %s:%d in memory (size: %d, free: %d)".format(
+            blockId, blockManagerId.ip, blockManagerId.port, memSize, remainedDisk))
+        }
+        if (originalLevel.useDisk) {
+          logInfo("Removed %s on %s:%d on disk (size: %d, free: %d)".format(
+            blockId, blockManagerId.ip, blockManagerId.port, diskSize, remainedDisk))
+        }
       }
     }
 
@@ -204,12 +214,11 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockManagerId + " "
     logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
-    logInfo("Got Register Msg from " + blockManagerId)
     if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
       logInfo("Got Register Msg from master node, don't register it")
     } else {
       blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
-        System.currentTimeMillis() / 1000, maxMemSize, maxDiskSize))
+        blockManagerId, System.currentTimeMillis() / 1000, maxMemSize, maxDiskSize))
     }
     logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
     sender ! true
@@ -219,8 +228,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
       blockManagerId: BlockManagerId,
       blockId: String,
       storageLevel: StorageLevel,
-      deserializedSize: Long,
-      size: Long) {
+      memSize: Long,
+      diskSize: Long) {
     
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockManagerId + " " + blockId + " "
@@ -231,7 +240,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
       sender ! true
     }
     
-    blockManagerInfo(blockManagerId).addBlock(blockId, storageLevel, deserializedSize, size)
+    blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
     
     var locations: HashSet[BlockManagerId] = null
     if (blockInfo.containsKey(blockId)) {
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
index d74cdb38a8bd5a2282008775c9d94c72c74ca492..e317ad36427767fcaca76173321e0a4371b50f34 100644
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
@@ -1,23 +1,21 @@
 package spark.storage
 
-import java.nio._
+import java.nio.ByteBuffer
 
 import scala.actors._
 import scala.actors.Actor._
 import scala.actors.remote._
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.Random
 
-import spark.Logging
-import spark.Utils
-import spark.SparkEnv
+import spark.{Logging, Utils, SparkEnv}
 import spark.network._
 
 /**
- * This should be changed to use event model late. 
+ * A network interface for BlockManager. Each slave should have one
+ * BlockManagerWorker.
+ *
+ * TODO: Use event model.
  */
 class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
   initLogging()
@@ -32,7 +30,7 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
           logDebug("Handling as a buffer message " + bufferMessage)
           val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage)
           logDebug("Parsed as a block message array")
-          val responseMessages = blockMessages.map(processBlockMessage _).filter(_ != None).map(_.get)
+          val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get)
           /*logDebug("Processed block messages")*/
           return Some(new BlockMessageArray(responseMessages).toBufferMessage)
         } catch {
diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala
index 0b2ed69e0786d7bda94b73470b490336da9871ba..b9833273e5b15e3a4c8fb7045a8838384d0dc1d8 100644
--- a/core/src/main/scala/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/spark/storage/BlockMessage.scala
@@ -1,6 +1,6 @@
 package spark.storage
 
-import java.nio._
+import java.nio.ByteBuffer
 
 import scala.collection.mutable.StringBuilder
 import scala.collection.mutable.ArrayBuffer
diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala
index 497a19856e21446cc732563d8d15d724a26ffe06..928857056fad10c00c8890e46ed1fc8a5268587b 100644
--- a/core/src/main/scala/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala
@@ -1,5 +1,6 @@
 package spark.storage
-import java.nio._
+
+import java.nio.ByteBuffer
 
 import scala.collection.mutable.StringBuilder
 import scala.collection.mutable.ArrayBuffer
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 17f4f51aa8b8e0eba922919fb96c9d39ed47956d..f66b5bc897cdd09a99806c76451fd1bc890b540f 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,15 +1,14 @@
 package spark.storage
 
-import spark.{Utils, Logging, Serializer, SizeEstimator}
-import scala.collection.mutable.ArrayBuffer
 import java.io.{File, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
-import java.util.{UUID, LinkedHashMap}
-import java.util.concurrent.Executors
-import java.util.concurrent.ConcurrentHashMap
-import it.unimi.dsi.fastutil.io._
-import java.util.concurrent.ArrayBlockingQueue
+import java.util.{LinkedHashMap, UUID}
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
+
+import scala.collection.mutable.ArrayBuffer
+
+import spark.{Utils, Logging, Serializer, SizeEstimator}
 
 /**
  * Abstract class to store blocks
@@ -19,7 +18,13 @@ abstract class BlockStore(blockManager: BlockManager) extends Logging {
 
   def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) 
 
-  def putValues(blockId: String, values: Iterator[Any], level: StorageLevel): Either[Iterator[Any], ByteBuffer]
+  def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
+  : Either[Iterator[Any], ByteBuffer]
+
+  /**
+   * Return the size of a block.
+   */
+  def getSize(blockId: String): Long
 
   def getBytes(blockId: String): Option[ByteBuffer]
 
@@ -62,6 +67,11 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
   blockDropper.start()
+  logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
+
+  def freeMemory: Long = maxMemory - currentMemory
+
+  def getSize(blockId: String): Long = memoryStore.synchronized { memoryStore.get(blockId).size }
   
   def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
     if (level.deserialized) {
@@ -74,17 +84,20 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       val entry = new Entry(elements, sizeEstimate, true)
       memoryStore.synchronized { memoryStore.put(blockId, entry) }
       currentMemory += sizeEstimate
-      logDebug("Block " + blockId + " stored as values to memory")
+      logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
+        blockId, sizeEstimate, freeMemory))
     } else {
       val entry = new Entry(bytes, bytes.array().length, false)
       ensureFreeSpace(bytes.array.length)
       memoryStore.synchronized { memoryStore.put(blockId, entry) }
       currentMemory += bytes.array().length
-      logDebug("Block " + blockId + " stored as " + bytes.array().length + " bytes to memory")
+      logInfo("Block %s stored as %d bytes to memory (free %d)".format(
+        blockId, bytes.array().length, freeMemory))
     }
   }
 
-  def putValues(blockId: String, values: Iterator[Any], level: StorageLevel): Either[Iterator[Any], ByteBuffer] = {
+  def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
+  : Either[Iterator[Any], ByteBuffer] = {
     if (level.deserialized) {
       val elements = new ArrayBuffer[Any]
       elements ++= values
@@ -93,7 +106,8 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       val entry = new Entry(elements, sizeEstimate, true)
       memoryStore.synchronized { memoryStore.put(blockId, entry) }
       currentMemory += sizeEstimate
-      logDebug("Block " + blockId + " stored as values to memory")
+      logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
+        blockId, sizeEstimate, freeMemory))
       return Left(elements.iterator) 
     } else {
       val bytes = dataSerialize(values)
@@ -101,7 +115,8 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       val entry = new Entry(bytes, bytes.array().length, false)
       memoryStore.synchronized { memoryStore.put(blockId, entry) } 
       currentMemory += bytes.array().length
-      logDebug("Block " + blockId + " stored as " + bytes.array.length + " bytes to memory")
+      logInfo("Block %s stored as %d bytes to memory (free %d)".format(
+        blockId, bytes.array.length, freeMemory))
       return Right(bytes)
     }
   }
@@ -128,7 +143,8 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       if (entry != null) {
         memoryStore.remove(blockId)
         currentMemory -= entry.size
-        logDebug("Block " + blockId + " of size " + entry.size + " dropped from memory")
+        logInfo("Block %s of size %d dropped from memory (free %d)".format(
+          blockId, entry.size, freeMemory))
       } else {
         logWarning("Block " + blockId + " could not be removed as it doesnt exist")
       }
@@ -164,11 +180,11 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
             entry.dropPending = true
           }
           selectedMemory += pair.getValue.size
-          logDebug("Block " + blockId + " selected for dropping")
+          logInfo("Block " + blockId + " selected for dropping")
         }
       }  
       
-      logDebug("" + selectedBlocks.size + " new blocks selected for dropping, " + 
+      logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " + 
         blocksToDrop.size + " blocks pending")
       var i = 0 
       while (i < selectedBlocks.size) {
@@ -192,7 +208,11 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
   var lastLocalDirUsed = 0
 
   addShutdownHook()
-  
+
+  def getSize(blockId: String): Long = {
+    getFile(blockId).length
+  }
+
   def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
     logDebug("Attempting to put block " + blockId)
     val startTime = System.currentTimeMillis
@@ -203,13 +223,15 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
       buffer.put(bytes.array)
       channel.close()
       val finishTime = System.currentTimeMillis
-      logDebug("Block " + blockId + " stored to file of " + bytes.array.length + " bytes to disk in " + (finishTime - startTime) + " ms")
+      logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
+        blockId, bytes.array.length, (finishTime - startTime)))
     } else {
       logError("File not created for block " + blockId)
     }
   }
 
-  def putValues(blockId: String, values: Iterator[Any], level: StorageLevel): Either[Iterator[Any], ByteBuffer] = {
+  def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
+  : Either[Iterator[Any], ByteBuffer] = {
     val bytes = dataSerialize(values) 
     logDebug("Converted block " + blockId + " to " + bytes.array.length + " bytes")
     putBytes(blockId, bytes, level)
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index f067a2a6c5838de2e767b7d0bb5b30111c1eeb7a..1d38ca13cc020de8419d69cfd070f0ab70c5ca9c 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -1,6 +1,6 @@
 package spark.storage
 
-import java.io._
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
 class StorageLevel(
     var useDisk: Boolean, 
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index f3f891e47143aa9d6baae0ddcbe5182e26a0f68e..9e55647bd0ade29e8e16c6e2b5beb42a07310253 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -70,8 +70,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
     
     // Setting storage level of a1 and a2 to invalid; they should be removed from store and master
-    store.setLevel("a1", new StorageLevel(false, false, false, 1))
-    store.setLevel("a2", new StorageLevel(true, false, false, 0))
+    store.setLevelAndTellMaster("a1", new StorageLevel(false, false, false, 1))
+    store.setLevelAndTellMaster("a2", new StorageLevel(true, false, false, 0))
     assert(store.getSingle("a1") === None, "a1 not removed from store")
     assert(store.getSingle("a2") === None, "a2 not removed from store")
     assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")