diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 8672a5376ebd057eff95e9e5c5748c429da39296..ce6faced341d947096ac9ef8e24260d128e78231 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,16 +1,15 @@
 package spark.storage
 
 import spark.{Utils, Logging, Serializer, SizeEstimator}
-
 import scala.collection.mutable.ArrayBuffer
-
 import java.io.{File, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 import java.util.{UUID, LinkedHashMap}
 import java.util.concurrent.Executors
-
+import java.util.concurrent.ConcurrentHashMap
 import it.unimi.dsi.fastutil.io._
+import java.util.concurrent.ArrayBlockingQueue
 
 /**
  * Abstract class to store blocks
@@ -41,13 +40,28 @@ abstract class BlockStore(blockManager: BlockManager) extends Logging {
 class MemoryStore(blockManager: BlockManager, maxMemory: Long) 
   extends BlockStore(blockManager) {
 
-  class Entry(var value: Any, val size: Long, val deserialized: Boolean)
+  case class Entry(var value: Any, size: Long, deserialized: Boolean, dropPending: Boolean = false)
   
   private val memoryStore = new LinkedHashMap[String, Entry](32, 0.75f, true)
   private var currentMemory = 0L
  
-  private val blockDropper = Executors.newSingleThreadExecutor() 
-
+  //private val blockDropper = Executors.newSingleThreadExecutor() 
+  private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
+  private val blockDropper = new Thread("memory store - block dropper") {
+    override def run() {
+      try {
+        while (true) {
+          val blockId = blocksToDrop.take()
+          blockManager.dropFromMemory(blockId)
+        }
+      } catch {
+        case ie: InterruptedException => 
+          logInfo("Shutting down block dropper")
+      }
+    }
+  }
+  blockDropper.start()
+  
   def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
     if (level.deserialized) {
       bytes.rewind()
@@ -124,41 +138,52 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     memoryStore.synchronized {
       memoryStore.clear()
     }
-    blockDropper.shutdown()
+    //blockDropper.shutdown()
+    blockDropper.interrupt()
     logInfo("MemoryStore cleared")
   }
 
   private def drop(blockId: String) {
-    blockDropper.submit(new Runnable() {
+    /*blockDropper.submit(new Runnable() {
       def run() {
         blockManager.dropFromMemory(blockId)
       }
     })
+    */
   }
 
   private def ensureFreeSpace(space: Long) {
     logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
       space, currentMemory, maxMemory))
     
-    val droppedBlockIds = new ArrayBuffer[String]()
-    var droppedMemory = 0L
-    
-    memoryStore.synchronized {
-      val iter = memoryStore.entrySet().iterator()
-      while (maxMemory - (currentMemory - droppedMemory) < space && iter.hasNext) {
-        val pair = iter.next()
-        val blockId = pair.getKey
-        droppedBlockIds += blockId
-        droppedMemory += pair.getValue.size
-        logDebug("Decided to drop " + blockId)
+    if (maxMemory - currentMemory < space) {
+
+      val selectedBlocks = new ArrayBuffer[String]()
+      var selectedMemory = 0L
+      
+      memoryStore.synchronized {
+        val iter = memoryStore.entrySet().iterator()
+        while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
+          val pair = iter.next()
+          val blockId = pair.getKey
+          val entry = pair.getValue()
+          if (!entry.dropPending) {
+            selectedBlocks += blockId
+          }
+          selectedMemory += pair.getValue.size
+          logDebug("Block " + blockId + " selected for dropping")
+        }
+      }  
+      
+      logDebug("" + selectedBlocks.size + " selected for dropping")
+      var i = 0 
+      while (i < selectedBlocks.size) {
+        blocksToDrop.add(selectedBlocks(i))
+        i += 1
       }
-    }  
-    
-    for (blockId <- droppedBlockIds) {
-      drop(blockId)
+      selectedBlocks.clear()      
     }
-    droppedBlockIds.clear()
-  }
+  }
 }
 
 
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 14ff5f8e3d53cdd6dcc79a1ec6a36273dce25da2..64d137c6c8ac41b7320439b83e6cdcd415358856 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -204,6 +204,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter{
     assert(store.get("list3").get.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
     store.put("list4", list4.iterator, StorageLevel.DISK_AND_MEMORY)
+    Thread.sleep(100)
     assert(store.get("list1") === None, "list1 was in store")
     assert(store.get("list2") != None, "list3 was not in store")
     assert(store.get("list2").get.size === 2)