diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala similarity index 86% rename from streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala rename to streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index eb6e88cf5520d7825b13a43f5586996d34903eb5..0f6a9489dbe0dc8cb6ab7f4f033226d11dde193b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -31,9 +31,9 @@ import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ /** Testsuite for testing the network receiver behavior */ -class NetworkReceiverSuite extends FunSuite with Timeouts { +class ReceiverSuite extends FunSuite with Timeouts { - test("network receiver life cycle") { + test("receiver life cycle") { val receiver = new FakeReceiver val executor = new FakeReceiverSupervisor(receiver) @@ -152,8 +152,8 @@ class NetworkReceiverSuite extends FunSuite with Timeouts { test("block generator throttling") { val blockGeneratorListener = new FakeBlockGeneratorListener - val blockInterval = 50 - val maxRate = 200 + val blockInterval = 100 + val maxRate = 100 val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString). set("spark.streaming.receiver.maxRate", maxRate.toString) val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf) @@ -175,19 +175,35 @@ class NetworkReceiverSuite extends FunSuite with Timeouts { } blockGenerator.stop() - val recordedData = blockGeneratorListener.arrayBuffers - assert(blockGeneratorListener.arrayBuffers.size > 0) - assert(recordedData.flatten.toSet === generatedData.toSet) + val recordedBlocks = blockGeneratorListener.arrayBuffers + val recordedData = recordedBlocks.flatten + assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received") + assert(recordedData.toSet === generatedData.toSet, "Received data not same") + // recordedData size should be close to the expected rate - assert(recordedData.flatten.size >= expectedMessages * 0.9 && - recordedData.flatten.size <= expectedMessages * 1.1 ) - // the first and last block may be incomplete, so we slice them out - recordedData.slice(1, recordedData.size - 1).foreach { block => - assert(block.size >= expectedMessagesPerBlock * 0.8 && - block.size <= expectedMessagesPerBlock * 1.2 ) - } + val minExpectedMessages = expectedMessages - 3 + val maxExpectedMessages = expectedMessages + 1 + val numMessages = recordedData.size + assert( + numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages, + s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages" + ) + + val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3 + val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1 + val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",") + println(minExpectedMessagesPerBlock, maxExpectedMessagesPerBlock, ":", receivedBlockSizes) + assert( + // the first and last block may be incomplete, so we slice them out + recordedBlocks.drop(1).dropRight(1).forall { block => + block.size >= minExpectedMessagesPerBlock && block.size <= maxExpectedMessagesPerBlock + }, + s"# records in received blocks = [$receivedBlockSizes], not between " + + s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock" + ) } + /** * An implementation of NetworkReceiver that is used for testing a receiver's life cycle. */