Skip to content
Snippets Groups Projects
Commit 85c5424d authored by CodingCat's avatar CodingCat Committed by Shixiong Zhu
Browse files

[SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent

## What changes were proposed in this pull request?

The PR fixes the bug that the QueryStartedEvent is not logged

the postToAll() in the original code is actually calling StreamingQueryListenerBus.postToAll() which has no listener at all....we shall post by sparkListenerBus.postToAll(s) and this.postToAll() to trigger local listeners as well as the listeners registered in LiveListenerBus

zsxwing
## How was this patch tested?

The following snapshot shows that QueryStartedEvent has been logged correctly

![image](https://cloud.githubusercontent.com/assets/678008/19821553/007a7d28-9d2d-11e6-9f13-49851559cdaa.png)

Author: CodingCat <zhunansjtu@gmail.com>

Closes #15675 from CodingCat/SPARK-18144.
parent a36653c5
No related branches found
No related tags found
No related merge requests found
......@@ -41,6 +41,8 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
def post(event: StreamingQueryListener.Event) {
event match {
case s: QueryStartedEvent =>
sparkListenerBus.post(s)
// post to local listeners to trigger callbacks
postToAll(s)
case _ =>
sparkListenerBus.post(event)
......@@ -50,7 +52,13 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case e: StreamingQueryListener.Event =>
postToAll(e)
// SPARK-18144: we broadcast QueryStartedEvent to all listeners attached to this bus
// synchronously and the ones attached to LiveListenerBus asynchronously. Therefore,
// we need to ignore QueryStartedEvent if this method is called within SparkListenerBus
// thread
if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) {
postToAll(e)
}
case _ =>
}
}
......
......@@ -290,7 +290,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
// A StreamingQueryListener that gets the query status after the first completed trigger
val listener = new StreamingQueryListener {
@volatile var firstStatus: StreamingQueryStatus = null
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
@volatile var queryStartedEvent = 0
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
queryStartedEvent += 1
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
if (firstStatus == null) firstStatus = queryProgress.queryStatus
}
......@@ -303,6 +306,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
q.processAllAvailable()
eventually(timeout(streamingTimeout)) {
assert(listener.firstStatus != null)
// test if QueryStartedEvent callback is called for only once
assert(listener.queryStartedEvent === 1)
}
listener.firstStatus
} finally {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment