diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 3b9cd8b5fc1bc948de63031176b3f8c1d19b107a..37d58625754fe6ecd1ff641e1662a38fb44910fa 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -59,31 +59,15 @@ class BlockLocker(numLockers: Int) {
 class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
   extends Logging {
 
-  class BlockInfo(val level: StorageLevel, val tellMaster: Boolean, var pending: Boolean = true) {
-    def waitForReady() {
-      if (pending) {
-        synchronized {
-          while (pending) this.wait()
-        }
-      }
-    }
-
-    def markReady() {
-      pending = false
-      synchronized {
-        this.notifyAll()
-      }
-    }
-  }
+  case class BlockInfo(level: StorageLevel, tellMaster: Boolean)
 
   private val NUM_LOCKS = 337
   private val locker = new BlockLocker(NUM_LOCKS)
 
   private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
-
   private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
-  private[storage] val diskStore: BlockStore =
-    new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
+  private[storage] val diskStore: BlockStore = new DiskStore(this,
+    System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
 
   val connectionManager = new ConnectionManager(0)
   implicit val futureExecContext = connectionManager.futureExecContext
@@ -95,6 +79,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   var cacheTracker: CacheTracker = null
 
   val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties
+
   val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean
 
   initialize()
@@ -125,32 +110,45 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   }
 
   /**
-   * Tell the master about the current storage status of a block. This will send a heartbeat
-   * message reflecting the current status, *not* the desired storage level in its block info.
-   * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
+   * Change the storage level for a local block in the block info meta data, and
+   * tell the master if necessary. Note that this is only a meta data change and
+   * does NOT actually change the storage of the block. If the new level is
+   * invalid, then block info (if exists) will be silently removed.
    */
-  def reportBlockStatus(blockId: String) {
-    locker.getLock(blockId).synchronized {
-      val curLevel = blockInfo.get(blockId) match {
-        case null =>
-          StorageLevel.NONE
-        case info =>
-          info.level match {
-            case null =>
-              StorageLevel.NONE
-            case level =>
-              val inMem = level.useMemory && memoryStore.contains(blockId)
-              val onDisk = level.useDisk && diskStore.contains(blockId)
-              new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
-          }
-      }
+  private[spark] def setLevelAndTellMaster(
+    blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
+
+    if (level == null) {
+      throw new IllegalArgumentException("Storage level is null")
+    }
+
+    // If there was earlier info about the block, then use earlier tellMaster
+    val oldInfo = blockInfo.get(blockId)
+    val newTellMaster = if (oldInfo != null) oldInfo.tellMaster else tellMaster
+    if (oldInfo != null && oldInfo.tellMaster != tellMaster) {
+      logWarning("Ignoring tellMaster setting as it is different from earlier setting")
+    }
+
+    // If level is valid, store the block info, else remove the block info
+    if (level.isValid) {
+      blockInfo.put(blockId, new BlockInfo(level, newTellMaster))
+      logDebug("Info for block " + blockId + " updated with new level as " + level)
+    } else {
+      blockInfo.remove(blockId)
+      logDebug("Info for block " + blockId + " removed as new level is null or invalid")
+    }
+
+    // Tell master if necessary
+    if (newTellMaster) {
       master.mustHeartBeat(HeartBeat(
         blockManagerId,
         blockId,
-        curLevel,
-        if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L,
-        if (curLevel.useDisk) diskStore.getSize(blockId) else 0L))
+        level,
+        if (level.isValid && level.useMemory) memoryStore.getSize(blockId) else 0,
+        if (level.isValid && level.useDisk) diskStore.getSize(blockId) else 0))
       logDebug("Told master about block " + blockId)
+    } else {
+      logDebug("Did not tell master about block " + blockId)
     }
   }
 
@@ -182,21 +180,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   def getLocal(blockId: String): Option[Iterator[Any]] = {
     logDebug("Getting local block " + blockId)
     locker.getLock(blockId).synchronized {
-      val info = blockInfo.get(blockId)
-      if (info != null) {
-        info.waitForReady() // In case the block is still being put() by another thread
-        val level = info.level
-        logDebug("Level for block " + blockId + " is " + level)
+      // Check storage level of block
+      val level = getLevel(blockId)
+      if (level != null) {
+        logDebug("Level for block " + blockId + " is " + level + " on local machine")
 
         // Look for the block in memory
         if (level.useMemory) {
           logDebug("Getting block " + blockId + " from memory")
           memoryStore.getValues(blockId) match {
-            case Some(iterator) =>
+            case Some(iterator) => {
               logDebug("Block " + blockId + " found in memory")
               return Some(iterator)
-            case None =>
+            }
+            case None => {
               logDebug("Block " + blockId + " not found in memory")
+            }
           }
         }
 
@@ -204,12 +203,14 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
         if (level.useDisk) {
           logDebug("Getting block " + blockId + " from disk")
           diskStore.getValues(blockId) match {
-            case Some(iterator) =>
+            case Some(iterator) => {
               logDebug("Block " + blockId + " found in disk")
               return Some(iterator)
-            case None =>
+            }
+            case None => {
               throw new Exception("Block " + blockId + " not found on disk, though it should be")
               return None
+            }
           }
         }
       } else {
@@ -225,21 +226,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   def getLocalBytes(blockId: String): Option[ByteBuffer] = {
     logDebug("Getting local block " + blockId + " as bytes")
     locker.getLock(blockId).synchronized {
-      val info = blockInfo.get(blockId)
-      if (info != null) {
-        info.waitForReady() // In case the block is still being put() by another thread
-        val level = info.level
+      // Check storage level of block
+      val level = getLevel(blockId)
+      if (level != null) {
         logDebug("Level for block " + blockId + " is " + level + " on local machine")
 
         // Look for the block in memory
         if (level.useMemory) {
           logDebug("Getting block " + blockId + " from memory")
           memoryStore.getBytes(blockId) match {
-            case Some(bytes) =>
+            case Some(bytes) => {
               logDebug("Block " + blockId + " found in memory")
               return Some(bytes)
-            case None =>
+            }
+            case None => {
               logDebug("Block " + blockId + " not found in memory")
+            }
           }
         }
 
@@ -247,12 +249,14 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
         if (level.useDisk) {
           logDebug("Getting block " + blockId + " from disk")
           diskStore.getBytes(blockId) match {
-            case Some(bytes) =>
+            case Some(bytes) => {
               logDebug("Block " + blockId + " found in disk")
               return Some(bytes)
-            case None =>
+            }
+            case None => {
               throw new Exception("Block " + blockId + " not found on disk, though it should be")
               return None
+            }
           }
         }
       } else {
@@ -427,17 +431,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       throw new IllegalArgumentException("Storage level is null or invalid")
     }
 
-    if (blockInfo.containsKey(blockId)) {
-      logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
-      return
-    }
-
-    // Remember the block's storage level so that we can correctly drop it to disk if it needs
-    // to be dropped right after it got put into memory. Note, however, that other threads will
-    // not be able to get() this block until we call markReady on its BlockInfo.
-    val myInfo = new BlockInfo(level, tellMaster)
-    blockInfo.put(blockId, myInfo)
-
     val startTimeMs = System.currentTimeMillis
     var bytes: ByteBuffer = null
 
@@ -451,15 +444,32 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
         + " to get into synchronized block")
 
-      if (level.useMemory) {
-        // Save it just to memory first, even if it also has useDisk set to true; we will later
-        // drop it to disk if the memory store can't hold it.
+      // Check and warn if block with same id already exists
+      if (getLevel(blockId) != null) {
+        logWarning("Block " + blockId + " already exists in local machine")
+        return
+      }
+
+      if (level.useMemory && level.useDisk) {
+        // If saving to both memory and disk, then serialize only once
+        memoryStore.putValues(blockId, values, level, true) match {
+          case Left(newValues) =>
+            diskStore.putValues(blockId, newValues, level, true) match {
+              case Right(newBytes) => bytes = newBytes
+              case _ => throw new Exception("Unexpected return value")
+            }
+          case Right(newBytes) =>
+            bytes = newBytes
+            diskStore.putBytes(blockId, newBytes, level)
+        }
+      } else if (level.useMemory) {
+        // If only save to memory
         memoryStore.putValues(blockId, values, level, true) match {
           case Right(newBytes) => bytes = newBytes
           case Left(newIterator) => valuesAfterPut = newIterator
         }
       } else {
-        // Save directly to disk.
+        // If only save to disk
         val askForBytes = level.replication > 1 // Don't get back the bytes unless we replicate them
         diskStore.putValues(blockId, values, level, askForBytes) match {
           case Right(newBytes) => bytes = newBytes
@@ -467,12 +477,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
         }
       }
 
-      // Now that the block is in either the memory or disk store, let other threads read it,
-      // and tell the master about it.
-      myInfo.markReady()
-      if (tellMaster) {
-        reportBlockStatus(blockId)
-      }
+      // Store the storage level
+      setLevelAndTellMaster(blockId, level, tellMaster)
     }
     logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
 
@@ -515,17 +521,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       throw new IllegalArgumentException("Storage level is null or invalid")
     }
 
-    if (blockInfo.containsKey(blockId)) {
-      logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
-      return
-    }
-
-    // Remember the block's storage level so that we can correctly drop it to disk if it needs
-    // to be dropped right after it got put into memory. Note, however, that other threads will
-    // not be able to get() this block until we call markReady on its BlockInfo.
-    val myInfo = new BlockInfo(level, tellMaster)
-    blockInfo.put(blockId, myInfo)
-
     val startTimeMs = System.currentTimeMillis
 
     // Initiate the replication before storing it locally. This is faster as
@@ -542,22 +537,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     locker.getLock(blockId).synchronized {
       logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
         + " to get into synchronized block")
+      if (getLevel(blockId) != null) {
+        logWarning("Block " + blockId + " already exists")
+        return
+      }
 
       if (level.useMemory) {
-        // Store it only in memory at first, even if useDisk is also set to true
         bytes.rewind()
         memoryStore.putBytes(blockId, bytes, level)
-      } else {
+      }
+      if (level.useDisk) {
         bytes.rewind()
         diskStore.putBytes(blockId, bytes, level)
       }
 
-      // Now that the block is in either the memory or disk store, let other threads read it,
-      // and tell the master about it.
-      myInfo.markReady()
-      if (tellMaster) {
-        reportBlockStatus(blockId)
-      }
+      // Store the storage level
+      setLevelAndTellMaster(blockId, level, tellMaster)
     }
 
     // TODO: This code will be removed when CacheTracker is gone.
@@ -631,31 +626,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   }
 
   /**
-   * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
-   * store reaches its limit and needs to free up space.
+   * Drop block from memory (called when memory store has reached it limit)
    */
-  def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) {
-    logInfo("Dropping block " + blockId + " from memory")
+  def dropFromMemory(blockId: String) {
     locker.getLock(blockId).synchronized {
-      val info = blockInfo.get(blockId)
-      val level = info.level
-      if (level.useDisk && !diskStore.contains(blockId)) {
-        logInfo("Writing block " + blockId + " to disk")
-        data match {
-          case Left(iterator) =>
-            diskStore.putValues(blockId, iterator, level, false)
-          case Right(bytes) =>
-            diskStore.putBytes(blockId, bytes, level)
-        }
+      val level = getLevel(blockId)
+      if (level == null) {
+        logWarning("Block " + blockId + " cannot be removed from memory as it does not exist")
+        return
       }
-      memoryStore.remove(blockId)
-      if (info.tellMaster) {
-        reportBlockStatus(blockId)
-      }
-      if (!level.useDisk) {
-        // The block is completely gone from this node; forget it so we can put() it again later.
-        blockInfo.remove(blockId)
+      if (!level.useMemory) {
+        logWarning("Block " + blockId + " cannot be removed from memory as it is not in memory")
+        return
       }
+      memoryStore.remove(blockId)
+      val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
+      setLevelAndTellMaster(blockId, newLevel)
     }
   }
 
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index ff482ff66bdcde066cbe1eb9a8135c8353527fe8..5f123aca7872c1f7729627a21335c94bceaf8e00 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -31,7 +31,5 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
 
   def remove(blockId: String)
 
-  def contains(blockId: String): Boolean
-
   def clear() { }
 }
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index d0c592ccb128dc7ae071d830f5d7c658be903e0c..d9965f4306e61415b6dacae97920623270a59769 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -26,7 +26,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
   addShutdownHook()
 
   override def getSize(blockId: String): Long = {
-    getFile(blockId).length()
+    getFile(blockId).length
   }
 
   override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
@@ -93,10 +93,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     }
   }
 
-  override def contains(blockId: String): Boolean = {
-    getFile(blockId).exists()
-  }
-
   private def createFile(blockId: String): File = {
     val file = getFile(blockId)
     if (file.exists()) {
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 74ef326038fc8c5cf71dd400155fbffbbcf67090..ea6f3c4fccaecc10352eff4abccf05cc3e05a6be 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -18,12 +18,29 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
   private var currentMemory = 0L
 
+  //private val blockDropper = Executors.newSingleThreadExecutor()
+  private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
+  private val blockDropper = new Thread("memory store - block dropper") {
+    override def run() {
+      try {
+        while (true) {
+          val blockId = blocksToDrop.take()
+          logDebug("Block " + blockId + " ready to be dropped")
+          blockManager.dropFromMemory(blockId)
+        }
+      } catch {
+        case ie: InterruptedException =>
+          logInfo("Shutting down block dropper")
+      }
+    }
+  }
+  blockDropper.start()
   logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
 
   def freeMemory: Long = maxMemory - currentMemory
 
   override def getSize(blockId: String): Long = {
-    synchronized {
+    entries.synchronized {
       entries.get(blockId).size
     }
   }
@@ -35,12 +52,19 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       val elements = new ArrayBuffer[Any]
       elements ++= values
       val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
-      tryToPut(blockId, elements, sizeEstimate, true)
+      ensureFreeSpace(sizeEstimate)
+      val entry = new Entry(elements, sizeEstimate, true)
+      entries.synchronized { entries.put(blockId, entry) }
+      currentMemory += sizeEstimate
+      logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
+        blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
     } else {
       val entry = new Entry(bytes, bytes.limit, false)
-      ensureFreeSpace(blockId, bytes.limit)
-      synchronized { entries.put(blockId, entry) }
-      tryToPut(blockId, bytes, bytes.limit, false)
+      ensureFreeSpace(bytes.limit)
+      entries.synchronized { entries.put(blockId, entry) }
+      currentMemory += bytes.limit
+      logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
+        blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
     }
   }
 
@@ -55,17 +79,27 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       val elements = new ArrayBuffer[Any]
       elements ++= values
       val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
-      tryToPut(blockId, elements, sizeEstimate, true)
+      ensureFreeSpace(sizeEstimate)
+      val entry = new Entry(elements, sizeEstimate, true)
+      entries.synchronized { entries.put(blockId, entry) }
+      currentMemory += sizeEstimate
+      logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
+        blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
       Left(elements.iterator)
     } else {
       val bytes = blockManager.dataSerialize(values)
-      tryToPut(blockId, bytes, bytes.limit, false)
+      ensureFreeSpace(bytes.limit)
+      val entry = new Entry(bytes, bytes.limit, false)
+      entries.synchronized { entries.put(blockId, entry) }
+      currentMemory += bytes.limit
+      logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
+        blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
       Right(bytes)
     }
   }
 
   override def getBytes(blockId: String): Option[ByteBuffer] = {
-    val entry = synchronized {
+    val entry = entries.synchronized {
       entries.get(blockId)
     }
     if (entry == null) {
@@ -78,7 +112,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
 
   override def getValues(blockId: String): Option[Iterator[Any]] = {
-    val entry = synchronized {
+    val entry = entries.synchronized {
       entries.get(blockId)
     }
     if (entry == null) {
@@ -92,7 +126,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
 
   override def remove(blockId: String) {
-    synchronized {
+    entries.synchronized {
       val entry = entries.get(blockId)
       if (entry != null) {
         entries.remove(blockId)
@@ -100,118 +134,54 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         logInfo("Block %s of size %d dropped from memory (free %d)".format(
           blockId, entry.size, freeMemory))
       } else {
-        logWarning("Block " + blockId + " could not be removed as it does not exist")
+        logWarning("Block " + blockId + " could not be removed as it doesnt exist")
       }
     }
   }
 
   override def clear() {
-    synchronized {
+    entries.synchronized {
       entries.clear()
     }
+    blockDropper.interrupt()
     logInfo("MemoryStore cleared")
   }
 
-  /**
-   * Return the RDD ID that a given block ID is from, or null if it is not an RDD block.
-   */
-  private def getRddId(blockId: String): String = {
-    if (blockId.startsWith("rdd_")) {
-      blockId.split('_')(1)
-    } else {
-      null
-    }
-  }
-
-  /**
-   * Try to put in a set of values, if we can free up enough space. The value should either be
-   * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
-   * size must also be passed by the caller.
-   */
-  private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
-    synchronized {
-      if (ensureFreeSpace(blockId, size)) {
-        val entry = new Entry(value, size, deserialized)
-        entries.put(blockId, entry)
-        currentMemory += size
-        if (deserialized) {
-          logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
-            blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
-        } else {
-          logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
-            blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
-        }
-        true
-      } else {
-        // Tell the block manager that we couldn't put it in memory so that it can drop it to
-        // disk if the block allows disk storage.
-        val data = if (deserialized) {
-          Left(value.asInstanceOf[ArrayBuffer[Any]].iterator)
-        } else {
-          Right(value.asInstanceOf[ByteBuffer].duplicate())
-        }
-        blockManager.dropFromMemory(blockId, data)
-        false
-      }
-    }
-  }
-
-  /**
-   * Tries to free up a given amount of space to store a particular block, but can fail and return
-   * false if either the block is bigger than our memory or it would require replacing another
-   * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
-   * don't fit into memory that we want to avoid).
-   *
-   * Assumes that a lock on entries is held by the caller.
-   */
-  private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
+  // TODO: This should be able to return false if the space is larger than our total memory,
+  // or if adding this block would require evicting another one from the same RDD
+  private def ensureFreeSpace(space: Long) {
     logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
       space, currentMemory, maxMemory))
 
-    if (space > maxMemory) {
-      logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
-      return false
-    }
-    
     if (maxMemory - currentMemory < space) {
-      val rddToAdd = getRddId(blockIdToAdd)
+
       val selectedBlocks = new ArrayBuffer[String]()
       var selectedMemory = 0L
 
-      val iterator = entries.entrySet().iterator()
-      while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
-        val pair = iterator.next()
-        val blockId = pair.getKey
-        if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
-          logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
-            "block from the same RDD")
-          return false
+      entries.synchronized {
+        val iter = entries.entrySet().iterator()
+        while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
+          val pair = iter.next()
+          val blockId = pair.getKey
+          val entry = pair.getValue
+          if (!entry.dropPending) {
+            selectedBlocks += blockId
+            entry.dropPending = true
+          }
+          selectedMemory += pair.getValue.size
+          logInfo("Block " + blockId + " selected for dropping")
         }
-        selectedBlocks += blockId
-        selectedMemory += pair.getValue.size
       }
 
-      if (maxMemory - (currentMemory - selectedMemory) >= space) {
-        logInfo(selectedBlocks.size + " blocks selected for dropping")
-        for (blockId <- selectedBlocks) {
-          val entry = entries.get(blockId)
-          val data = if (entry.deserialized) {
-            Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
-          } else {
-            Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
-          }
-          blockManager.dropFromMemory(blockId, data)
-        }
-        return true
-      } else {
-        return false
+      logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " +
+        blocksToDrop.size + " blocks pending")
+      var i = 0
+      while (i < selectedBlocks.size) {
+        blocksToDrop.add(selectedBlocks(i))
+        i += 1
       }
+      selectedBlocks.clear()
     }
-    return true
-  }
-
-  override def contains(blockId: String): Boolean = {
-    synchronized { entries.containsKey(blockId) }
   }
 }
 
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 8f4b9d395f8e2f583a721576dd73b300eac278fe..d15d7285a7fa620b1376d21fc4ca1d0091e192f9 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -69,9 +69,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
     assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
     
-    // Drop a1 and a2 from memory; this should be reported back to the master
-    store.dropFromMemory("a1", null)
-    store.dropFromMemory("a2", null)
+    // Setting storage level of a1 and a2 to invalid; they should be removed from store and master
+    store.setLevelAndTellMaster("a1", new StorageLevel(false, false, false, 1))
+    store.setLevelAndTellMaster("a2", new StorageLevel(true, false, false, 0))
     assert(store.getSingle("a1") === None, "a1 not removed from store")
     assert(store.getSingle("a2") === None, "a2 not removed from store")
     assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")