diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index d9cbe3730a8a9c27874dc135d932031afd806e6d..c5db6ce63a43807bb1755313407022458ff4e384 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -208,23 +208,19 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
         // TODO: fetch any remote copy of the split that may be available
         // TODO: also register a listener for when it unloads
         logInfo("Computing partition " + split)
+        val elements = new ArrayBuffer[Any]
+        elements ++= rdd.compute(split)
         try {
-          // BlockManager will iterate over results from compute to create RDD
-          blockManager.put(key, rdd.compute(split), storageLevel, true)
+          // Try to put this block in the blockManager
+          blockManager.put(key, elements, storageLevel, true)
           //future.apply() // Wait for the reply from the cache tracker
-          blockManager.get(key) match {
-            case Some(values) => 
-              return values.asInstanceOf[Iterator[T]]
-            case None =>
-              logWarning("loading partition failed after computing it " + key) 
-              return null
-          }
         } finally {
           loading.synchronized {
             loading.remove(key)
             loading.notifyAll()
           }
         }
+        return elements.iterator.asInstanceOf[Iterator[T]]
     }
   }
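
The hunk above changes the read-through caching path in CacheTracker: the partition is materialized into an ArrayBuffer exactly once, the put into the BlockManager becomes best-effort, and the caller is always served from the local buffer instead of from a follow-up blockManager.get that could miss under memory pressure. A minimal, self-contained sketch of that pattern (computeAndCache and its parameters are hypothetical, not part of the Spark API):

    import scala.collection.mutable.ArrayBuffer

    // Sketch only: materialize once, cache best-effort, always return the local copy.
    def computeAndCache[T](key: String, compute: => Iterator[T])
                          (cache: (String, ArrayBuffer[Any]) => Unit): Iterator[T] = {
      val elements = new ArrayBuffer[Any]
      elements ++= compute                  // evaluate the partition exactly once
      try {
        cache(key, elements)                // eviction or failure here is no longer fatal
      } catch {
        case e: Exception => ()             // the data is still in `elements`, so fall through
      }
      elements.iterator.asInstanceOf[Iterator[T]]
    }

In the hunk itself the finally block additionally clears the loading set and calls notifyAll, so other threads waiting on the same key are woken up regardless of whether the put succeeded.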
 
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 91b7bebfb30fce4bb6d0e5e6564138bf268c9525..8a111f44c9f435b8267cb5f542431de0b3939f54 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -237,7 +237,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
             diskStore.getValues(blockId) match {
               case Some(iterator) =>
                 // Put the block back in memory before returning it
-                memoryStore.putValues(blockId, iterator, level, true).data match {
+                // TODO: Consider creating a putValues that also takes in an iterator?
+                val elements = new ArrayBuffer[Any]
+                elements ++= iterator
+                memoryStore.putValues(blockId, elements, level, true).data match {
                   case Left(iterator2) =>
                     return Some(iterator2)
                   case _ =>
@@ -529,11 +532,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     }
   }
 
+  def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
+    : Long = {
+    val elements = new ArrayBuffer[Any]
+    elements ++= values
+    put(blockId, elements, level, tellMaster)
+  }
+
   /**
    * Put a new block of values to the block manager. Returns its (estimated) size in bytes.
    */
-  def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true)
-    : Long = {
+  def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
+    tellMaster: Boolean = true): Long = {
 
     if (blockId == null) {
       throw new IllegalArgumentException("Block Id is null")
@@ -766,7 +776,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
    * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
    * store reaches its limit and needs to free up space.
    */
-  def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) {
+  def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
     logInfo("Dropping block " + blockId + " from memory")
     locker.getLock(blockId).synchronized {
       val info = blockInfo.get(blockId)
@@ -774,8 +784,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       if (level.useDisk && !diskStore.contains(blockId)) {
         logInfo("Writing block " + blockId + " to disk")
         data match {
-          case Left(iterator) =>
-            diskStore.putValues(blockId, iterator, level, false)
+          case Left(elements) =>
+            diskStore.putValues(blockId, elements, level, false)
           case Right(bytes) =>
             diskStore.putBytes(blockId, bytes, level)
         }
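
Taken together, the BlockManager changes make ArrayBuffer[Any] the canonical in-memory representation: the Iterator-based put becomes a thin wrapper that copies into a buffer and delegates, and dropFromMemory now receives that re-iterable buffer rather than a one-shot iterator. A usage sketch under those signatures (the block IDs are made up, and `bm` is assumed to be an already initialized BlockManager):

    import scala.collection.mutable.ArrayBuffer
    import spark.storage.{BlockManager, StorageLevel}

    // Sketch only: both put flavours end up in the ArrayBuffer-based code path.
    def cacheTwoBlocks(bm: BlockManager) {
      val buffered = new ArrayBuffer[Any]
      buffered ++= (1 to 100)
      bm.put("rdd_0_0", buffered, StorageLevel.MEMORY_ONLY, true)              // ArrayBuffer overload (tellMaster has a default)
      bm.put("rdd_0_1", (101 to 200).iterator, StorageLevel.MEMORY_ONLY, true) // Iterator overload copies into a buffer first
    }
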
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 1286600cd1aaf13fb5be8d346794bdb016e65036..096bf8bdd967c05f5a9e55de844a2084b8a681be 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,6 +1,7 @@
 package spark.storage
 
 import java.nio.ByteBuffer
+import scala.collection.mutable.ArrayBuffer
 
 import spark.Logging
 
@@ -18,8 +19,8 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
    * @return a PutResult that contains the size of the data, as well as the values put if
    *         returnValues is true (if not, the result's data field can be null)
    */
-  def putValues(blockId: String, values: Iterator[Any], level: StorageLevel, returnValues: Boolean)
-    : PutResult
+  def putValues(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
+    returnValues: Boolean): PutResult
 
   /**
    * Return the size of a block in bytes.
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index fd92a3dc67a824eed1f242fc1d65ef1d89aefb18..8ba64e4b76a0e142307932da74b0f058833d0ac1 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -3,11 +3,15 @@ package spark.storage
 import java.nio.ByteBuffer
 import java.io.{File, FileOutputStream, RandomAccessFile}
 import java.nio.channels.FileChannel.MapMode
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
 import java.util.{Random, Date}
-import spark.Utils
 import java.text.SimpleDateFormat
 
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+
+import spark.Utils
+
 /**
  * Stores BlockManager blocks on disk.
  */
@@ -45,7 +49,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
 
   override def putValues(
       blockId: String,
-      values: Iterator[Any],
+      values: ArrayBuffer[Any],
       level: StorageLevel,
       returnValues: Boolean)
     : PutResult = {
@@ -56,7 +60,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     val fileOut = blockManager.wrapForCompression(blockId,
       new FastBufferedOutputStream(new FileOutputStream(file)))
     val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
-    objOut.writeAll(values)
+    objOut.writeAll(values.iterator)
     objOut.close()
     val length = file.length()
     logDebug("Block %s stored as %s file on disk in %d ms".format(
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index e9288fdf435e53c5c2915aa40f9ee3c0df3696fb..773970446a0eecdb6c5dccecf6766507447050a4 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -46,19 +46,17 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
 
   override def putValues(
       blockId: String,
-      values: Iterator[Any],
+      values: ArrayBuffer[Any],
       level: StorageLevel,
       returnValues: Boolean)
     : PutResult = {
 
     if (level.deserialized) {
-      val elements = new ArrayBuffer[Any]
-      elements ++= values
-      val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
-      tryToPut(blockId, elements, sizeEstimate, true)
-      PutResult(sizeEstimate, Left(elements.iterator))
+      val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
+      tryToPut(blockId, values, sizeEstimate, true)
+      PutResult(sizeEstimate, Left(values.iterator))
     } else {
-      val bytes = blockManager.dataSerialize(blockId, values)
+      val bytes = blockManager.dataSerialize(blockId, values.iterator)
       tryToPut(blockId, bytes, bytes.limit, false)
       PutResult(bytes.limit(), Right(bytes))
     }
@@ -146,7 +144,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         // Tell the block manager that we couldn't put it in memory so that it can drop it to
         // disk if the block allows disk storage.
         val data = if (deserialized) {
-          Left(value.asInstanceOf[ArrayBuffer[Any]].iterator)
+          Left(value.asInstanceOf[ArrayBuffer[Any]])
         } else {
           Right(value.asInstanceOf[ByteBuffer].duplicate())
         }
@@ -199,7 +197,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         for (blockId <- selectedBlocks) {
           val entry = entries.get(blockId)
           val data = if (entry.deserialized) {
-            Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
+            Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
           } else {
             Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
           }
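
In MemoryStore the deserialized branch now stores the caller's ArrayBuffer directly, without first copying the iterator into a fresh buffer, and eviction hands dropFromMemory a Left(ArrayBuffer[Any]) instead of a Left(Iterator). That matters because the drop-to-disk path may need to inspect and traverse the values more than once; a sketch of why the re-iterable shape is the safer one (sketchDrop is hypothetical):

    import java.nio.ByteBuffer
    import scala.collection.mutable.ArrayBuffer

    // Sketch only: an ArrayBuffer on the Left can be sized, logged and iterated
    // again; a one-shot Iterator would be exhausted after the first traversal.
    def sketchDrop(data: Either[ArrayBuffer[Any], ByteBuffer]): Unit = data match {
      case Left(elements) =>
        println("would spill " + elements.size + " deserialized elements to disk")
        elements.foreach(_ => ())             // still traversable after the size call above
      case Right(bytes) =>
        println("would spill " + bytes.duplicate().remaining() + " serialized bytes to disk")
    }
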
diff --git a/core/src/test/scala/spark/CacheTrackerSuite.scala b/core/src/test/scala/spark/CacheTrackerSuite.scala
index 426c0d26e9b4422651f2873359b829b6c259584d..467605981b3d99b886adc61b981340710e21b760 100644
--- a/core/src/test/scala/spark/CacheTrackerSuite.scala
+++ b/core/src/test/scala/spark/CacheTrackerSuite.scala
@@ -22,7 +22,7 @@ class CacheTrackerSuite extends FunSuite {
     } catch {
       case e: Exception =>
         throw new SparkException("Error communicating with actor", e)
-    } 
+    }
   }
 
   test("CacheTrackerActor slave initialization & cache status") {
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 433d2fdc19bf14d7486c21bd836b498cb4bee9c0..76b088448125bedc293b35886f3a7e648bf18fc1 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -27,6 +27,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       sc = null
     }
     System.clearProperty("spark.reducer.maxMbInFlight")
+    System.clearProperty("spark.storage.memoryFraction")
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.master.port")
   }
@@ -156,4 +157,13 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     assert(data.count() === 1000)
     assert(data.count() === 1000)
   }
+
+  test("compute without caching with low memory") {
+    System.setProperty("spark.storage.memoryFraction", "0.0001")
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY)
+    assert(data.count() === 4000000)
+    assert(data.count() === 4000000)
+    assert(data.count() === 4000000)
+  }
 }
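
The new test shrinks storage memory so far that essentially nothing fits in the memory store, so each count() has to recompute its partitions; before this patch that read-through path returned null once the put failed to stick. As a rough illustration of what a tiny spark.storage.memoryFraction implies, assuming the fraction is applied to the JVM max heap (the 0.66 default and the exact formula are assumptions, not taken from this diff):

    // Hypothetical helper showing the effect of the property the test sets.
    def maxStorageBytes(): Long = {
      val fraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
      (Runtime.getRuntime.maxMemory * fraction).toLong  // with 0.0001 almost nothing is cacheable
    }
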
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 31b33eae0945af1cf7ab37b4513558b2256c41cc..b9c19e61cd1c26d6e505d5175d9a5a9f4e2bd4a6 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -268,9 +268,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     val list1 = List(new Array[Byte](200), new Array[Byte](200))
     val list2 = List(new Array[Byte](200), new Array[Byte](200))
     val list3 = List(new Array[Byte](200), new Array[Byte](200))
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
-    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY)
-    store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY)
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
+    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, true)
+    store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, true)
     assert(store.get("list2") != None, "list2 was not in store")
     assert(store.get("list2").get.size == 2)
     assert(store.get("list3") != None, "list3 was not in store")
@@ -279,7 +279,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(store.get("list2") != None, "list2 was not in store")
     assert(store.get("list2").get.size == 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY)
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
     assert(store.get("list1") != None, "list1 was not in store")
     assert(store.get("list1").get.size == 2)
     assert(store.get("list2") != None, "list2 was not in store")
@@ -294,9 +294,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     val list3 = List(new Array[Byte](200), new Array[Byte](200))
     val list4 = List(new Array[Byte](200), new Array[Byte](200))
     // First store list1 and list2, both in memory, and list3, on disk only
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER)
-    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER)
-    store.put("list3", list3.iterator, StorageLevel.DISK_ONLY)
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, true)
+    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, true)
+    store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, true)
     // At this point LRU should not kick in because list3 is only on disk
     assert(store.get("list1") != None, "list2 was not in store")
     assert(store.get("list1").get.size === 2)
@@ -311,7 +311,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(store.get("list3") != None, "list1 was not in store")
     assert(store.get("list3").get.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
-    store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER)
+    store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, true)
     assert(store.get("list1") === None, "list1 was in store")
     assert(store.get("list2") != None, "list3 was not in store")
     assert(store.get("list2").get.size === 2)