Skip to content
Snippets Groups Projects
Commit 5fea3e5c authored by David McGuire's avatar David McGuire Committed by Sean Owen
Browse files

[SPARK-6985][streaming] Receiver maxRate over 1000 causes a StackOverflowError

A simple truncation in integer division (on rates over 1000 messages / second) causes the existing implementation to sleep for 0 milliseconds, then call itself recursively; this causes what is essentially an infinite recursion, since the base case — the required amount of time having elapsed — cannot be reached before the available stack space is exhausted. A fix for this truncation error is included in this patch.

However, even with the defect patched, the accuracy of the existing implementation is abysmal (the error bounds of the original test were effectively [-30%, +10%], although this fact was obscured by hard-coded error margins); as such, when the error bounds were tightened down to [-5%, +5%], the existing implementation failed to meet the new, tightened, requirements. Therefore, an industry-vetted solution (from Guava) was used to get the adapted tests to pass.

Author: David McGuire <david.mcguire2@nike.com>

Closes #5559 from dmcguire81/master and squashes the following commits:

d29d2e0 [David McGuire] Back out to +/-5% error margins, for flexibility in timing
8be6934 [David McGuire] Fix spacing per code review
90e98b9 [David McGuire] Address scalastyle errors
29011bd [David McGuire] Further ratchet down the error margins
b33b796 [David McGuire] Eliminate dependency on even distribution by BlockGenerator
8f2934b [David McGuire] Remove arbitrary thread timing / cooperation code
70ee310 [David McGuire] Use Thread.yield(), since Thread.sleep(0) is system-dependent
82ee46d [David McGuire] Replace guard clause with nested conditional
2794717 [David McGuire] Replace the RateLimiter with the Guava implementation
38f3ca8 [David McGuire] Ratchet down the error rate to +/- 5%; tests fail
24b1bc0 [David McGuire] Fix truncation in integer division causing infinite recursion
d6e1079 [David McGuire] Stack overflow error in RateLimiter on rates over 1000/s
parent 1f2f723b
No related branches found
No related tags found
No related merge requests found
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.apache.spark.streaming.receiver package org.apache.spark.streaming.receiver
import org.apache.spark.{Logging, SparkConf} import org.apache.spark.{Logging, SparkConf}
import java.util.concurrent.TimeUnit._ import com.google.common.util.concurrent.{RateLimiter=>GuavaRateLimiter}
/** Provides waitToPush() method to limit the rate at which receivers consume data. /** Provides waitToPush() method to limit the rate at which receivers consume data.
* *
...@@ -33,37 +33,12 @@ import java.util.concurrent.TimeUnit._ ...@@ -33,37 +33,12 @@ import java.util.concurrent.TimeUnit._
*/ */
/**
 * Provides a waitToPush() method to limit the rate at which receivers consume data.
 *
 * waitToPush() blocks the calling thread if messages have been pushed too quickly, and
 * returns only when a new message may be pushed. It assumes that one message is pushed
 * per call.
 *
 * The rate limit is configured via `spark.streaming.receiver.maxRate`, the maximum
 * number of messages per second that each receiver will accept; 0 or a negative value
 * (the default is 0) disables throttling entirely.
 *
 * @param conf SparkConf used to read the `spark.streaming.receiver.maxRate` setting
 */
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {

  // 0 means "unlimited" (no throttling).
  private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)

  // Guava's RateLimiter.create throws IllegalArgumentException for non-positive rates,
  // so this must stay lazy: it is only forced when desiredRate > 0 (see waitToPush).
  private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)

  /**
   * Blocks until pushing one more message would not exceed the configured rate.
   * No-op when throttling is disabled (desiredRate <= 0).
   */
  def waitToPush(): Unit = {
    if (desiredRate > 0) {
      rateLimiter.acquire()
    }
  }
}
...@@ -158,7 +158,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { ...@@ -158,7 +158,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
test("block generator throttling") { test("block generator throttling") {
val blockGeneratorListener = new FakeBlockGeneratorListener val blockGeneratorListener = new FakeBlockGeneratorListener
val blockIntervalMs = 100 val blockIntervalMs = 100
val maxRate = 100 val maxRate = 1001
val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms"). val conf = new SparkConf().set("spark.streaming.blockInterval", s"${blockIntervalMs}ms").
set("spark.streaming.receiver.maxRate", maxRate.toString) set("spark.streaming.receiver.maxRate", maxRate.toString)
val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
...@@ -176,7 +176,6 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { ...@@ -176,7 +176,6 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
blockGenerator.addData(count) blockGenerator.addData(count)
generatedData += count generatedData += count
count += 1 count += 1
Thread.sleep(1)
} }
blockGenerator.stop() blockGenerator.stop()
...@@ -185,25 +184,31 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { ...@@ -185,25 +184,31 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received") assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
assert(recordedData.toSet === generatedData.toSet, "Received data not same") assert(recordedData.toSet === generatedData.toSet, "Received data not same")
// recordedData size should be close to the expected rate // recordedData size should be close to the expected rate; use an error margin proportional to
val minExpectedMessages = expectedMessages - 3 // the value, so that rate changes don't cause a brittle test
val maxExpectedMessages = expectedMessages + 1 val minExpectedMessages = expectedMessages - 0.05 * expectedMessages
val maxExpectedMessages = expectedMessages + 0.05 * expectedMessages
val numMessages = recordedData.size val numMessages = recordedData.size
assert( assert(
numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages, numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages" s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
) )
val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3 // XXX Checking every block would require an even distribution of messages across blocks,
val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1 // which throttling code does not control. Therefore, test against the average.
val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 0.05 * expectedMessagesPerBlock
val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 0.05 * expectedMessagesPerBlock
val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",") val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
// the first and last block may be incomplete, so we slice them out
val validBlocks = recordedBlocks.drop(1).dropRight(1)
val averageBlockSize = validBlocks.map(block => block.size).sum / validBlocks.size
assert( assert(
// the first and last block may be incomplete, so we slice them out averageBlockSize >= minExpectedMessagesPerBlock &&
recordedBlocks.drop(1).dropRight(1).forall { block => averageBlockSize <= maxExpectedMessagesPerBlock,
block.size >= minExpectedMessagesPerBlock && block.size <= maxExpectedMessagesPerBlock
},
s"# records in received blocks = [$receivedBlockSizes], not between " + s"# records in received blocks = [$receivedBlockSizes], not between " +
s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock" s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock, on average"
) )
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment