diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 26805e96210ab2cfa3226dc905bb2b31dfc8b2a5..122a529bb76e3675ea16a5d3b282357e09b96895 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -140,12 +140,10 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
 
 
   /**
-   * Pushes a block (as iterator of values) into the block manager.
+   * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
    */
-  def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) {
-    val buffer = new ArrayBuffer[T] ++ iterator
-    env.blockManager.put(blockId, buffer.asInstanceOf[ArrayBuffer[Any]], level)
-
+  def pushBlock(blockId: String, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
+    env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
     actor ! ReportBlock(blockId, metadata)
   }
 
@@ -195,7 +193,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
   class BlockGenerator(storageLevel: StorageLevel)
     extends Serializable with Logging {
 
-    case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
+    case class Block(id: String, buffer: ArrayBuffer[T], metadata: Any = null)
 
     val clock = new SystemClock()
     val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
@@ -222,17 +220,13 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
       currentBuffer += obj
     }
 
-    private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
-      new Block(blockId, iterator)
-    }
-
     private def updateCurrentBuffer(time: Long) {
       try {
         val newBlockBuffer = currentBuffer
         currentBuffer = new ArrayBuffer[T]
         if (newBlockBuffer.size > 0) {
           val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval)
-          val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
+          val newBlock = new Block(blockId, newBlockBuffer)
           blocksForPushing.add(newBlock)
         }
       } catch {
@@ -248,7 +242,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
       try {
         while(true) {
           val block = blocksForPushing.take()
-          NetworkReceiver.this.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
+          NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel)
         }
       } catch {
         case ie: InterruptedException =>
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
index b3201d0b28d797c3a8d7e200e19b25fff878730b..036c95a860c5dfde3d281ce2138ec8785c8180ff 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -9,6 +9,8 @@ import spark.streaming.dstream.NetworkReceiver
 
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.mutable.ArrayBuffer
+
 /** A helper with set of defaults for supervisor strategy **/
 object ReceiverSupervisorStrategy {
 
@@ -136,8 +138,9 @@ private[streaming] class ActorReceiver[T: ClassManifest](
   }
 
   protected def pushBlock(iter: Iterator[T]) {
-    pushBlock("block-" + streamId + "-" + System.nanoTime(),
-      iter, null, storageLevel)
+    val buffer = new ArrayBuffer[T]
+    buffer ++= iter
+    pushBlock("block-" + streamId + "-" + System.nanoTime(), buffer, null, storageLevel)
   }
 
   protected def onStart() = {