From f57e6c9effdb9e282fc8ae66dc30fe053fed5272 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <shixiong@databricks.com>
Date: Fri, 27 Nov 2015 11:50:18 -0800
Subject: [PATCH] [SPARK-12021][STREAMING][TESTS] Fix the potential dead-lock
 in StreamingListenerSuite

In StreamingListenerSuite."don't call ssc.stop in listener", after the main thread calls `ssc.stop()`,  `StreamingContextStoppingCollector` may call  `ssc.stop()` in the listener bus thread, which is a dead-lock. This PR updated `StreamingContextStoppingCollector` to only call `ssc.stop()` in the first batch to avoid the dead-lock.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10011 from zsxwing/fix-test-deadlock.
---
 .../streaming/StreamingListenerSuite.scala    | 25 ++++++++++++++-----
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index df4575ab25..04cd5bdc26 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -222,7 +222,11 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     val batchCounter = new BatchCounter(_ssc)
     _ssc.start()
     // Make sure running at least one batch
-    batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
+    if (!batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)) {
+      fail("The first batch cannot complete in 10 seconds")
+    }
+    // When reaching here, we can make sure `StreamingContextStoppingCollector` won't call
+    // `ssc.stop()`, so it's safe to call `_ssc.stop()` now.
     _ssc.stop()
     assert(contextStoppingCollector.sparkExSeen)
   }
@@ -345,12 +349,21 @@ class FailureReasonsCollector extends StreamingListener {
  */
 class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener {
   @volatile var sparkExSeen = false
+
+  private var isFirstBatch = true
+
   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
-    try {
-      ssc.stop()
-    } catch {
-      case se: SparkException =>
-        sparkExSeen = true
+    if (isFirstBatch) {
+      // We should only call `ssc.stop()` in the first batch. Otherwise, it's possible that the main
+      // thread is calling `ssc.stop()`, while StreamingContextStoppingCollector is also calling
+      // `ssc.stop()` in the listener thread, which becomes a dead-lock.
+      isFirstBatch = false
+      try {
+        ssc.stop()
+      } catch {
+        case se: SparkException =>
+          sparkExSeen = true
+      }
     }
   }
 }
-- 
GitLab