diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index fc2190d39da4f6288d7582746cac3c2b50f75738..22e4c6380fcd519296ff2651518445d237365917 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -41,6 +41,8 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
   def post(event: StreamingQueryListener.Event) {
     event match {
       case s: QueryStartedEvent =>
+        sparkListenerBus.post(s)
+        // post to local listeners to trigger callbacks
         postToAll(s)
       case _ =>
         sparkListenerBus.post(event)
@@ -50,7 +52,13 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     event match {
       case e: StreamingQueryListener.Event =>
-        postToAll(e)
+        // SPARK-18144: we broadcast QueryStartedEvent to all listeners attached to this bus
+        // synchronously and the ones attached to LiveListenerBus asynchronously. Therefore,
+        // we need to ignore QueryStartedEvent if this method is called within SparkListenerBus
+        // thread
+        if (!LiveListenerBus.withinListenerThread.value || !e.isInstanceOf[QueryStartedEvent]) {
+          postToAll(e)
+        }
       case _ =>
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 464c443beb6e75bc4fb87b046bbddd905574632f..31b7fe0b04da95fe6e4258b040df16b33e0503ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -290,7 +290,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     // A StreamingQueryListener that gets the query status after the first completed trigger
     val listener = new StreamingQueryListener {
       @volatile var firstStatus: StreamingQueryStatus = null
-      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
+      @volatile var queryStartedEvent = 0
+      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+        queryStartedEvent += 1
+      }
       override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        if (firstStatus == null) firstStatus = queryProgress.queryStatus
       }
@@ -303,6 +306,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       q.processAllAvailable()
       eventually(timeout(streamingTimeout)) {
         assert(listener.firstStatus != null)
+        // test if QueryStartedEvent callback is called for only once
+        assert(listener.queryStartedEvent === 1)
       }
       listener.firstStatus
     } finally {