diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 35eeb9dfa5ef8c41197f1bbe1b8c2a181b92ab5b..5645996de5a6975a8052f1295f4bb37ba031f609 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -814,10 +814,12 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo ssc = new StreamingContext(conf, Milliseconds(100)) val input = ssc.receiverStream(new TestReceiver) val latch = new CountDownLatch(1) + @volatile var stopping = false input.count().foreachRDD { rdd => // Make sure we can read from BlockRDD - if (rdd.collect().headOption.getOrElse(0L) > 0) { + if (rdd.collect().headOption.getOrElse(0L) > 0 && !stopping) { // Stop StreamingContext to unblock "awaitTerminationOrTimeout" + stopping = true new Thread() { setDaemon(true) override def run(): Unit = {