From 3e0a6cf1e02a19b37c68d3026415d53bb57a576b Mon Sep 17 00:00:00 2001
From: tedyu <yuzhihong@gmail.com>
Date: Tue, 10 Nov 2015 16:51:25 -0800
Subject: [PATCH] [SPARK-11572] Exit AsynchronousListenerBus thread when stop()
 is called

As vonnagy reported in the following thread:
http://search-hadoop.com/m/q3RTtk982kvIow22

Attempts to join the thread in AsynchronousListenerBus resulted in lock up because AsynchronousListenerBus thread was still getting messages `SparkListenerExecutorMetricsUpdate` from the DAGScheduler

Author: tedyu <yuzhihong@gmail.com>

Closes #9546 from ted-yu/master.
---
 .../org/apache/spark/util/AsynchronousListenerBus.scala  | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index 61b5a4cecd..b8481eabc7 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -67,15 +67,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
           processingEvent = true
         }
         try {
-          val event = eventQueue.poll
-          if (event == null) {
+          if (stopped.get()) {
             // Get out of the while loop and shutdown the daemon thread
-            if (!stopped.get) {
-              throw new IllegalStateException("Polling `null` from eventQueue means" +
-                " the listener bus has been stopped. So `stopped` must be true")
-            }
             return
           }
+          val event = eventQueue.poll
+          assert(event != null, "event queue was empty but the listener bus was not stopped")
           postToAll(event)
         } finally {
           self.synchronized {
-- 
GitLab