From 38f4e8692ce3b6cbcfe0c1aff9b5e662f7a308b7 Mon Sep 17 00:00:00 2001
From: Yash Sharma <ysharma@atlassian.com>
Date: Tue, 16 May 2017 15:08:05 -0700
Subject: [PATCH] [SPARK-20140][DSTREAM] Remove hardcoded kinesis retry wait
 and max retries

## What changes were proposed in this pull request?

This pull request proposes to remove the hardcoded values MIN_RETRY_WAIT_TIME_MS and MAX_RETRIES for Amazon Kinesis. The change is critical for Kinesis checkpoint recovery when the Kinesis-backed RDD is huge.

The following happens in a typical Kinesis recovery:
- Kinesis throttles a large number of requests while recovering
- retries after throttling fail to recover because the wait period is too small
- since Kinesis throttles per second, the wait period should be configurable for recovery

The patch picks the Spark Kinesis configs from:
- spark.streaming.kinesis.retry.waitTime
- spark.streaming.kinesis.retry.maxAttempts

Jira: https://issues.apache.org/jira/browse/SPARK-20140

## How was this patch tested?

Modified KinesisStreamSuite.scala to run the Kinesis tests with the modified configurations. Wasn't able to test the patch with actual throttling.

Author: Yash Sharma <ysharma@atlassian.com>

Closes #17467 from yssharma/ysharma/spark-kinesis-retries.
---
 .../kinesis/KinesisBackedBlockRDD.scala       | 33 ++++----
 .../kinesis/KinesisInputDStream.scala         |  6 +-
 .../kinesis/KinesisReadConfigurations.scala   | 78 +++++++++++++++++++
 .../kinesis/KinesisStreamSuite.scala          | 49 +++++++++++-
 4 files changed, 143 insertions(+), 23 deletions(-)
 create mode 100644 external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala

diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index f31ebf1ec8..88b294246b 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.auth.AWSCredentials
 import com.amazonaws.services.kinesis.AmazonKinesisClient
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
 import com.amazonaws.services.kinesis.model._
@@ -81,9 +81,9 @@ class KinesisBackedBlockRDD[T: ClassTag](
     @transient private val _blockIds: Array[BlockId],
     @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
     @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
-    val retryTimeoutMs: Int = 10000,
     val messageHandler: Record => T = KinesisInputDStream.defaultMessageHandler _,
-    val kinesisCreds: SparkAWSCredentials = DefaultCredentials
+    val kinesisCreds: SparkAWSCredentials = DefaultCredentials,
+    val kinesisReadConfigs: KinesisReadConfigurations = KinesisReadConfigurations()
   ) extends BlockRDD[T](sc, _blockIds) {
 
   require(_blockIds.length == arrayOfseqNumberRanges.length,
@@ -112,7 +112,7 @@ class KinesisBackedBlockRDD[T: ClassTag](
       val credentials = kinesisCreds.provider.getCredentials
       partition.seqNumberRanges.ranges.iterator.flatMap { range =>
         new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
-          range, retryTimeoutMs).map(messageHandler)
+          range, kinesisReadConfigs).map(messageHandler)
       }
     }
     if (partition.isBlockIdValid) {
@@ -135,7 +135,7 @@ class KinesisSequenceRangeIterator(
     endpointUrl: String,
     regionId: String,
     range: SequenceNumberRange,
-    retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
+    kinesisReadConfigs: KinesisReadConfigurations) extends NextIterator[Record] with Logging {
 
   private val client = new AmazonKinesisClient(credentials)
   private val streamName = range.streamName
@@ -251,21 +251,19 @@ class KinesisSequenceRangeIterator(
 
   /** Helper method to retry Kinesis API request with exponential backoff and timeouts */
   private def retryOrTimeout[T](message: String)(body: => T): T = {
-    import KinesisSequenceRangeIterator._
-
-    var startTimeMs = System.currentTimeMillis()
+    val startTimeMs = System.currentTimeMillis()
     var retryCount = 0
-    var waitTimeMs = MIN_RETRY_WAIT_TIME_MS
     var result: Option[T] = None
     var lastError: Throwable = null
+    var waitTimeInterval = kinesisReadConfigs.retryWaitTimeMs
 
-    def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs
-    def isMaxRetryDone = retryCount >= MAX_RETRIES
+    def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= kinesisReadConfigs.retryTimeoutMs
+    def isMaxRetryDone = retryCount >= kinesisReadConfigs.maxRetries
 
     while (result.isEmpty && !isTimedOut && !isMaxRetryDone) {
       if (retryCount > 0) {
         // wait only if this is a retry
-        Thread.sleep(waitTimeMs)
-        waitTimeMs *= 2 // if you have waited, then double wait time for next round
+        Thread.sleep(waitTimeInterval)
+        waitTimeInterval *= 2 // if you have waited, then double wait time for next round
       }
       try {
         result = Some(body)
@@ -284,7 +282,8 @@ class KinesisSequenceRangeIterator(
     result.getOrElse {
       if (isTimedOut) {
         throw new SparkException(
-          s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError)
+          s"Timed out after ${kinesisReadConfigs.retryTimeoutMs} ms while " +
+            s"$message, last exception: ", lastError)
       } else {
         throw new SparkException(
           s"Gave up after $retryCount retries while $message, last exception: ", lastError)
@@ -292,9 +291,3 @@ class KinesisSequenceRangeIterator(
     }
   }
 }
-
-private[streaming]
-object KinesisSequenceRangeIterator {
-  val MAX_RETRIES = 3
-  val MIN_RETRY_WAIT_TIME_MS = 100
-}
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
index 77553412ed..decfb6b3eb 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.model.Record
+import KinesisReadConfigurations._
 
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
@@ -60,12 +61,13 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
       val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
       logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
         s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
+
       new KinesisBackedBlockRDD(
         context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
         isBlockIdValid = isBlockIdValid,
-        retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
         messageHandler = messageHandler,
-        kinesisCreds = kinesisCreds)
+        kinesisCreds = kinesisCreds,
+        kinesisReadConfigs = KinesisReadConfigurations(ssc))
     } else {
       logWarning("Kinesis sequence number information was not present with some block metadata," +
         " it may not be possible to recover from failures")
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
new file mode 100644
index 0000000000..871071e467
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReadConfigurations.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.streaming.StreamingContext
+
+/**
+ * Configurations to pass to the [[KinesisBackedBlockRDD]].
+ *
+ * @param maxRetries: The maximum number of attempts to be made to Kinesis. Defaults to 3.
+ * @param retryWaitTimeMs: The interval between consecutive Kinesis retries.
+ *                         Defaults to 100ms.
+ * @param retryTimeoutMs: The timeout in milliseconds for a Kinesis request.
+ *                        Defaults to the batch duration provided for streaming,
+ *                        else uses 10000 if invoked directly.
+ */
+private[kinesis] case class KinesisReadConfigurations(
+    maxRetries: Int,
+    retryWaitTimeMs: Long,
+    retryTimeoutMs: Long)
+
+private[kinesis] object KinesisReadConfigurations {
+  def apply(): KinesisReadConfigurations = {
+    KinesisReadConfigurations(maxRetries = DEFAULT_MAX_RETRIES,
+      retryWaitTimeMs = JavaUtils.timeStringAsMs(DEFAULT_RETRY_WAIT_TIME),
+      retryTimeoutMs = DEFAULT_RETRY_TIMEOUT)
+  }
+
+  def apply(ssc: StreamingContext): KinesisReadConfigurations = {
+    KinesisReadConfigurations(
+      maxRetries = ssc.sc.getConf.getInt(RETRY_MAX_ATTEMPTS_KEY, DEFAULT_MAX_RETRIES),
+      retryWaitTimeMs = JavaUtils.timeStringAsMs(
+        ssc.sc.getConf.get(RETRY_WAIT_TIME_KEY, DEFAULT_RETRY_WAIT_TIME)),
+      retryTimeoutMs = ssc.graph.batchDuration.milliseconds)
+  }
+
+  /**
+   * SparkConf key for configuring the maximum number of retries used when attempting a Kinesis
+   * request.
+   */
+  val RETRY_MAX_ATTEMPTS_KEY = "spark.streaming.kinesis.retry.maxAttempts"
+
+  /**
+   * SparkConf key for configuring the wait time to use before retrying a Kinesis attempt.
+   */
+  val RETRY_WAIT_TIME_KEY = "spark.streaming.kinesis.retry.waitTime"
+
+  /**
+   * Default value for the RETRY_MAX_ATTEMPTS_KEY
+   */
+  val DEFAULT_MAX_RETRIES = 3
+
+  /**
+   * Default value for the RETRY_WAIT_TIME_KEY
+   */
+  val DEFAULT_RETRY_WAIT_TIME = "100ms"
+
+  /**
+   * Default value for the retry timeout
+   */
+  val DEFAULT_RETRY_TIMEOUT = 10000
+}
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 341a6898cb..7e5bda923f 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.kinesis.KinesisReadConfigurations._
 import org.apache.spark.streaming.kinesis.KinesisTestUtils._
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
@@ -136,7 +137,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]]
     assert(kinesisRDD.regionName === dummyRegionName)
     assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
-    assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
+    assert(kinesisRDD.kinesisReadConfigs.retryTimeoutMs === batchDuration.milliseconds)
     assert(kinesisRDD.kinesisCreds === BasicCredentials(
       awsAccessKeyId = dummyAWSAccessKey,
       awsSecretKey = dummyAWSSecretKey))
@@ -234,6 +235,52 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     ssc.stop(stopSparkContext = false)
   }
 
+  test("Kinesis read with custom configurations") {
+    try {
+      ssc.sc.conf.set(RETRY_WAIT_TIME_KEY, "2000ms")
+      ssc.sc.conf.set(RETRY_MAX_ATTEMPTS_KEY, "5")
+
+      val kinesisStream = KinesisInputDStream.builder.streamingContext(ssc)
+        .checkpointAppName(appName)
+        .streamName("dummyStream")
+        .endpointUrl(dummyEndpointUrl)
+        .regionName(dummyRegionName)
+        .initialPositionInStream(InitialPositionInStream.LATEST)
+        .checkpointInterval(Seconds(10))
+        .storageLevel(StorageLevel.MEMORY_ONLY)
+        .build()
+        .asInstanceOf[KinesisInputDStream[Array[Byte]]]
+
+      val time = Time(1000)
+      // Generate block info data for testing
+      val seqNumRanges1 = SequenceNumberRanges(
+        SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67))
+      val blockId1 = StreamBlockId(kinesisStream.id, 123)
+      val blockInfo1 = ReceivedBlockInfo(
+        0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
+
+      val seqNumRanges2 = SequenceNumberRanges(
+        SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89))
+      val blockId2 = StreamBlockId(kinesisStream.id, 345)
+      val blockInfo2 = ReceivedBlockInfo(
+        0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
+
+      // Verify that the generated KinesisBackedBlockRDD has all the right information
+      val blockInfos = Seq(blockInfo1, blockInfo2)
+
+      val kinesisRDD =
+        kinesisStream.createBlockRDD(time, blockInfos).asInstanceOf[KinesisBackedBlockRDD[_]]
+
+      assert(kinesisRDD.kinesisReadConfigs.retryWaitTimeMs === 2000)
+      assert(kinesisRDD.kinesisReadConfigs.maxRetries === 5)
+      assert(kinesisRDD.kinesisReadConfigs.retryTimeoutMs === batchDuration.milliseconds)
+    } finally {
+      ssc.sc.conf.remove(RETRY_WAIT_TIME_KEY)
+      ssc.sc.conf.remove(RETRY_MAX_ATTEMPTS_KEY)
+      ssc.stop(stopSparkContext = false)
+    }
+  }
+
   testIfEnabled("split and merge shards in a stream") {
     // Since this test tries to split and merge shards in a stream, we create another
     // temporary stream and then remove it when finished.
--
GitLab
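
Usage note: with this patch applied, the retry behavior is tuned entirely through SparkConf. Below is a minimal sketch (not part of the patch) of a driver that sets the two new keys, assuming the builder API exercised in the test above; the app name, stream name, endpoint, and region are placeholders.

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInputDStream

object KinesisRetryConfigExample {
  def main(args: Array[String]): Unit = {
    // The two keys introduced by this patch; KinesisReadConfigurations(ssc)
    // reads them from the SparkConf when the Kinesis-backed RDD is created.
    val conf = new SparkConf()
      .setAppName("KinesisRetryConfigExample")
      .set("spark.streaming.kinesis.retry.waitTime", "2000ms") // default: 100ms
      .set("spark.streaming.kinesis.retry.maxAttempts", "5")   // default: 3
    val ssc = new StreamingContext(conf, Seconds(10))

    // Placeholder stream/endpoint/region values - substitute real ones.
    val stream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .checkpointAppName("example-app")
      .streamName("example-stream")
      .endpointUrl("https://kinesis.us-east-1.amazonaws.com")
      .regionName("us-east-1")
      .initialPositionInStream(InitialPositionInStream.LATEST)
      .checkpointInterval(Seconds(10))
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      .build()

    stream.foreachRDD { rdd => println(s"Received ${rdd.count()} records") }
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Set the keys before the StreamingContext is created (or, as the test does, mutate ssc.sc.conf before a batch generates the RDD), since the values are read per RDD from the context's SparkConf.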
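The mechanism the patch parameterizes is the exponential backoff in retryOrTimeout. Here is a standalone sketch of that loop under the same defaults; the object and function names are illustrative, not Spark internals.

```scala
import scala.util.control.NonFatal

object RetryWithBackoffSketch {
  // Mirrors retryOrTimeout above: retry `body` with exponentially growing
  // waits, bounded by both a retry count and an overall timeout.
  def retryWithBackoff[T](
      maxRetries: Int = 3,           // formerly KinesisSequenceRangeIterator.MAX_RETRIES
      retryWaitTimeMs: Long = 100L,  // formerly MIN_RETRY_WAIT_TIME_MS
      retryTimeoutMs: Long = 10000L)(body: => T): T = {
    val startTimeMs = System.currentTimeMillis()
    var retryCount = 0
    var waitTimeMs = retryWaitTimeMs
    var result: Option[T] = None
    var lastError: Throwable = null

    def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs
    def isMaxRetryDone = retryCount >= maxRetries

    while (result.isEmpty && !isTimedOut && !isMaxRetryDone) {
      if (retryCount > 0) {  // wait only if this is a retry
        Thread.sleep(waitTimeMs)
        waitTimeMs *= 2      // double the wait for the next round
      }
      try {
        result = Some(body)
      } catch {
        case NonFatal(e) =>
          lastError = e
          retryCount += 1
      }
    }
    result.getOrElse {
      throw new RuntimeException(s"Gave up after $retryCount retries", lastError)
    }
  }

  def main(args: Array[String]): Unit = {
    // With waitTime = 2000ms the sleeps between attempts are 2000ms, 4000ms, ...
    // which is what lets a throttled checkpoint recovery eventually succeed.
    val value = retryWithBackoff(maxRetries = 5, retryWaitTimeMs = 2000L,
      retryTimeoutMs = 60000L) {
      42 // stand-in for a Kinesis getRecords / getShardIterator call
    }
    println(value)
  }
}
```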