Skip to content
Snippets Groups Projects
Commit 46a64d1e authored by Gaurav's avatar Gaurav Committed by Burak Yavuz
Browse files

[SPARK-19304][STREAMING][KINESIS] fix kinesis slow checkpoint recovery

## What changes were proposed in this pull request?
Added a limit to the `getRecords` API call in `KinesisBackedBlockRDD`. This reduces the amount of data returned by each Kinesis API call, making checkpoint recovery considerably faster.

Since we already store `fromSeqNumber` and `toSeqNumber` in the checkpoint metadata, we can also store the number of records in the range, which can later be used as the limit for the `getRecords` API call.

## How was this patch tested?
The patch was manually tested.

Apologies for any silly mistakes; this is my first pull request.

Author: Gaurav <gaurav@techtinium.com>

Closes #16842 from Gauravshah/kinesis_checkpoint_recovery_fix_2_1_0.
parent 339b53a1
No related branches found
No related tags found
No related merge requests found
......@@ -36,7 +36,11 @@ import org.apache.spark.util.NextIterator
/** Class representing a range of Kinesis sequence numbers. Both sequence numbers are inclusive. */
private[kinesis]
case class SequenceNumberRange(
streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String)
streamName: String,
shardId: String,
fromSeqNumber: String,
toSeqNumber: String,
recordCount: Int)
/** Class representing an array of Kinesis sequence number ranges */
private[kinesis]
......@@ -136,6 +140,8 @@ class KinesisSequenceRangeIterator(
private val client = new AmazonKinesisClient(credentials)
private val streamName = range.streamName
private val shardId = range.shardId
// AWS limits a single GetRecords call to a maximum of 10,000 records
private val maxGetRecordsLimit = 10000
private var toSeqNumberReceived = false
private var lastSeqNumber: String = null
......@@ -153,12 +159,14 @@ class KinesisSequenceRangeIterator(
// If the internal iterator has not been initialized,
// then fetch records from starting sequence number
internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber)
internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber,
range.recordCount)
} else if (!internalIterator.hasNext) {
// If the internal iterator does not have any more records,
// then fetch more records after the last consumed sequence number
internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber)
internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber,
range.recordCount)
}
if (!internalIterator.hasNext) {
......@@ -191,9 +199,12 @@ class KinesisSequenceRangeIterator(
/**
* Get records starting from or after the given sequence number.
*/
private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = {
private def getRecords(
iteratorType: ShardIteratorType,
seqNum: String,
recordCount: Int): Iterator[Record] = {
val shardIterator = getKinesisIterator(iteratorType, seqNum)
val result = getRecordsAndNextKinesisIterator(shardIterator)
val result = getRecordsAndNextKinesisIterator(shardIterator, recordCount)
result._1
}
......@@ -202,10 +213,12 @@ class KinesisSequenceRangeIterator(
* to get records from Kinesis), and get the next shard iterator for next consumption.
*/
private def getRecordsAndNextKinesisIterator(
shardIterator: String): (Iterator[Record], String) = {
shardIterator: String,
recordCount: Int): (Iterator[Record], String) = {
val getRecordsRequest = new GetRecordsRequest
getRecordsRequest.setRequestCredentials(credentials)
getRecordsRequest.setShardIterator(shardIterator)
getRecordsRequest.setLimit(Math.min(recordCount, this.maxGetRecordsLimit))
val getRecordsResult = retryOrTimeout[GetRecordsResult](
s"getting records using shard iterator") {
client.getRecords(getRecordsRequest)
......
......@@ -210,7 +210,8 @@ private[kinesis] class KinesisReceiver[T](
if (records.size > 0) {
val dataIterator = records.iterator().asScala.map(messageHandler)
val metadata = SequenceNumberRange(streamName, shardId,
records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber(),
records.size())
blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
}
}
......
......@@ -51,7 +51,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }}
shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
val seqNumRange = SequenceNumberRange(
testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last)
testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last, seqNumbers.size)
(shardId, seqNumRange)
}
allRanges = shardIdToRange.values.toSeq
......@@ -181,7 +181,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
// Create the necessary ranges to use in the RDD
val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)(
SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy")))
SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 1)))
val realRanges = Array.tabulate(numPartitionsInKinesis) { i =>
val range = shardIdToRange(shardIds(i + (numPartitions - numPartitionsInKinesis)))
SequenceNumberRanges(Array(range))
......
......@@ -119,13 +119,13 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
// Generate block info data for testing
val seqNumRanges1 = SequenceNumberRanges(
SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))
SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67))
val blockId1 = StreamBlockId(kinesisStream.id, 123)
val blockInfo1 = ReceivedBlockInfo(
0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
val seqNumRanges2 = SequenceNumberRanges(
SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb"))
SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89))
val blockId2 = StreamBlockId(kinesisStream.id, 345)
val blockInfo2 = ReceivedBlockInfo(
0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment