From 086b0c8f6788b205bc630d5ccf078f77b9751af3 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <shixiong@databricks.com>
Date: Thu, 1 Dec 2016 14:22:49 -0800
Subject: [PATCH] [SPARK-18617][SPARK-18560][TESTS] Fix flaky test:
 StreamingContextSuite. Receiver data should be deserialized properly

## What changes were proposed in this pull request?

Avoid to create multiple threads to stop StreamingContext. Otherwise, the latch added in #16091 can be passed too early.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16105 from zsxwing/SPARK-18617-2.
---
 .../org/apache/spark/streaming/StreamingContextSuite.scala    | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 35eeb9dfa5..5645996de5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -814,10 +814,12 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     ssc = new StreamingContext(conf, Milliseconds(100))
     val input = ssc.receiverStream(new TestReceiver)
     val latch = new CountDownLatch(1)
+    @volatile var stopping = false
     input.count().foreachRDD { rdd =>
       // Make sure we can read from BlockRDD
-      if (rdd.collect().headOption.getOrElse(0L) > 0) {
+      if (rdd.collect().headOption.getOrElse(0L) > 0 && !stopping) {
         // Stop StreamingContext to unblock "awaitTerminationOrTimeout"
+        stopping = true
         new Thread() {
           setDaemon(true)
           override def run(): Unit = {
-- 
GitLab