diff --git a/docs/configuration.md b/docs/configuration.md index 0dbfe3b0796ba29ad7d393365c7458918cf91b94..a2c0dfe76ca4537722a843d963e69fe0cf8c88ca 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1470,6 +1470,14 @@ Apart from these, the following properties are also available, and may be useful if they are set (see below). </td> </tr> +<tr> + <td><code>spark.streaming.backpressure.initialRate</code></td> + <td>not set</td> + <td> + This is the initial maximum receiving rate at which each receiver will receive data for the + first batch when the backpressure mechanism is enabled. + </td> +</tr> <tr> <td><code>spark.streaming.blockInterval</code></td> <td>200ms</td> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index bca1fbc8fda2fa737163f2b1677fc2af393608aa..6a1b672220bd2e60f3e054d103e0890ec4c8dd62 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -36,7 +36,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { // treated as an upper limit private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue) - private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble) + private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble) def waitToPush() { rateLimiter.acquire() @@ -61,4 +61,11 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { rateLimiter.setRate(newRate) } } + + /** + * Get the initial rate limit used to initialize the rate limiter + */ + private def getInitialRateLimit(): Long = { + math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit) + } }