diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 207d64d9414ee3b1740b8a5463c519feb9759b75..c8dd6e06812dc649fe7423e058e3b87cf229b374 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -32,7 +32,10 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 
 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
-  def blockId: StreamBlockId  // Any implementation of this trait will store a block id
+  // Any implementation of this trait must provide the id of the stored block
+  def blockId: StreamBlockId
+  // Any implementation of this trait must report the number of records in the block, if known
+  def numRecords: Option[Long]
 }
 
 /** Trait that represents a class that handles the storage of blocks received by receiver */
@@ -51,7 +54,8 @@ private[streaming] trait ReceivedBlockHandler {
  * that stores the metadata related to storage of blocks using
  * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
  */
-private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId)
+private[streaming] case class BlockManagerBasedStoreResult(
+    blockId: StreamBlockId, numRecords: Option[Long])
   extends ReceivedBlockStoreResult
 
 
@@ -64,11 +68,20 @@ private[streaming] class BlockManagerBasedBlockHandler(
   extends ReceivedBlockHandler with Logging {
 
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+
+    var numRecords: Option[Long] = None
+
     val putResult: Seq[(BlockId, BlockStatus)] = block match {
       case ArrayBufferBlock(arrayBuffer) =>
-        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
+        numRecords = Some(arrayBuffer.size.toLong)
+        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
+          tellMaster = true)
       case IteratorBlock(iterator) =>
-        blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
+        val countIterator = new CountingIterator(iterator)
+        val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
+          tellMaster = true)
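+        // The count is valid only if putIterator fully consumed the iterator;
+        // otherwise CountingIterator.count returns None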
+        numRecords = countIterator.count
+        putResult
       case ByteBufferBlock(byteBuffer) =>
         blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
       case o =>
@@ -79,7 +92,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
       throw new SparkException(
         s"Could not store $blockId to block manager with storage level $storageLevel")
     }
-    BlockManagerBasedStoreResult(blockId)
+    BlockManagerBasedStoreResult(blockId, numRecords)
   }
 
   def cleanupOldBlocks(threshTime: Long) {
@@ -96,6 +109,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
  */
 private[streaming] case class WriteAheadLogBasedStoreResult(
     blockId: StreamBlockId,
+    numRecords: Option[Long],
     walRecordHandle: WriteAheadLogRecordHandle
   ) extends ReceivedBlockStoreResult
 
@@ -151,12 +165,17 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
    */
   def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
 
+    var numRecords: Option[Long] = None
     // Serialize the block so that it can be inserted into both
     val serializedBlock = block match {
       case ArrayBufferBlock(arrayBuffer) =>
+        numRecords = Some(arrayBuffer.size.toLong)
         blockManager.dataSerialize(blockId, arrayBuffer.iterator)
       case IteratorBlock(iterator) =>
-        blockManager.dataSerialize(blockId, iterator)
+        val countIterator = new CountingIterator(iterator)
+        val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
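+        // dataSerialize consumes the entire iterator, so the count is defined here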
+        numRecords = countIterator.count
+        serializedBlock
       case ByteBufferBlock(byteBuffer) =>
         byteBuffer
       case _ =>
@@ -181,7 +200,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     // Combine the futures, wait for both to complete, and return the write ahead log record handle
     val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
     val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
-    WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
+    WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
   }
 
   def cleanupOldBlocks(threshTime: Long) {
@@ -199,3 +218,23 @@ private[streaming] object WriteAheadLogBasedBlockHandler {
     new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
   }
 }
+
+/**
+ * A utility iterator that wraps another iterator and counts the records it returns.
+ * The count is defined only once the wrapped iterator has been fully consumed.
+ */
+private class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
+  private var _count = 0
+
+  private def isFullyConsumed: Boolean = !iterator.hasNext
+
+  def hasNext(): Boolean = iterator.hasNext
+
+  def count(): Option[Long] = {
+    if (isFullyConsumed) Some(_count) else None
+  }
+
+  def next(): T = {
+    _count += 1
+    iterator.next()
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 8be732b64e3a35b402295763f9101e55dfce5f6a..6078cdf8f87908361e178f31247fe18138c3fb44 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -137,15 +137,10 @@ private[streaming] class ReceiverSupervisorImpl(
       blockIdOption: Option[StreamBlockId]
     ) {
     val blockId = blockIdOption.getOrElse(nextBlockId)
-    val numRecords = receivedBlock match {
-      case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong)
-      case _ => None
-    }
-
     val time = System.currentTimeMillis
     val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
     logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
-
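+    // Use the record count computed by the block handler while storing the block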
+    val numRecords = blockStoreResult.numRecords
     val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
     trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
     logDebug(s"Reported block $blockId")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index cca8cedb1d08034924247891f50d5d9f07f90e63..6c0c926755c20c3fd5d07a8632481b6c98d6127d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -49,7 +49,6 @@ class ReceivedBlockHandlerSuite
 
   val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
   val hadoopConf = new Configuration()
-  val storageLevel = StorageLevel.MEMORY_ONLY_SER
   val streamId = 1
   val securityMgr = new SecurityManager(conf)
   val mapOutputTracker = new MapOutputTrackerMaster(conf)
@@ -57,10 +56,12 @@ class ReceivedBlockHandlerSuite
   val serializer = new KryoSerializer(conf)
   val manualClock = new ManualClock
   val blockManagerSize = 10000000
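+  // All block managers created by a test, so that after { } can stop them all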
+  val blockManagerBuffer = new ArrayBuffer[BlockManager]()
 
   var rpcEnv: RpcEnv = null
   var blockManagerMaster: BlockManagerMaster = null
   var blockManager: BlockManager = null
+  var storageLevel: StorageLevel = null
   var tempDirectory: File = null
 
   before {
@@ -70,20 +71,21 @@ class ReceivedBlockHandlerSuite
     blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
 
-    blockManager = new BlockManager("bm", rpcEnv, blockManagerMaster, serializer,
-      blockManagerSize, conf, mapOutputTracker, shuffleManager,
-      new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
-    blockManager.initialize("app-id")
+    storageLevel = StorageLevel.MEMORY_ONLY_SER
+    blockManager = createBlockManager(blockManagerSize, conf)
 
     tempDirectory = Utils.createTempDir()
     manualClock.setTime(0)
   }
 
   after {
-    if (blockManager != null) {
-      blockManager.stop()
-      blockManager = null
+    for (manager <- blockManagerBuffer) {
+      if (manager != null) {
+        manager.stop()
+      }
     }
+    blockManager = null
+    blockManagerBuffer.clear()
     if (blockManagerMaster != null) {
       blockManagerMaster.stop()
       blockManagerMaster = null
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
     }
   }
 
+  test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(true)
+    // Test count with WriteAheadLogBasedBlockHandler
+    testCountWithBlockManagerBasedBlockHandler(false)
+  }
+
+  test("Test Block - isFullyConsumed") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+    // spark.storage.unrollFraction set to 0.4 for BlockManager
+    sparkConf.set("spark.storage.unrollFraction", "0.4")
+    // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
+    blockManager = createBlockManager(12000, sparkConf)
+
+    // There is not enough space to store this block in memory, but the
+    // WriteAheadLogBasedBlockHandler can still serialize it to the WAL,
+    // so count returns the correct value.
+    testRecordcount(false, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block in memory, but the
+    // BlockManager can spill it to disk under MEMORY_AND_DISK,
+    // so count returns the correct value.
+    testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block with the MEMORY_ONLY storage level.
+    // The BlockManager cannot unroll the block, so it will not tryToPut it,
+    // resulting in a SparkException.
+    storageLevel = StorageLevel.MEMORY_ONLY
+    withBlockManagerBasedBlockHandler { handler =>
+      intercept[SparkException] {
+        storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+      }
+    }
+  }
+
+  private def testCountWithBlockManagerBasedBlockHandler(
+      isBlockManagerBasedBlockHandler: Boolean) {
+    // ByteBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ByteBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+    // ArrayBufferBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+    // ArrayBufferBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+    // ArrayBufferBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+    // IteratorBlock-MEMORY_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-MEMORY_ONLY_SER
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+      IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+    // IteratorBlock-DISK_ONLY
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+      IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+    // IteratorBlock-MEMORY_AND_DISK
+    testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+      IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+  }
+
+  private def createBlockManager(
+      maxMem: Long,
+      conf: SparkConf,
+      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+    val transfer = new NioBlockTransferService(conf, securityMgr)
+    val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+    manager.initialize("app-id")
+    blockManagerBuffer += manager
+    manager
+  }
+
+  /**
+   * Test storing of data using different types of handlers, storage levels, and received
+   * blocks, and verify that the record count is correct
+   */
+  private def testRecordcount(
+      isBlockManagerBasedBlockHandler: Boolean,
+      sLevel: StorageLevel,
+      receivedBlock: ReceivedBlock,
+      bManager: BlockManager,
+      expectedNumRecords: Option[Long]) {
+    blockManager = bManager
+    storageLevel = sLevel
+    var bId: StreamBlockId = null
+    try {
+      if (isBlockManagerBasedBlockHandler) {
+        // test received block with BlockManager based handler
+        withBlockManagerBasedBlockHandler { handler =>
+          val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock)
+          bId = blockId
+          assert(blockStoreResult.numRecords === expectedNumRecords,
+            "Message count does not match for a " +
+            receivedBlock.getClass.getName +
+            " being inserted using BlockManagerBasedBlockHandler with " + sLevel)
+        }
+      } else {
+        // test received block with WAL based handler
+        withWriteAheadLogBasedBlockHandler { handler =>
+          val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock)
+          bId = blockId
+          assert(blockStoreResult.numRecords === expectedNumRecords,
+            "Message count does not match for a " +
+            receivedBlock.getClass.getName +
+            " being inserted using WriteAheadLogBasedBlockHandler with " + sLevel)
+        }
+      }
+    } finally {
+      // Remove the block so that the same block manager can be reused by the next test
+      blockManager.removeBlock(bId, true)
+    }
+  }
+
   /**
    * Test storing of data using different forms of ReceivedBlocks and verify that they succeeded
    * using the given verification function
@@ -251,9 +377,21 @@ class ReceivedBlockHandlerSuite
     (blockIds, storeResults)
   }
 
+  /** Store a single block using the given handler */
+  private def storeSingleBlock(
+      handler: ReceivedBlockHandler,
+      block: ReceivedBlock
+    ): (StreamBlockId, ReceivedBlockStoreResult) = {
+    val blockId = generateBlockId
+    val blockStoreResult = handler.storeBlock(blockId, block)
+    logDebug("Done inserting")
+    (blockId, blockStoreResult)
+  }
+
   private def getWriteAheadLogFiles(): Seq[String] = {
     getLogFilesInDirectory(checkpointDirToLogDir(tempDirectory.toString, streamId))
   }
 
   private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong)
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index be305b5e0dfeafef468d7ddd0bed46f513d6132d..f793a12843b2f39096fcaeab72b20e4e79ae7219 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -225,7 +225,7 @@ class ReceivedBlockTrackerSuite
   /** Generate blocks infos using random ids */
   def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
     List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
-      BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
+      BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
   }
 
   /** Get all the data written in the given write ahead log file. */