Skip to content
Snippets Groups Projects
Commit 9f8e3921 authored by Eyal Zituny's avatar Eyal Zituny Committed by Shixiong Zhu
Browse files

[SPARK-19594][STRUCTURED STREAMING] StreamingQueryListener fails to handle...

[SPARK-19594][STRUCTURED STREAMING] StreamingQueryListener fails to handle QueryTerminatedEvent if more then one listeners exists

## What changes were proposed in this pull request?

currently if multiple streaming queries listeners exists, when a QueryTerminatedEvent is triggered, only one of the listeners will be invoked while the rest of the listeners will ignore the event.
this is caused since the the streaming queries listeners bus holds a set of running queries ids and when a termination event is triggered, after the first listeners is handling the event, the terminated query id is being removed from the set.
in this PR, the query id will be removed from the set only after all the listeners handles the event

## How was this patch tested?

a test with multiple listeners has been added to StreamingQueryListenerSuite

Author: Eyal Zituny <eyal.zituny@equalum.io>

Closes #16991 from eyalzit/master.
parent 68f2142c
No related branches found
No related tags found
No related merge requests found
......@@ -52,7 +52,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
* Post the event to all registered listeners. The `postToAll` caller should guarantee calling
* `postToAll` in the same thread for all events.
*/
final def postToAll(event: E): Unit = {
def postToAll(event: E): Unit = {
// JavaConverters can create a JIterableWrapper if we use asScala.
// However, this method will be called frequently. To avoid the wrapper cost, here we use
// Java Iterator directly.
......
......@@ -75,6 +75,19 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
}
}
/**
* Override the parent `postToAll` to remove the query id from `activeQueryRunIds` after all
* the listeners process `QueryTerminatedEvent`. (SPARK-19594)
*/
override def postToAll(event: Event): Unit = {
super.postToAll(event)
event match {
case t: QueryTerminatedEvent =>
activeQueryRunIds.synchronized { activeQueryRunIds -= t.runId }
case _ =>
}
}
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case e: StreamingQueryListener.Event =>
......@@ -112,7 +125,6 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
case queryTerminated: QueryTerminatedEvent =>
if (shouldReport(queryTerminated.runId)) {
listener.onQueryTerminated(queryTerminated)
activeQueryRunIds.synchronized { activeQueryRunIds -= queryTerminated.runId }
}
case _ =>
}
......
......@@ -133,6 +133,31 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}
test("SPARK-19594: all of listeners should receive QueryTerminatedEvent") {
val df = MemoryStream[Int].toDS().as[Long]
val listeners = (1 to 5).map(_ => new EventCollector)
try {
listeners.foreach(listener => spark.streams.addListener(listener))
testStream(df, OutputMode.Append)(
StartStream(),
StopStream,
AssertOnQuery { query =>
eventually(Timeout(streamingTimeout)) {
listeners.foreach(listener => assert(listener.terminationEvent !== null))
listeners.foreach(listener => assert(listener.terminationEvent.id === query.id))
listeners.foreach(listener => assert(listener.terminationEvent.runId === query.runId))
listeners.foreach(listener => assert(listener.terminationEvent.exception === None))
}
listeners.foreach(listener => listener.checkAsyncErrors())
listeners.foreach(listener => listener.reset())
true
}
)
} finally {
listeners.foreach(spark.streams.removeListener)
}
}
test("adding and removing listener") {
def isListenerActive(listener: EventCollector): Boolean = {
listener.reset()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment