Skip to content
Snippets Groups Projects
Commit bde85f8b authored by Shixiong Zhu's avatar Shixiong Zhu
Browse files

[SPARK-17649][CORE] Log how many Spark events got dropped in LiveListenerBus

## What changes were proposed in this pull request?

Log how many Spark events got dropped in LiveListenerBus so that the user can get insights on how to set a correct event queue size.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15220 from zsxwing/SPARK-17649.
parent f234b7cd
No related branches found
No related tags found
No related merge requests found
......@@ -18,7 +18,7 @@
package org.apache.spark.scheduler
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.util.DynamicVariable
......@@ -57,6 +57,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
// Indicate if `stop()` is called
private val stopped = new AtomicBoolean(false)
/** A counter for dropped events. It will be reset every time we log it. */
private val droppedEventsCounter = new AtomicLong(0L)
/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L
// Indicate if we are processing some event
// Guarded by `self`
private var processingEvent = false
......@@ -123,6 +129,24 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
eventLock.release()
} else {
onDropEvent(event)
droppedEventsCounter.incrementAndGet()
}
val droppedEvents = droppedEventsCounter.get
if (droppedEvents > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
new java.util.Date(prevLastReportTimestamp))
}
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment