Commit b08b5004 authored by Takeshi YAMAMURO, committed by Sean Owen

[SPARK-18620][STREAMING][KINESIS] Flatten input rates in timeline for streaming + kinesis

## What changes were proposed in this pull request?
This PR makes the input rates shown in the timeline flatter for Spark Streaming + Kinesis.
Since Kinesis workers fetch records and push them into block generators in bulk, the timeline in the web UI shows many spikes when `maxRates` is applied (see Figure 1 below). This fix splits the fetched input records into multiple `addRecords` calls.

Figure 1: Applying `maxRates=500` in vanilla Spark

![apply_limit in_vanilla_spark](https://cloud.githubusercontent.com/assets/692303/20823861/4602f300-b89b-11e6-95f3-164a37061305.png)

Figure 2: Applying `maxRates=500` in Spark with my patch

![apply_limit in_spark_with_my_patch](https://cloud.githubusercontent.com/assets/692303/20823882/6c46352c-b89b-11e6-81ab-afd8abfe0cfe.png)

## How was this patch tested?
Added tests to check that input records are split into multiple `addRecords` calls.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #16114 from maropu/SPARK-18620.
parent be5fc6ef
```diff
@@ -221,6 +221,12 @@ private[kinesis] class KinesisReceiver[T](
     }
   }
 
+  /** Return the current rate limit defined in [[BlockGenerator]]. */
+  private[kinesis] def getCurrentLimit: Int = {
+    assert(blockGenerator != null)
+    math.min(blockGenerator.getCurrentLimit, Int.MaxValue).toInt
+  }
+
   /** Get the latest sequence number for the given shard that can be checkpointed through KCL */
   private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
     Option(shardIdToLatestStoredSeqNum.get(shardId))
```
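In the hunk above, the clamp suggests that `blockGenerator.getCurrentLimit` returns a `Long`-valued rate limit (an assumption based on the code shown, not stated in this page); taking `math.min` with `Int.MaxValue` before converting avoids an overflowed negative limit. A minimal sketch under that assumption:

```scala
// Assume the underlying rate limit is a Long, as the clamp in getCurrentLimit suggests.
def toIntLimit(limit: Long): Int = math.min(limit, Int.MaxValue).toInt

toIntLimit(500L)           // 500
toIntLimit(3000000000L)    // Int.MaxValue (2147483647), not a wrapped-around value
// For comparison, a naive conversion would wrap around:
3000000000L.toInt          // -1294967296
```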
```diff
@@ -68,8 +68,18 @@ private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], w
   override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
     if (!receiver.isStopped()) {
       try {
-        receiver.addRecords(shardId, batch)
-        logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+        // Limit the number of records processed from the Kinesis stream. This is because the KCL
+        // cannot control the number of aggregated records to be fetched even if we set `MaxRecords`
+        // in `KinesisClientLibConfiguration`. For example, if we set the max records per worker
+        // to 10 and a producer aggregates two records into one message, the worker may receive
+        // 20 records on every callback invocation.
+        val maxRecords = receiver.getCurrentLimit
+        for (start <- 0 until batch.size by maxRecords) {
+          val miniBatch = batch.subList(start, math.min(start + maxRecords, batch.size))
+          receiver.addRecords(shardId, miniBatch)
+          logDebug(s"Stored: Worker $workerId stored ${miniBatch.size} records " +
+            s"for shardId $shardId")
+        }
         receiver.setCheckpointer(shardId, checkpointer)
       } catch {
         case NonFatal(e) =>
```
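To make the loop bounds concrete: with a batch of 5 records and a current limit of 2, the `0 until batch.size by maxRecords` loop yields the sub-list ranges below (a quick REPL-style check, not part of the patch):

```scala
// Ranges produced by `for (start <- 0 until batch.size by maxRecords)`
// with batch.size = 5 and maxRecords = 2.
val batchSize = 5
val maxRecords = 2
val ranges = for (start <- 0 until batchSize by maxRecords)
  yield (start, math.min(start + maxRecords, batchSize))
// ranges == Vector((0, 2), (2, 4), (4, 5))
```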
```diff
@@ -69,6 +69,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
   test("process records including store and set checkpointer") {
     when(receiverMock.isStopped()).thenReturn(false)
+    when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
 
     val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
     recordProcessor.initialize(shardId)
@@ -79,8 +80,23 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
     verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock)
   }
 
+  test("split into multiple processes if a limitation is set") {
+    when(receiverMock.isStopped()).thenReturn(false)
+    when(receiverMock.getCurrentLimit).thenReturn(1)
+
+    val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
+    recordProcessor.initialize(shardId)
+    recordProcessor.processRecords(batch, checkpointerMock)
+
+    verify(receiverMock, times(1)).isStopped()
+    verify(receiverMock, times(1)).addRecords(shardId, batch.subList(0, 1))
+    verify(receiverMock, times(1)).addRecords(shardId, batch.subList(1, 2))
+    verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock)
+  }
+
   test("shouldn't store and update checkpointer when receiver is stopped") {
     when(receiverMock.isStopped()).thenReturn(true)
+    when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
 
     val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
     recordProcessor.processRecords(batch, checkpointerMock)
@@ -92,6 +108,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
   test("shouldn't update checkpointer when exception occurs during store") {
     when(receiverMock.isStopped()).thenReturn(false)
+    when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
     when(
       receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
     ).thenThrow(new RuntimeException())
```