Skip to content
Snippets Groups Projects
Commit 4f16d3fe authored by zsxwing's avatar zsxwing Committed by Tathagata Das
Browse files

[SPARK-8112] [STREAMING] Fix the negative event count issue

Author: zsxwing <zsxwing@gmail.com>

Closes #6659 from zsxwing/SPARK-8112 and squashes the following commits:

a5d7da6 [zsxwing] Address comments
d255b6e [zsxwing] Fix the negative event count issue
parent 3f80bc84
No related branches found
No related tags found
No related merge requests found
......@@ -70,7 +70,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
// Register the input blocks information into InputInfoTracker
val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
val inputInfo = InputInfo(id, blockInfos.flatMap(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
if (blockInfos.nonEmpty) {
......
......@@ -138,8 +138,8 @@ private[streaming] class ReceiverSupervisorImpl(
) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val numRecords = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
case _ => -1
case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong)
case _ => None
}
val time = System.currentTimeMillis
......
......@@ -23,7 +23,9 @@ import org.apache.spark.Logging
import org.apache.spark.streaming.{Time, StreamingContext}
/** To track the information of input stream at specified batch time. */
private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long)
private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) {
require(numRecords >= 0, "numRecords must not be negative")
}
/**
* This class manages all the input streams as well as their input data statistics. The information
......
......@@ -24,11 +24,13 @@ import org.apache.spark.streaming.util.WriteAheadLogRecordHandle
/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
streamId: Int,
numRecords: Long,
numRecords: Option[Long],
metadataOption: Option[Any],
blockStoreResult: ReceivedBlockStoreResult
) {
require(numRecords.isEmpty || numRecords.get >= 0, "numRecords must not be negative")
@volatile private var _isBlockIdValid = true
def blockId: StreamBlockId = blockStoreResult.blockId
......
......@@ -224,7 +224,7 @@ class ReceivedBlockTrackerSuite
/** Generate blocks infos using random ids */
def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
List.fill(5)(ReceivedBlockInfo(streamId, 0, None,
List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment