Skip to content
Snippets Groups Projects
Commit b80ec056 authored by Tathagata Das's avatar Tathagata Das
Browse files

Added StatsReportListener to generate processing time statistics across multiple batches.

parent 097e120c
No related branches found
No related tags found
No related merge requests found
...@@ -131,8 +131,8 @@ object StatsReportListener extends Logging { ...@@ -131,8 +131,8 @@ object StatsReportListener extends Logging {
def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) { def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
val stats = d.statCounter val stats = d.statCounter
logInfo(heading + stats)
val quantiles = d.getQuantiles(probabilities).map{formatNumber} val quantiles = d.getQuantiles(probabilities).map{formatNumber}
logInfo(heading + stats)
logInfo(percentilesHeader) logInfo(percentilesHeader)
logInfo("\t" + quantiles.mkString("\t")) logInfo("\t" + quantiles.mkString("\t"))
} }
...@@ -173,8 +173,6 @@ object StatsReportListener extends Logging { ...@@ -173,8 +173,6 @@ object StatsReportListener extends Logging {
showMillisDistribution(heading, extractLongDistribution(stage, getMetric)) showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
} }
val seconds = 1000L val seconds = 1000L
val minutes = seconds * 60 val minutes = seconds * 60
val hours = minutes * 60 val hours = minutes * 60
...@@ -198,7 +196,6 @@ object StatsReportListener extends Logging { ...@@ -198,7 +196,6 @@ object StatsReportListener extends Logging {
} }
case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double) case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
object RuntimePercentage { object RuntimePercentage {
def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = { def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
......
...@@ -79,13 +79,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { ...@@ -79,13 +79,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
jobSet.afterJobStop(job) jobSet.afterJobStop(job)
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time) logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) { if (jobSet.hasCompleted) {
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo()))
jobSets.remove(jobSet.time) jobSets.remove(jobSet.time)
generator.onBatchCompletion(jobSet.time) generator.onBatchCompletion(jobSet.time)
logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format( logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
jobSet.totalDelay / 1000.0, jobSet.time.toString, jobSet.totalDelay / 1000.0, jobSet.time.toString,
jobSet.processingDelay / 1000.0 jobSet.processingDelay / 1000.0
)) ))
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo()))
} }
} }
......
...@@ -17,14 +17,22 @@ ...@@ -17,14 +17,22 @@
package org.apache.spark.streaming.scheduler package org.apache.spark.streaming.scheduler
import scala.collection.mutable.Queue
import org.apache.spark.util.Distribution
/** Base trait for events related to StreamingListener */
sealed trait StreamingListenerEvent sealed trait StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
trait StreamingListener {
/**
* A listener interface for receiving information about an ongoing streaming
* computation.
*/
trait StreamingListener {
/** /**
* Called when processing of a batch has completed * Called when processing of a batch has completed
*/ */
...@@ -34,4 +42,39 @@ trait StreamingListener { ...@@ -34,4 +42,39 @@ trait StreamingListener {
* Called when processing of a batch has started * Called when processing of a batch has started
*/ */
def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { } def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
}
/**
* A simple StreamingListener that logs summary statistics across Spark Streaming batches
* @param numBatchInfos Number of last batches to consider for generating statistics (default: 10)
*/
class StatsReportListener(numBatchInfos: Int = 10) extends StreamingListener {
import org.apache.spark
val batchInfos = new Queue[BatchInfo]()
/**
 * Called when processing of a batch has completed: stores the batch's info in
 * the queue and then reports refreshed statistics.
 *
 * NOTE(review): the parameter is named `batchStarted` although it carries a
 * batch-completed event; the name is left as-is so the signature is unchanged.
 */
override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted): Unit = {
  val completedInfo = batchStarted.batchInfo
  addToQueue(completedInfo)
  printStats()
}
/**
 * Appends a batch info to the queue and, if the queue has grown beyond
 * `numBatchInfos` entries, drops the oldest one.
 *
 * @param newPoint info of the batch to remember
 */
def addToQueue(newPoint: BatchInfo): Unit = {
  batchInfos += newPoint
  val overCapacity = batchInfos.size > numBatchInfos
  if (overCapacity) {
    batchInfos.dequeue()
  }
}
/**
 * Reports the distribution of the total delay and of the processing time
 * over the batch infos currently held in the queue.
 */
def printStats(): Unit = {
  showMillisDistribution("Total delay: ", info => info.totalDelay)
  showMillisDistribution("Processing time: ", info => info.processingDelay)
}
/**
 * Builds the distribution of one metric over the queued batch infos and hands
 * it, together with the heading, to the Spark core `StatsReportListener`.
 *
 * @param heading   text passed along as the heading of the report
 * @param getMetric extracts the metric from a batch info; `None` if undefined
 */
def showMillisDistribution(heading: String, getMetric: BatchInfo => Option[Long]): Unit = {
  val distribution = extractDistribution(getMetric)
  spark.scheduler.StatsReportListener.showMillisDistribution(heading, distribution)
}
/**
 * Collects one metric from every queued batch info and wraps the values in a
 * `Distribution`. Batch infos for which the metric is `None` are skipped.
 *
 * @param getMetric extracts the metric from a batch info; `None` if undefined
 * @return whatever `Distribution.apply` yields for the collected values
 */
def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
  val definedValues = batchInfos.flatMap(info => getMetric(info))
  val asDoubles = definedValues.map(value => value.toDouble)
  Distribution(asDoubles)
}
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment