diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index e4ff05e12f20190fe312397ba3b42dc641e07f2f..e76e7eb0dea192e54c12149956d5048f995fbd87 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -70,7 +70,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
         val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
 
         // Register the input blocks information into InputInfoTracker
-        val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
+        val inputInfo = InputInfo(id, blockInfos.flatMap(_.numRecords).sum)
         ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
 
         if (blockInfos.nonEmpty) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 92938379b9c177fd938aa8ebee7f235b2f1266dc..8be732b64e3a35b402295763f9101e55dfce5f6a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -138,8 +138,8 @@ private[streaming] class ReceiverSupervisorImpl(
     ) {
     val blockId = blockIdOption.getOrElse(nextBlockId)
     val numRecords = receivedBlock match {
-      case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
-      case _ => -1
+      case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong)
+      case _ => None
     }
 
     val time = System.currentTimeMillis
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
index a72efccf2f994a4a074b9a82b5602c07abdfdffa..7c0db8a863c679b6785f127939d22f8ca31a976e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
@@ -23,7 +23,9 @@ import org.apache.spark.Logging
 import org.apache.spark.streaming.{Time, StreamingContext}
 
 /** To track the information of input stream at specified batch time. */
-private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long)
+private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) {
+  require(numRecords >= 0, "numRecords must not be negative")
+}
 
 /**
  * This class manages all the input streams as well as their input data statistics. The information
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
index dc11e84f29965c92dd8295ebe911d7c55188163b..656ac80df8979bfc3955e0cad7d0962d27453092 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
@@ -24,11 +24,13 @@ import org.apache.spark.streaming.util.WriteAheadLogRecordHandle
 /** Information about blocks received by the receiver */
 private[streaming] case class ReceivedBlockInfo(
     streamId: Int,
-    numRecords: Long,
+    numRecords: Option[Long],
     metadataOption: Option[Any],
     blockStoreResult: ReceivedBlockStoreResult
   ) {
 
+  require(numRecords.isEmpty || numRecords.get >= 0, "numRecords must not be negative")
+
   @volatile private var _isBlockIdValid = true
 
   def blockId: StreamBlockId = blockStoreResult.blockId
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 6f0ee774cb5cf5501e4f32b223efbb92438f2f07..be305b5e0dfeafef468d7ddd0bed46f513d6132d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -224,7 +224,7 @@ class ReceivedBlockTrackerSuite
 
   /** Generate blocks infos using random ids */
   def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
-    List.fill(5)(ReceivedBlockInfo(streamId, 0, None,
+    List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
       BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
   }