From 1234258077b1f4050845e9fb73066b37f981c72a Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Wed, 29 Oct 2014 17:59:16 -0700
Subject: [PATCH] [SPARK-4053][Streaming] Made the ReceiverSuite test more
 reliable, by fixing block generator throttling

In the unit test that checked whether blocks generated by throttled block generator had expected number of records, the thresholds are too tight, which sometimes led to the test failing.
This PR fixes it by relaxing the thresholds and the time intervals for testing.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #2900 from tdas/receiver-suite-flakiness and squashes the following commits:

28508a2 [Tathagata Das] Made the ReceiverSuite test more reliable
---
 ...eceiverSuite.scala => ReceiverSuite.scala} | 44 +++++++++++++------
 1 file changed, 30 insertions(+), 14 deletions(-)
 rename streaming/src/test/scala/org/apache/spark/streaming/{NetworkReceiverSuite.scala => ReceiverSuite.scala} (86%)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
similarity index 86%
rename from streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
rename to streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index eb6e88cf55..0f6a9489db 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -31,9 +31,9 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
 /** Testsuite for testing the network receiver behavior */
-class NetworkReceiverSuite extends FunSuite with Timeouts {
+class ReceiverSuite extends FunSuite with Timeouts {
 
-  test("network receiver life cycle") {
+  test("receiver life cycle") {
 
     val receiver = new FakeReceiver
     val executor = new FakeReceiverSupervisor(receiver)
@@ -152,8 +152,8 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
 
   test("block generator throttling") {
     val blockGeneratorListener = new FakeBlockGeneratorListener
-    val blockInterval = 50
-    val maxRate = 200
+    val blockInterval = 100
+    val maxRate = 100
     val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
       set("spark.streaming.receiver.maxRate", maxRate.toString)
     val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
@@ -175,19 +175,35 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
     }
     blockGenerator.stop()
 
-    val recordedData = blockGeneratorListener.arrayBuffers
-    assert(blockGeneratorListener.arrayBuffers.size > 0)
-    assert(recordedData.flatten.toSet === generatedData.toSet)
+    val recordedBlocks = blockGeneratorListener.arrayBuffers
+    val recordedData = recordedBlocks.flatten
+    assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
+    assert(recordedData.toSet === generatedData.toSet, "Received data not same")
+
     // recordedData size should be close to the expected rate
-    assert(recordedData.flatten.size >= expectedMessages * 0.9 &&
-      recordedData.flatten.size <= expectedMessages * 1.1 )
-    // the first and last block may be incomplete, so we slice them out
-    recordedData.slice(1, recordedData.size - 1).foreach { block =>
-      assert(block.size >= expectedMessagesPerBlock * 0.8 &&
-        block.size <= expectedMessagesPerBlock * 1.2 )
-    }
+    val minExpectedMessages = expectedMessages - 3
+    val maxExpectedMessages = expectedMessages + 1
+    val numMessages = recordedData.size
+    assert(
+      numMessages >= minExpectedMessages && numMessages <= maxExpectedMessages,
+      s"#records received = $numMessages, not between $minExpectedMessages and $maxExpectedMessages"
+    )
+
+    val minExpectedMessagesPerBlock = expectedMessagesPerBlock - 3
+    val maxExpectedMessagesPerBlock = expectedMessagesPerBlock + 1
+    val receivedBlockSizes = recordedBlocks.map { _.size }.mkString(",")
+    println(minExpectedMessagesPerBlock, maxExpectedMessagesPerBlock, ":", receivedBlockSizes)
+    assert(
+      // the first and last block may be incomplete, so we slice them out
+      recordedBlocks.drop(1).dropRight(1).forall { block =>
+        block.size >= minExpectedMessagesPerBlock && block.size <= maxExpectedMessagesPerBlock
+      },
+      s"# records in received blocks = [$receivedBlockSizes], not between " +
+        s"$minExpectedMessagesPerBlock and $maxExpectedMessagesPerBlock"
+    )
   }
 
+
   /**
    * An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
    */
-- 
GitLab