From 38877ccf394a50bfd37c8433d4aafaa91683d3b8 Mon Sep 17 00:00:00 2001
From: Kan Zhang <kzhang@apache.org>
Date: Wed, 16 Apr 2014 17:39:11 -0700
Subject: [PATCH] Fixing a race condition in event listener unit test

Author: Kan Zhang <kzhang@apache.org>

Closes #401 from kanzhang/fix-1475 and squashes the following commits:

c6058bd [Kan Zhang] Fixing a race condition in event listener unit test
---
 .../spark/scheduler/LiveListenerBus.scala     |  4 ---
 .../spark/scheduler/SparkListenerSuite.scala  | 28 +++++++++++++------
 2 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 545fa453b7..cbac4c13ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -50,9 +50,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
     }
   }
 
-  // Exposed for testing
-  @volatile private[spark] var stopCalled = false
-
   /**
    * Start sending events to attached listeners.
    *
@@ -97,7 +94,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
   }
 
   def stop() {
-    stopCalled = true
     if (!started) {
       throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 4cdccdda6f..36511a9e95 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -77,14 +77,21 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
   test("bus.stop() waits for the event queue to completely drain") {
     @volatile var drained = false
 
+    // When Listener has started
+    val listenerStarted = new Semaphore(0)
+
     // Tells the listener to stop blocking
-    val listenerWait = new Semaphore(1)
+    val listenerWait = new Semaphore(0)
+
+    // When stopper has started
+    val stopperStarted = new Semaphore(0)
 
-    // When stop has returned
-    val stopReturned = new Semaphore(1)
+    // When stopper has returned
+    val stopperReturned = new Semaphore(0)
 
     class BlockingListener extends SparkListener {
       override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
+        listenerStarted.release()
         listenerWait.acquire()
         drained = true
       }
@@ -97,23 +104,26 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     bus.start()
     bus.post(SparkListenerJobEnd(0, JobSucceeded))
 
-    // the queue should not drain immediately
+    listenerStarted.acquire()
+    // Listener should be blocked after start
     assert(!drained)
 
     new Thread("ListenerBusStopper") {
       override def run() {
+        stopperStarted.release()
         // stop() will block until notify() is called below
         bus.stop()
-        stopReturned.release(1)
+        stopperReturned.release()
       }
     }.start()
 
-    while (!bus.stopCalled) {
-      Thread.sleep(10)
-    }
+    stopperStarted.acquire()
+    // Listener should remain blocked after stopper started
+    assert(!drained)
 
+    // unblock Listener to let queue drain
     listenerWait.release()
-    stopReturned.acquire()
+    stopperReturned.acquire()
     assert(drained)
   }
 
-- 
GitLab