Skip to content
Snippets Groups Projects
Commit 3e0a6cf1 authored by tedyu's avatar tedyu Committed by Andrew Or
Browse files

[SPARK-11572] Exit AsynchronousListenerBus thread when stop() is called

As vonnagy reported in the following thread:
http://search-hadoop.com/m/q3RTtk982kvIow22

Attempts to join the thread in AsynchronousListenerBus resulted in lock up because AsynchronousListenerBus thread was still getting messages `SparkListenerExecutorMetricsUpdate` from the DAGScheduler

Author: tedyu <yuzhihong@gmail.com>

Closes #9546 from ted-yu/master.
parent 33112f9c
No related branches found
No related tags found
No related merge requests found
...@@ -67,15 +67,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri ...@@ -67,15 +67,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
processingEvent = true processingEvent = true
} }
try { try {
val event = eventQueue.poll if (stopped.get()) {
if (event == null) {
// Get out of the while loop and shutdown the daemon thread // Get out of the while loop and shutdown the daemon thread
if (!stopped.get) {
throw new IllegalStateException("Polling `null` from eventQueue means" +
" the listener bus has been stopped. So `stopped` must be true")
}
return return
} }
val event = eventQueue.poll
assert(event != null, "event queue was empty but the listener bus was not stopped")
postToAll(event) postToAll(event)
} finally { } finally {
self.synchronized { self.synchronized {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment