diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 272d7cdad3bc213b1202b5d249015d957319181b..41441720a7c8f81004fb897763a6d8ae2d29e5d9 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -86,10 +86,13 @@ object SparkEnv extends Logging {
     }
 
     val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer")
-    
-    val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal)
+
+    val masterIp: String = System.getProperty("spark.master.host", "localhost")
+    val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
+    val blockManagerMaster = new BlockManagerMaster(
+      actorSystem, isMaster, isLocal, masterIp, masterPort)
     val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer)
-    
+
     val connectionManager = blockManager.connectionManager
 
     val broadcastManager = new BroadcastManager(isMaster)
@@ -104,7 +107,7 @@ object SparkEnv extends Logging {
 
     val shuffleFetcher = instantiateClass[ShuffleFetcher](
       "spark.shuffle.fetcher", "spark.BlockStoreShuffleFetcher")
-    
+
     val httpFileServer = new HttpFileServer()
     httpFileServer.initialize()
     System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index df295b18207b3253d303cee2e3d51730789ec953..b2c9e2cc40116b80b2e354d294679c7be62d0a7d 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1,59 +1,39 @@
 package spark.storage
 
-import akka.actor.{ActorSystem, Cancellable}
+import java.io.{InputStream, OutputStream}
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+import scala.collection.JavaConversions._
+
+import akka.actor.{ActorSystem, Cancellable, Props}
 import akka.dispatch.{Await, Future}
 import akka.util.Duration
 import akka.util.duration._
 
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
-import java.io.{InputStream, OutputStream, Externalizable, ObjectInput, ObjectOutput}
-import java.nio.{MappedByteBuffer, ByteBuffer}
-import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
-import scala.collection.JavaConversions._
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
 
 import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
 import spark.network._
 import spark.serializer.Serializer
-import spark.util.ByteBufferInputStream
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import sun.nio.ch.DirectBuffer
-
-
-private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
-  def this() = this(null, 0)  // For deserialization only
-
-  def this(in: ObjectInput) = this(in.readUTF(), in.readInt())
+import spark.util.{ByteBufferInputStream, GenerationIdUtil}
 
-  override def writeExternal(out: ObjectOutput) {
-    out.writeUTF(ip)
-    out.writeInt(port)
-  }
-
-  override def readExternal(in: ObjectInput) {
-    ip = in.readUTF()
-    port = in.readInt()
-  }
-
-  override def toString = "BlockManagerId(" + ip + ", " + port + ")"
-
-  override def hashCode = ip.hashCode * 41 + port
+import sun.nio.ch.DirectBuffer
 
-  override def equals(that: Any) = that match {
-    case id: BlockManagerId => port == id.port && ip == id.ip
-    case _ => false
-  }
-}
 
 private[spark]
 case class BlockException(blockId: String, message: String, ex: Exception = null)
 extends Exception(message)
 
 private[spark]
-class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
-                   val serializer: Serializer, maxMemory: Long)
+class BlockManager(
+    actorSystem: ActorSystem,
+    val master: BlockManagerMaster,
+    val serializer: Serializer,
+    maxMemory: Long)
   extends Logging {
 
   class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
@@ -110,6 +90,9 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
 
   val host = System.getProperty("spark.hostname", Utils.localHostName())
 
+  val slaveActor = master.actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
+    name = "BlockManagerActor" + GenerationIdUtil.BLOCK_MANAGER.next)
+
   @volatile private var shuttingDown = false
 
   private def heartBeat() {
@@ -134,8 +117,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
    * BlockManagerWorker actor.
    */
   private def initialize() {
-    master.mustRegisterBlockManager(
-      RegisterBlockManager(blockManagerId, maxMemory))
+    master.mustRegisterBlockManager(blockManagerId, maxMemory, slaveActor)
     BlockManagerWorker.startBlockManagerWorker(this)
     if (!BlockManager.getDisableHeartBeatsForTesting) {
       heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
@@ -171,8 +153,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
   def reregister() {
     // TODO: We might need to rate limit reregistering.
     logInfo("BlockManager reregistering with master")
-    master.mustRegisterBlockManager(
-      RegisterBlockManager(blockManagerId, maxMemory))
+    master.mustRegisterBlockManager(blockManagerId, maxMemory, slaveActor)
     reportAllBlocks()
   }
 
@@ -865,6 +846,25 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
     }
   }
 
+  /**
+   * Remove a block from both memory and disk. This one doesn't report to the master
+   * because it expects the master to initiate the original block removal command, and
+   * then the master can update the block tracking itself.
+   */
+  def removeBlock(blockId: String) {
+    logInfo("Removing block " + blockId)
+    val info = blockInfo.get(blockId)
+    if (info != null) info.synchronized {
+      // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+      memoryStore.remove(blockId)
+      diskStore.remove(blockId)
+      blockInfo.remove(blockId)
+    } else {
+      // The block has already been removed; do nothing.
+      logWarning("Block " + blockId + " does not exist.")
+    }
+  }
+
   def shouldCompress(blockId: String): Boolean = {
     if (blockId.startsWith("shuffle_")) {
       compressShuffle
@@ -914,6 +914,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
       heartBeatTask.cancel()
     }
     connectionManager.stop()
+    master.actorSystem.stop(slaveActor)
     blockInfo.clear()
     memoryStore.clear()
     diskStore.clear()
diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala
new file mode 100644
index 0000000000000000000000000000000000000000..03cd1418052bee5f59da816a08377fce81a57c58
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerId.scala
@@ -0,0 +1,29 @@
+package spark.storage
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+
+private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
+  def this() = this(null, 0)  // For deserialization only
+
+  def this(in: ObjectInput) = this(in.readUTF(), in.readInt())
+
+  override def writeExternal(out: ObjectOutput) {
+    out.writeUTF(ip)
+    out.writeInt(port)
+  }
+
+  override def readExternal(in: ObjectInput) {
+    ip = in.readUTF()
+    port = in.readInt()
+  }
+
+  override def toString = "BlockManagerId(" + ip + ", " + port + ")"
+
+  override def hashCode = ip.hashCode * 41 + port
+
+  override def equals(that: Any) = that match {
+    case id: BlockManagerId => port == id.port && ip == id.ip
+    case _ => false
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 0a4e68f43769c6ced41060c14df5a8cc07298ba5..64cdb86f8d39eaf46ef74c85dd5a2192097ee3fb 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -17,95 +17,24 @@ import spark.{Logging, SparkException, Utils}
 
 
 private[spark]
-sealed trait ToBlockManagerMaster
+case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
 
-private[spark]
-case class RegisterBlockManager(
-    blockManagerId: BlockManagerId,
-    maxMemSize: Long)
-  extends ToBlockManagerMaster
-
-private[spark]
-case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
-
-private[spark]
-class BlockUpdate(
-    var blockManagerId: BlockManagerId,
-    var blockId: String,
-    var storageLevel: StorageLevel,
-    var memSize: Long,
-    var diskSize: Long)
-  extends ToBlockManagerMaster
-  with Externalizable {
-
-  def this() = this(null, null, null, 0, 0)  // For deserialization only
-
-  override def writeExternal(out: ObjectOutput) {
-    blockManagerId.writeExternal(out)
-    out.writeUTF(blockId)
-    storageLevel.writeExternal(out)
-    out.writeInt(memSize.toInt)
-    out.writeInt(diskSize.toInt)
-  }
-
-  override def readExternal(in: ObjectInput) {
-    blockManagerId = new BlockManagerId()
-    blockManagerId.readExternal(in)
-    blockId = in.readUTF()
-    storageLevel = new StorageLevel()
-    storageLevel.readExternal(in)
-    memSize = in.readInt()
-    diskSize = in.readInt()
-  }
-}
-
-private[spark]
-object BlockUpdate {
-  def apply(blockManagerId: BlockManagerId,
-      blockId: String,
-      storageLevel: StorageLevel,
-      memSize: Long,
-      diskSize: Long): BlockUpdate = {
-    new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize)
-  }
-
-  // For pattern-matching
-  def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
-    Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
-  }
-}
-
-private[spark]
-case class GetLocations(blockId: String) extends ToBlockManagerMaster
-
-private[spark]
-case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
-
-private[spark]
-case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
-
-private[spark]
-case class RemoveHost(host: String) extends ToBlockManagerMaster
-
-private[spark]
-case object StopBlockManagerMaster extends ToBlockManagerMaster
-
-private[spark]
-case object GetMemoryStatus extends ToBlockManagerMaster
 
+// TODO(rxin): Move BlockManagerMasterActor to its own file.
 private[spark]
-case object ExpireDeadHosts extends ToBlockManagerMaster
-
-
-private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
+class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
 
   class BlockManagerInfo(
       val blockManagerId: BlockManagerId,
       timeMs: Long,
-      val maxMem: Long) {
-    private var _lastSeenMs = timeMs
-    private var _remainingMem = maxMem
-    private val _blocks = new JHashMap[String, StorageLevel]
+      val maxMem: Long,
+      val slaveActor: ActorRef) {
+
+    private var _lastSeenMs: Long = timeMs
+    private var _remainingMem: Long = maxMem
+
+    // Mapping from block id to its status.
+    private val _blocks = new JHashMap[String, BlockStatus]
 
     logInfo("Registering block manager %s:%d with %s RAM".format(
       blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
@@ -121,7 +50,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
 
       if (_blocks.containsKey(blockId)) {
         // The block exists on the slave already.
-        val originalLevel: StorageLevel = _blocks.get(blockId)
+        val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
 
         if (originalLevel.useMemory) {
           _remainingMem += memSize
@@ -130,7 +59,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
 
       if (storageLevel.isValid) {
         // isValid means it is either stored in-memory or on-disk.
-        _blocks.put(blockId, storageLevel)
+        _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
         if (storageLevel.useMemory) {
           _remainingMem -= memSize
           logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
@@ -143,15 +72,15 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
         }
       } else if (_blocks.containsKey(blockId)) {
         // If isValid is not true, drop the block.
-        val originalLevel: StorageLevel = _blocks.get(blockId)
+        val blockStatus: BlockStatus = _blocks.get(blockId)
         _blocks.remove(blockId)
-        if (originalLevel.useMemory) {
-          _remainingMem += memSize
+        if (blockStatus.storageLevel.useMemory) {
+          _remainingMem += blockStatus.memSize
           logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
             blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
             Utils.memoryBytesToString(_remainingMem)))
         }
-        if (originalLevel.useDisk) {
+        if (blockStatus.storageLevel.useDisk) {
           logInfo("Removed %s on %s:%d on disk (size: %s)".format(
             blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
         }
@@ -162,7 +91,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
 
     def lastSeenMs: Long = _lastSeenMs
 
-    def blocks: JHashMap[String, StorageLevel] = _blocks
+    def blocks: JHashMap[String, BlockStatus] = _blocks
 
     override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
 
@@ -171,8 +100,13 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
     }
   }
 
+  // Mapping from block manager id to the block manager's information.
   private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo]
+
+  // Mapping from host name to block manager id.
   private val blockManagerIdByHost = new HashMap[String, BlockManagerId]
+
+  // Mapping from block id to the set of block managers that have the block.
   private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
 
   initLogging()
@@ -245,8 +179,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
   }
 
   def receive = {
-    case RegisterBlockManager(blockManagerId, maxMemSize) =>
-      register(blockManagerId, maxMemSize)
+    case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
+      register(blockManagerId, maxMemSize, slaveActor)
 
     case BlockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
       blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
@@ -264,6 +198,9 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
     case GetMemoryStatus =>
       getMemoryStatus
 
+    case RemoveBlock(blockId) =>
+      removeBlock(blockId)
+
     case RemoveHost(host) =>
       removeHost(host)
       sender ! true
@@ -286,6 +223,27 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
       logInfo("Got unknown message: " + other)
   }
 
+  // Remove a block from the slaves that have it. This can only be used to remove
+  // blocks that the master knows about.
+  private def removeBlock(blockId: String) {
+    val block = blockInfo.get(blockId)
+    if (block != null) {
+      block._2.foreach { blockManagerId: BlockManagerId =>
+        val blockManager = blockManagerInfo.get(blockManagerId)
+        if (blockManager.isDefined) {
+          // Remove the block from the slave's BlockManager.
+          // Doesn't actually wait for a confirmation and the message might get lost.
+          // If message loss becomes frequent, we should add retry logic here.
+          blockManager.get.slaveActor ! RemoveBlock(blockId)
+          // Remove the block from the master's BlockManagerInfo.
+          blockManager.get.updateBlockInfo(blockId, StorageLevel.NONE, 0, 0)
+        }
+      }
+      blockInfo.remove(blockId)
+    }
+    sender ! true
+  }
+
   // Return a map from the block manager id to max memory and remaining memory.
   private def getMemoryStatus() {
     val res = blockManagerInfo.map { case(blockManagerId, info) =>
@@ -294,7 +252,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
     sender ! res
   }
 
-  private def register(blockManagerId: BlockManagerId, maxMemSize: Long) {
+  private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockManagerId + " "
     logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
@@ -309,7 +267,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
       logInfo("Got Register Msg from master node, don't register it")
     } else {
       blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
-        blockManagerId, System.currentTimeMillis(), maxMemSize))
+        blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
     }
     blockManagerIdByHost += (blockManagerId.ip -> blockManagerId)
     logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
@@ -442,25 +400,29 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
   }
 }
 
-private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean)
+
+private[spark] class BlockManagerMaster(
+    val actorSystem: ActorSystem,
+    isMaster: Boolean,
+    isLocal: Boolean,
+    masterIp: String,
+    masterPort: Int)
   extends Logging {
 
-  val AKKA_ACTOR_NAME: String = "BlockMasterManager"
+  val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager"
+  val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
   val REQUEST_RETRY_INTERVAL_MS = 100
-  val DEFAULT_MASTER_IP: String = System.getProperty("spark.master.host", "localhost")
-  val DEFAULT_MASTER_PORT: Int = System.getProperty("spark.master.port", "7077").toInt
   val DEFAULT_MANAGER_IP: String = Utils.localHostName()
 
   val timeout = 10.seconds
   var masterActor: ActorRef = null
 
   if (isMaster) {
-    masterActor = actorSystem.actorOf(
-      Props(new BlockManagerMasterActor(isLocal)), name = AKKA_ACTOR_NAME)
+    masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
+      name = MASTER_AKKA_ACTOR_NAME)
     logInfo("Registered BlockManagerMaster Actor")
   } else {
-    val url = "akka://spark@%s:%s/user/%s".format(
-      DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME)
+    val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME)
     logInfo("Connecting to BlockManagerMaster: " + url)
     masterActor = actorSystem.actorFor(url)
   }
@@ -497,7 +459,9 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
     logInfo("Removed " + host + " successfully in notifyADeadHost")
   }
 
-  def mustRegisterBlockManager(msg: RegisterBlockManager) {
+  def mustRegisterBlockManager(
+    blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+    val msg = RegisterBlockManager(blockManagerId, maxMemSize, slaveActor)
     logInfo("Trying to register BlockManager")
     while (! syncRegisterBlockManager(msg)) {
       logWarning("Failed to register " + msg)
@@ -506,7 +470,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
     logInfo("Done registering BlockManager")
   }
 
-  def syncRegisterBlockManager(msg: RegisterBlockManager): Boolean = {
+  private def syncRegisterBlockManager(msg: RegisterBlockManager): Boolean = {
     //val masterActor = RemoteActor.select(node, name)
     val startTimeMs = System.currentTimeMillis()
     val tmp = " msg " + msg + " "
@@ -533,7 +497,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
     return res.get
   }
 
-  def syncHeartBeat(msg: HeartBeat): Option[Boolean] = {
+  private def syncHeartBeat(msg: HeartBeat): Option[Boolean] = {
     try {
       val answer = askMaster(msg).asInstanceOf[Boolean]
       return Some(answer)
@@ -553,7 +517,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
     return res.get
   }
 
-  def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = {
+  private def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " msg " + msg + " "
     logDebug("Got in syncBlockUpdate " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
@@ -580,7 +544,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
     return res
   }
 
-  def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
+  private def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " msg " + msg + " "
     logDebug("Got in syncGetLocations 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
@@ -603,7 +567,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
   }
 
   def mustGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
-       Seq[Seq[BlockManagerId]] = {
+      Seq[Seq[BlockManagerId]] = {
     var res: Seq[Seq[BlockManagerId]] = syncGetLocationsMultipleBlockIds(msg)
     while (res == null) {
       logWarning("Failed to GetLocationsMultipleBlockIds " + msg)
@@ -613,7 +577,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
     return res
   }
 
-  def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
+  private def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
       Seq[Seq[BlockManagerId]] = {
     val startTimeMs = System.currentTimeMillis
     val tmp = " msg " + msg + " "
@@ -644,11 +608,10 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
       Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
       res = syncGetPeers(msg)
     }
-
-    return res
+    res
   }
 
-  def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
+  private def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
     val startTimeMs = System.currentTimeMillis
     val tmp = " msg " + msg + " "
     logDebug("Got in syncGetPeers 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
@@ -670,6 +633,20 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
     }
   }
 
+  /**
+   * Remove a block from the slaves that have it. This can only be used to remove
+   * blocks that the master knows about.
+   */
+  def removeBlock(blockId: String) {
+    askMaster(RemoveBlock(blockId))
+  }
+
+  /**
+   * Return the memory status for each block manager, in the form of a map from
+   * the block manager's id to two long values. The first value is the maximum
+   * amount of memory allocated for the block manager, while the second is the
+   * amount of remaining memory.
+   */
   def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
     askMaster(GetMemoryStatus).asInstanceOf[Map[BlockManagerId, (Long, Long)]]
   }
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5bca170f95b84d59bf59ee8587b2cc941930700d
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -0,0 +1,102 @@
+package spark.storage
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+import akka.actor.ActorRef
+
+
+//////////////////////////////////////////////////////////////////////////////////
+// Messages from the master to slaves.
+//////////////////////////////////////////////////////////////////////////////////
+private[spark]
+sealed trait ToBlockManagerSlave
+
+// Remove a block from the slaves that have it. This can only be used to remove
+// blocks that the master knows about.
+private[spark]
+case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
+
+
+//////////////////////////////////////////////////////////////////////////////////
+// Messages from slaves to the master.
+//////////////////////////////////////////////////////////////////////////////////
+private[spark]
+sealed trait ToBlockManagerMaster
+
+private[spark]
+case class RegisterBlockManager(
+    blockManagerId: BlockManagerId,
+    maxMemSize: Long,
+    sender: ActorRef)
+  extends ToBlockManagerMaster
+
+private[spark]
+case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+private[spark]
+class BlockUpdate(
+    var blockManagerId: BlockManagerId,
+    var blockId: String,
+    var storageLevel: StorageLevel,
+    var memSize: Long,
+    var diskSize: Long)
+  extends ToBlockManagerMaster
+  with Externalizable {
+
+  def this() = this(null, null, null, 0, 0)  // For deserialization only
+
+  override def writeExternal(out: ObjectOutput) {
+    blockManagerId.writeExternal(out)
+    out.writeUTF(blockId)
+    storageLevel.writeExternal(out)
+    out.writeInt(memSize.toInt)
+    out.writeInt(diskSize.toInt)
+  }
+
+  override def readExternal(in: ObjectInput) {
+    blockManagerId = new BlockManagerId()
+    blockManagerId.readExternal(in)
+    blockId = in.readUTF()
+    storageLevel = new StorageLevel()
+    storageLevel.readExternal(in)
+    memSize = in.readInt()
+    diskSize = in.readInt()
+  }
+}
+
+private[spark]
+object BlockUpdate {
+  def apply(blockManagerId: BlockManagerId,
+      blockId: String,
+      storageLevel: StorageLevel,
+      memSize: Long,
+      diskSize: Long): BlockUpdate = {
+    new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize)
+  }
+
+  // For pattern-matching
+  def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
+    Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
+  }
+}
+
+private[spark]
+case class GetLocations(blockId: String) extends ToBlockManagerMaster
+
+private[spark]
+case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
+
+private[spark]
+case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+
+private[spark]
+case class RemoveHost(host: String) extends ToBlockManagerMaster
+
+private[spark]
+case object StopBlockManagerMaster extends ToBlockManagerMaster
+
+private[spark]
+case object GetMemoryStatus extends ToBlockManagerMaster
+
+private[spark]
+case object ExpireDeadHosts extends ToBlockManagerMaster
diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f570cdc52dd1b2347b8cca8eab62af80b318815e
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
@@ -0,0 +1,16 @@
+package spark.storage
+
+import akka.actor.Actor
+
+import spark.{Logging, SparkException, Utils}
+
+
+/**
+ * An actor to take commands from the master to execute options. For example,
+ * this is used to remove blocks from the slave's BlockManager.
+ */
+class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
+  override def receive = {
+    case RemoveBlock(blockId) => blockManager.removeBlock(blockId)
+  }
+}
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index 5bb5a29cc42243a95c160f2eb62a65a8ba1e2792..689f07b9692fbf5c68b78080f85222d86fbe8582 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -58,8 +58,10 @@ private[spark] object ThreadingTest {
         val startTime = System.currentTimeMillis()
         manager.get(blockId) match {
           case Some(retrievedBlock) =>
-            assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match")
-            println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
+            assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
+              "Block " + blockId + " did not match")
+            println("Got block " + blockId + " in " +
+              (System.currentTimeMillis - startTime) + " ms")
           case None =>
             assert(false, "Block " + blockId + " could not be retrieved")
         }
@@ -73,7 +75,9 @@ private[spark] object ThreadingTest {
     System.setProperty("spark.kryoserializer.buffer.mb", "1")
     val actorSystem = ActorSystem("test")
     val serializer = new KryoSerializer
-    val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
+    val masterIp: String = System.getProperty("spark.master.host", "localhost")
+    val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
+    val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, masterIp, masterPort)
     val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024)
     val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
     val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
@@ -86,6 +90,7 @@ private[spark] object ThreadingTest {
     actorSystem.shutdown()
     actorSystem.awaitTermination()
     println("Everything stopped.")
-    println("It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
+    println(
+      "It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
   }
 }
diff --git a/core/src/main/scala/spark/util/GenerationIdUtil.scala b/core/src/main/scala/spark/util/GenerationIdUtil.scala
new file mode 100644
index 0000000000000000000000000000000000000000..8a17b700b04d148a9496a467c9c22d3a7e9c39c7
--- /dev/null
+++ b/core/src/main/scala/spark/util/GenerationIdUtil.scala
@@ -0,0 +1,19 @@
+package spark.util
+
+import java.util.concurrent.atomic.AtomicInteger
+
+private[spark]
+object GenerationIdUtil {
+
+  val BLOCK_MANAGER = new IdGenerator
+
+  /**
+   * A util used to get a unique generation ID. This is a wrapper around
+   * Java's AtomicInteger.
+   */
+  class IdGenerator {
+    private var id = new AtomicInteger
+
+    def next: Int = id.incrementAndGet
+  }
+}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index ad2253596df86b4330be66814e732a4332b30734..4dc3b7ec0578fe3e53f30b06fb905030739b0f66 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -20,15 +20,15 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   var oldArch: String = null
   var oldOops: String = null
   var oldHeartBeat: String = null
-  
-  // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test 
+
+  // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
   val serializer = new KryoSerializer
 
   before {
     actorSystem = ActorSystem("test")
-    master = new BlockManagerMaster(actorSystem, true, true)
+    master = new BlockManagerMaster(actorSystem, true, true, "localhost", 7077)
 
-    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case 
+    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     oldArch = System.setProperty("os.arch", "amd64")
     oldOops = System.setProperty("spark.test.useCompressedOops", "true")
     oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
@@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
 
-    // Checking whether blocks are in memory 
+    // Checking whether blocks are in memory
     assert(store.getSingle("a1") != None, "a1 was not in store")
     assert(store.getSingle("a2") != None, "a2 was not in store")
     assert(store.getSingle("a3") != None, "a3 was not in store")
@@ -83,7 +83,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
     assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
     assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
-    
+
     // Drop a1 and a2 from memory; this should be reported back to the master
     store.dropFromMemory("a1", null)
     store.dropFromMemory("a2", null)
@@ -93,6 +93,45 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
   }
 
+  test("removing block") {
+    store = new BlockManager(actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+
+    // Putting a1, a2  and a3 in memory and telling master only about a1 and a2
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
+
+    // Checking whether blocks are in memory and memory size
+    var memStatus = master.getMemoryStatus.head._2
+    assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
+    assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200")
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+
+    // Checking whether master knows about the blocks or not
+    assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+    assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
+    assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
+
+    // Remove a1 and a2 and a3. Should be no-op for a3.
+    master.removeBlock("a1")
+    master.removeBlock("a2")
+    master.removeBlock("a3")
+    assert(store.getSingle("a1") === None, "a1 not removed from store")
+    assert(store.getSingle("a2") === None, "a2 not removed from store")
+    assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
+    assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
+    memStatus = master.getMemoryStatus.head._2
+    assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
+    assert(memStatus._2 == 2000L, "remaining memory " + memStatus._1 + " should equal 2000")
+  }
+
   test("reregistration on heart beat") {
     val heartBeat = PrivateMethod[Unit]('heartBeat)
     store = new BlockManager(actorSystem, master, serializer, 2000)
@@ -122,7 +161,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
 
     master.notifyADeadHost(store.blockManagerId.ip)
     assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
-    
+
     store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
 
     assert(master.mustGetLocations(GetLocations("a1")).size > 0,
@@ -145,11 +184,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
 
     store invokePrivate heartBeat()
-    
+
     assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
 
     store2 invokePrivate heartBeat()
-    
+
     assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a2 was not removed from master")
   }
 
@@ -171,7 +210,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(store.getSingle("a2") != None, "a2 was not in store")
     assert(store.getSingle("a3") === None, "a3 was in store")
   }
-  
+
   test("in-memory LRU storage with serialization") {
     store = new BlockManager(actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)