Skip to content
Snippets Groups Projects
Commit 12342580 authored by Tathagata Das's avatar Tathagata Das Committed by Andrew Or
Browse files

[SPARK-4053][Streaming] Made the ReceiverSuite test more reliable, by fixing...

[SPARK-4053][Streaming] Made the ReceiverSuite test more reliable, by fixing block generator throttling

In the unit test that checked whether blocks generated by throttled block generator had expected number of records, the thresholds are too tight, which sometimes led to the test failing.
This PR fixes it by relaxing the thresholds and the time intervals for testing.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #2900 from tdas/receiver-suite-flakiness and squashes the following commits:

28508a2 [Tathagata Das] Made the ReceiverSuite test more reliable
parent 8d59b37b
No related branches found
No related tags found
No related merge requests found
......@@ -31,9 +31,9 @@ import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
/** Testsuite for testing the network receiver behavior */
class NetworkReceiverSuite extends FunSuite with Timeouts {
class ReceiverSuite extends FunSuite with Timeouts {
test("network receiver life cycle") {
test("receiver life cycle") {
val receiver = new FakeReceiver
val executor = new FakeReceiverSupervisor(receiver)
......@@ -152,8 +152,8 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
test("block generator throttling") {
val blockGeneratorListener = new FakeBlockGeneratorListener
val blockInterval = 50
val maxRate = 200
val blockInterval = 100
val maxRate = 100
val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
set("spark.streaming.receiver.maxRate", maxRate.toString)
val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
......@@ -175,19 +175,35 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
}
blockGenerator.stop()
val recordedData = blockGeneratorListener.arrayBuffers
assert(blockGeneratorListener.arrayBuffers.size > 0)
assert(recordedData.flatten.toSet === generatedData.toSet)
val recordedBlocks = blockGeneratorListener.arrayBuffers
val recordedData = recordedBlocks.flatten
assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
assert(recordedData.toSet === generatedData.toSet, "Received data not same")
// recordedData size should be close to the expected rate
assert(recordedData.flatten.size >= expectedMessages * 0.9 &&
recordedData.flatten.size <= expectedMessages * 1.1 )
// the first and last block may be incomplete, so we slice them out
recordedData.slice(1, recordedData.size - 1).foreach { block =>
assert(block.size >= expectedMessagesPerBlock * 0.8 &&
block.size <= expectedMessagesPerBlock * 1.2 )
}
val minExpectedMessages = expectedMessages - 3
val maxExpectedMessages = expectedMessages + 1
val numMessages = recordedData.size
assert(
numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
)
val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3
val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1
val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
println(minExpectedMessagesPerBlock, maxExpectedMessagesPerBlock, ":", receivedBlockSizes)
assert(
// the first and last block may be incomplete, so we slice them out
recordedBlocks.drop(1).dropRight(1).forall { block =>
block.size >= minExpectedMessagesPerBlock && block.size <= maxExpectedMessagesPerBlock
},
s"# records in received blocks = [$receivedBlockSizes], not between " +
s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock"
)
}
/**
* An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
*/
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment