Skip to content
Snippets Groups Projects
Commit ee6f7e22 authored by Andrew Or's avatar Andrew Or Committed by Patrick Wendell
Browse files

[SPARK-1615] Synchronize accesses to the LiveListenerBus' event queue

Original poster is @zsxwing, who reported this bug in #516.

Much of SparkListenerSuite relies on LiveListenerBus's `waitUntilEmpty()` method. As the name suggests, this waits until the event queue is empty. However, the following race condition could happen:

(1) We dequeue an event
(2) The queue is empty, we return true (even though the event has not been processed)
(3) The test asserts something assuming that all listeners have finished executing (and fails)
(4) The listeners receive and process the event

This PR makes (1) and (4) atomic by synchronizing around it. To do that, however, we must avoid using `eventQueue.take`, which is blocking and will cause a deadlock if we synchronize around it. As a workaround, we use the non-blocking `eventQueue.poll` + a semaphore to provide the same semantics.

This has been a possible race condition for a long time, but for some reason we've never run into it.

Author: Andrew Or <andrewor14@gmail.com>

Closes #544 from andrewor14/stage-info-test-fix and squashes the following commits:

3cbe40c [Andrew Or] Merge github.com:apache/spark into stage-info-test-fix
56dbbcb [Andrew Or] Check if event is actually added before releasing semaphore
eb486ae [Andrew Or] Synchronize accesses to the LiveListenerBus' event queue
parent 80429f3e
No related branches found
No related tags found
No related merge requests found
......@@ -17,7 +17,7 @@
package org.apache.spark.scheduler
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import org.apache.spark.Logging
......@@ -36,16 +36,24 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false
// A counter that represents the number of events produced and consumed in the queue
private val eventLock = new Semaphore(0)
private val listenerThread = new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
while (true) {
val event = eventQueue.take
if (event == SparkListenerShutdown) {
// Get out of the while loop and shutdown the daemon thread
return
eventLock.acquire()
// Atomically remove and process this event
LiveListenerBus.this.synchronized {
val event = eventQueue.poll
if (event == SparkListenerShutdown) {
// Get out of the while loop and shutdown the daemon thread
return
}
Option(event).foreach(postToAll)
}
postToAll(event)
}
}
}
......@@ -67,7 +75,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
def post(event: SparkListenerEvent) {
val eventAdded = eventQueue.offer(event)
if (!eventAdded && !queueFullErrorMessageLogged) {
if (eventAdded) {
eventLock.release()
} else if (!queueFullErrorMessageLogged) {
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
"This likely means one of the SparkListeners is too slow and cannot keep up with the " +
"rate at which tasks are being started by the scheduler.")
......@@ -76,13 +86,13 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}
/**
* Waits until there are no more events in the queue, or until the specified time has elapsed.
* Used for testing only. Returns true if the queue has emptied and false is the specified time
* For testing only. Wait until there are no more events in the queue, or until the specified
* time has elapsed. Return true if the queue has emptied and false is the specified time
* elapsed before the queue emptied.
*/
def waitUntilEmpty(timeoutMillis: Int): Boolean = {
val finishTime = System.currentTimeMillis + timeoutMillis
while (!eventQueue.isEmpty) {
while (!queueIsEmpty) {
if (System.currentTimeMillis > finishTime) {
return false
}
......@@ -93,6 +103,14 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
true
}
/**
* Return whether the event queue is empty.
*
* The use of synchronized here guarantees that all events that once belonged to this queue
* have already been processed by all attached listeners, if this returns true.
*/
def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }
def stop() {
if (!started) {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
......
......@@ -29,7 +29,8 @@ import org.apache.spark.SparkContext._
import org.apache.spark.executor.TaskMetrics
class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
with BeforeAndAfter with BeforeAndAfterAll {
with BeforeAndAfter with BeforeAndAfterAll {
/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
......@@ -37,7 +38,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
sc = new SparkContext("local", "SparkListenerSuite")
}
override def afterAll {
override def afterAll() {
System.clearProperty("spark.akka.frameSize")
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment