Skip to content
Snippets Groups Projects
Commit e6bef7d5 authored by Xin Ren's avatar Xin Ren Committed by Shixiong Zhu
Browse files

[SPARK-17038][STREAMING] fix metrics retrieval source of 'lastReceivedBatch'

https://issues.apache.org/jira/browse/SPARK-17038

## What changes were proposed in this pull request?

StreamingSource's lastReceivedBatch_submissionTime, lastReceivedBatch_processingTimeStart, and lastReceivedBatch_processingTimeEnd all use data from lastCompletedBatch instead of lastReceivedBatch.

In particular, this makes it impossible to match lastReceivedBatch_records with a batchID/submission time.

This is apparent when looking at StreamingSource.scala, lines 89-94.

## How was this patch tested?

Manually running unit tests on local laptop

Author: Xin Ren <iamshrek@126.com>

Closes #14681 from keypointt/SPARK-17038.
parent d60af8f6
No related branches found
No related tags found
No related merge requests found
...@@ -87,11 +87,11 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { ...@@ -87,11 +87,11 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
// Gauge for last received batch, useful for monitoring the streaming job's running status, // Gauge for last received batch, useful for monitoring the streaming job's running status,
// displayed data -1 for any abnormal condition. // displayed data -1 for any abnormal condition.
registerGaugeWithOption("lastReceivedBatch_submissionTime", registerGaugeWithOption("lastReceivedBatch_submissionTime",
_.lastCompletedBatch.map(_.submissionTime), -1L) _.lastReceivedBatch.map(_.submissionTime), -1L)
registerGaugeWithOption("lastReceivedBatch_processingStartTime", registerGaugeWithOption("lastReceivedBatch_processingStartTime",
_.lastCompletedBatch.flatMap(_.processingStartTime), -1L) _.lastReceivedBatch.flatMap(_.processingStartTime), -1L)
registerGaugeWithOption("lastReceivedBatch_processingEndTime", registerGaugeWithOption("lastReceivedBatch_processingEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime), -1L) _.lastReceivedBatch.flatMap(_.processingEndTime), -1L)
// Gauge for last received batch records. // Gauge for last received batch records.
registerGauge("lastReceivedBatch_records", _.lastReceivedBatchRecords.values.sum, 0L) registerGauge("lastReceivedBatch_records", _.lastReceivedBatchRecords.values.sum, 0L)
......
...@@ -68,6 +68,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { ...@@ -68,6 +68,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted))) listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
listener.runningBatches should be (Nil) listener.runningBatches should be (Nil)
listener.retainedCompletedBatches should be (Nil) listener.retainedCompletedBatches should be (Nil)
listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoSubmitted)))
listener.lastCompletedBatch should be (None) listener.lastCompletedBatch should be (None)
listener.numUnprocessedBatches should be (1) listener.numUnprocessedBatches should be (1)
listener.numTotalCompletedBatches should be (0) listener.numTotalCompletedBatches should be (0)
...@@ -81,6 +82,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { ...@@ -81,6 +82,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.waitingBatches should be (Nil) listener.waitingBatches should be (Nil)
listener.runningBatches should be (List(BatchUIData(batchInfoStarted))) listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
listener.retainedCompletedBatches should be (Nil) listener.retainedCompletedBatches should be (Nil)
listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoStarted)))
listener.lastCompletedBatch should be (None) listener.lastCompletedBatch should be (None)
listener.numUnprocessedBatches should be (1) listener.numUnprocessedBatches should be (1)
listener.numTotalCompletedBatches should be (0) listener.numTotalCompletedBatches should be (0)
...@@ -123,6 +125,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { ...@@ -123,6 +125,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.waitingBatches should be (Nil) listener.waitingBatches should be (Nil)
listener.runningBatches should be (Nil) listener.runningBatches should be (Nil)
listener.retainedCompletedBatches should be (List(BatchUIData(batchInfoCompleted))) listener.retainedCompletedBatches should be (List(BatchUIData(batchInfoCompleted)))
listener.lastReceivedBatch should be (Some(BatchUIData(batchInfoCompleted)))
listener.lastCompletedBatch should be (Some(BatchUIData(batchInfoCompleted))) listener.lastCompletedBatch should be (Some(BatchUIData(batchInfoCompleted)))
listener.numUnprocessedBatches should be (0) listener.numUnprocessedBatches should be (0)
listener.numTotalCompletedBatches should be (1) listener.numTotalCompletedBatches should be (1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment