Commit 38f4e869, authored by Yash Sharma, committed by Burak Yavuz

[SPARK-20140][DSTREAM] Remove hardcoded kinesis retry wait and max retries

## What changes were proposed in this pull request?

This pull request proposes to remove the hardcoded Amazon Kinesis retry values MIN_RETRY_WAIT_TIME_MS and MAX_RETRIES.

This change is critical for Kinesis checkpoint recovery when the Kinesis-backed RDD is huge.
The following happens in a typical Kinesis recovery:
- Kinesis throttles a large number of requests while recovering.
- Retries cannot recover because the wait period between them is too small.
- Kinesis throttles on a per-second basis, so the wait period should be configurable for recovery. With the previous hard-coded values (100 ms initial wait, doubling, at most 3 attempts), all retries finish within roughly 100 + 200 = 300 ms, well inside a single one-second throttle window.

The patch reads these Spark Kinesis settings (a usage sketch follows the list):
- spark.streaming.kinesis.retry.waitTime
- spark.streaming.kinesis.retry.maxAttempts
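
For context (not part of this patch's diff), a minimal sketch of how a user might set these keys before building the streaming context; the master, app name, and values are illustrative only:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative values: a larger wait and retry budget lets checkpoint
// recovery ride out Kinesis' per-second throttling.
val conf = new SparkConf()
  .setMaster("local[2]") // local master just for this sketch
  .setAppName("KinesisRecoveryTuning")
  .set("spark.streaming.kinesis.retry.waitTime", "2000ms")
  .set("spark.streaming.kinesis.retry.maxAttempts", "10")
val ssc = new StreamingContext(conf, Seconds(10))
```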

JIRA: https://issues.apache.org/jira/browse/SPARK-20140

## How was this patch tested?

Modified KinesisBackedBlockRDDSuite.scala to run the Kinesis tests with the modified configurations. I was not able to test the patch against actual throttling.

Author: Yash Sharma <ysharma@atlassian.com>

Closes #17467 from yssharma/ysharma/spark-kinesis-retries.
parent 6f62e9d9
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.auth.AWSCredentials
 import com.amazonaws.services.kinesis.AmazonKinesisClient
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
 import com.amazonaws.services.kinesis.model._
@@ -81,9 +81,9 @@ class KinesisBackedBlockRDD[T: ClassTag](
     @transient private val _blockIds: Array[BlockId],
     @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
     @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
-    val retryTimeoutMs: Int = 10000,
     val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _,
-    val kinesisCreds: SparkAWSCredentials = DefaultCredentials
+    val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
+    val kinesisReadConfigs: KinesisReadConfigurations = KinesisReadConfigurations()
   ) extends BlockRDD[T](sc, _blockIds) {

   require(_blockIds.length == arrayOfseqNumberRanges.length,
@@ -112,7 +112,7 @@ class KinesisBackedBlockRDD[T: ClassTag](
       val credentials = kinesisCreds.provider.getCredentials
       partition.seqNumberRanges.ranges.iterator.flatMap { range =>
         new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
-          range, retryTimeoutMs).map(messageHandler)
+          range, kinesisReadConfigs).map(messageHandler)
       }
     }
     if (partition.isBlockIdValid) {
@@ -135,7 +135,7 @@ class KinesisSequenceRangeIterator(
     endpointUrl: String,
     regionId: String,
     range: SequenceNumberRange,
-    retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
+    kinesisReadConfigs: KinesisReadConfigurations) extends NextIterator[Record] with Logging {

   private val client = new AmazonKinesisClient(credentials)
   private val streamName = range.streamName
@@ -251,21 +251,19 @@ class KinesisSequenceRangeIterator(
   /** Helper method to retry Kinesis API request with exponential backoff and timeouts */
   private def retryOrTimeout[T](message: String)(body: => T): T = {
-    import KinesisSequenceRangeIterator._
-
-    var startTimeMs = System.currentTimeMillis()
+    val startTimeMs = System.currentTimeMillis()
     var retryCount = 0
-    var waitTimeMs = MIN_RETRY_WAIT_TIME_MS
     var result: Option[T] = None
     var lastError: Throwable = null
+    var waitTimeInterval = kinesisReadConfigs.retryWaitTimeMs

-    def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs
-    def isMaxRetryDone = retryCount >= MAX_RETRIES
+    def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= kinesisReadConfigs.retryTimeoutMs
+    def isMaxRetryDone = retryCount >= kinesisReadConfigs.maxRetries

     while (result.isEmpty && !isTimedOut && !isMaxRetryDone) {
       if (retryCount > 0) { // wait only if this is a retry
-        Thread.sleep(waitTimeMs)
-        waitTimeMs *= 2 // if you have waited, then double wait time for next round
+        Thread.sleep(waitTimeInterval)
+        waitTimeInterval *= 2 // if you have waited, then double wait time for next round
       }
       try {
         result = Some(body)
@@ -284,7 +282,8 @@ class KinesisSequenceRangeIterator(
     result.getOrElse {
       if (isTimedOut) {
         throw new SparkException(
-          s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError)
+          s"Timed out after ${kinesisReadConfigs.retryTimeoutMs} ms while " +
+            s"$message, last exception: ", lastError)
       } else {
         throw new SparkException(
           s"Gave up after $retryCount retries while $message, last exception: ", lastError)
@@ -292,9 +291,3 @@ class KinesisSequenceRangeIterator(
     }
   }
 }
-
-private[streaming]
-object KinesisSequenceRangeIterator {
-  val MAX_RETRIES = 3
-  val MIN_RETRY_WAIT_TIME_MS = 100
-}
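
For intuition, here is a minimal standalone sketch of the backoff policy implemented by `retryOrTimeout` above; all names and the simplified structure are hypothetical, not Spark's actual code:

```scala
import scala.annotation.tailrec

// Sketch of retry-with-exponential-backoff under a retry budget and a deadline.
def retryWithBackoff[T](maxRetries: Int, initialWaitMs: Long, timeoutMs: Long)(body: => T): T = {
  val deadline = System.currentTimeMillis() + timeoutMs

  @tailrec
  def loop(attempt: Int, waitMs: Long): T =
    (try Right(body) catch { case e: Exception => Left(e) }) match {
      case Right(value) => value
      case Left(error) =>
        // Give up when either the retry budget or the deadline is exhausted.
        if (attempt + 1 >= maxRetries || System.currentTimeMillis() >= deadline) throw error
        Thread.sleep(waitMs)           // wait before retrying ...
        loop(attempt + 1, waitMs * 2)  // ... and double the wait, as the patch does
    }

  loop(attempt = 0, waitMs = initialWaitMs)
}
```

With the old hard-coded defaults (100 ms, 3 attempts), such a loop sleeps at most 100 + 200 = 300 ms before giving up; making both knobs configurable lets recovery wait out Kinesis' one-second throttle windows.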
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.model.Record
+import KinesisReadConfigurations._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
@@ -60,12 +61,13 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
       val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
       logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
         s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
+
       new KinesisBackedBlockRDD(
         context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
         isBlockIdValid = isBlockIdValid,
-        retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
         messageHandler = messageHandler,
-        kinesisCreds = kinesisCreds)
+        kinesisCreds = kinesisCreds,
+        kinesisReadConfigs = KinesisReadConfigurations(ssc))
     } else {
       logWarning("Kinesis sequence number information was not present with some block metadata," +
         " it may not be possible to recover from failures")
external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala (new file)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kinesis

import org.apache.spark.network.util.JavaUtils
import org.apache.spark.streaming.StreamingContext

/**
 * Configurations to pass to the [[KinesisBackedBlockRDD]].
 *
 * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
 * @param retryWaitTimeMs: The interval between consecutive Kinesis retries.
 *                         Defaults to 100ms.
 * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
 *                        Defaults to the batch duration provided for streaming,
 *                        else uses 10000 if invoked directly.
 */
private[kinesis] case class KinesisReadConfigurations(
    maxRetries: Int,
    retryWaitTimeMs: Long,
    retryTimeoutMs: Long)

private[kinesis] object KinesisReadConfigurations {
  def apply(): KinesisReadConfigurations = {
    KinesisReadConfigurations(maxRetries = DEFAULT_MAX_RETRIES,
      retryWaitTimeMs = JavaUtils.timeStringAsMs(DEFAULT_RETRY_WAIT_TIME),
      retryTimeoutMs = DEFAULT_RETRY_TIMEOUT)
  }

  def apply(ssc: StreamingContext): KinesisReadConfigurations = {
    KinesisReadConfigurations(
      maxRetries = ssc.sc.getConf.getInt(RETRY_MAX_ATTEMPTS_KEY, DEFAULT_MAX_RETRIES),
      retryWaitTimeMs = JavaUtils.timeStringAsMs(
        ssc.sc.getConf.get(RETRY_WAIT_TIME_KEY, DEFAULT_RETRY_WAIT_TIME)),
      retryTimeoutMs = ssc.graph.batchDuration.milliseconds)
  }

  /**
   * SparkConf key for configuring the maximum number of retries used when attempting a Kinesis
   * request.
   */
  val RETRY_MAX_ATTEMPTS_KEY = "spark.streaming.kinesis.retry.maxAttempts"

  /**
   * SparkConf key for configuring the wait time to use before retrying a Kinesis attempt.
   */
  val RETRY_WAIT_TIME_KEY = "spark.streaming.kinesis.retry.waitTime"

  /**
   * Default value for the RETRY_MAX_ATTEMPTS_KEY
   */
  val DEFAULT_MAX_RETRIES = 3

  /**
   * Default value for the RETRY_WAIT_TIME_KEY
   */
  val DEFAULT_RETRY_WAIT_TIME = "100ms"

  /**
   * Default value for the retry timeout
   */
  val DEFAULT_RETRY_TIMEOUT = 10000
}
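
Reading the two factories above side by side (a sketch only; `ssc` is assumed to be an already-constructed `StreamingContext`):

```scala
// Sketch: what each factory above resolves to.
val defaults = KinesisReadConfigurations()
// maxRetries = 3, retryWaitTimeMs = 100, retryTimeoutMs = 10000

val fromConf = KinesisReadConfigurations(ssc)
// maxRetries      <- spark.streaming.kinesis.retry.maxAttempts (default 3)
// retryWaitTimeMs <- spark.streaming.kinesis.retry.waitTime    (default "100ms")
// retryTimeoutMs   = the streaming batch duration, in milliseconds
```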
external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.kinesis.KinesisReadConfigurations._
 import org.apache.spark.streaming.kinesis.KinesisTestUtils._
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
@@ -136,7 +137,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]]
     assert(kinesisRDD.regionName === dummyRegionName)
     assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
-    assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
+    assert(kinesisRDD.kinesisReadConfigs.retryTimeoutMs === batchDuration.milliseconds)
     assert(kinesisRDD.kinesisCreds === BasicCredentials(
       awsAccessKeyId = dummyAWSAccessKey,
       awsSecretKey = dummyAWSSecretKey))
@@ -234,6 +235,52 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     ssc.stop(stopSparkContext = false)
   }

+  test("Kinesis read with custom configurations") {
+    try {
+      ssc.sc.conf.set(RETRY_WAIT_TIME_KEY, "2000ms")
+      ssc.sc.conf.set(RETRY_MAX_ATTEMPTS_KEY, "5")
+
+      val kinesisStream = KinesisInputDStream.builder.streamingContext(ssc)
+        .checkpointAppName(appName)
+        .streamName("dummyStream")
+        .endpointUrl(dummyEndpointUrl)
+        .regionName(dummyRegionName)
+        .initialPositionInStream(InitialPositionInStream.LATEST)
+        .checkpointInterval(Seconds(10))
+        .storageLevel(StorageLevel.MEMORY_ONLY)
+        .build()
+        .asInstanceOf[KinesisInputDStream[Array[Byte]]]
+
+      val time = Time(1000)
+      // Generate block info data for testing
+      val seqNumRanges1 = SequenceNumberRanges(
+        SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67))
+      val blockId1 = StreamBlockId(kinesisStream.id, 123)
+      val blockInfo1 = ReceivedBlockInfo(
+        0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
+
+      val seqNumRanges2 = SequenceNumberRanges(
+        SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89))
+      val blockId2 = StreamBlockId(kinesisStream.id, 345)
+      val blockInfo2 = ReceivedBlockInfo(
+        0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
+
+      // Verify that the generated KinesisBackedBlockRDD has all the right information
+      val blockInfos = Seq(blockInfo1, blockInfo2)
+
+      val kinesisRDD =
+        kinesisStream.createBlockRDD(time, blockInfos).asInstanceOf[KinesisBackedBlockRDD[_]]
+
+      assert(kinesisRDD.kinesisReadConfigs.retryWaitTimeMs === 2000)
+      assert(kinesisRDD.kinesisReadConfigs.maxRetries === 5)
+      assert(kinesisRDD.kinesisReadConfigs.retryTimeoutMs === batchDuration.milliseconds)
+    } finally {
+      ssc.sc.conf.remove(RETRY_WAIT_TIME_KEY)
+      ssc.sc.conf.remove(RETRY_MAX_ATTEMPTS_KEY)
+      ssc.stop(stopSparkContext = false)
+    }
+  }
+
   testIfEnabled("split and merge shards in a stream") {
     // Since this test tries to split and merge shards in a stream, we create another
     // temporary stream and then remove it when finished.