Skip to content
Snippets Groups Projects
Commit be7c5ff1 authored by zsxwing's avatar zsxwing Committed by Tathagata Das
Browse files

[SPARK-10900] [STREAMING] Add output operation events to StreamingListener

Add output operation events to StreamingListener so as to implement the following UI features:

1. Progress bar of a batch in the batch list.
2. Be able to display output operation `description` and `duration` when there is no spark job in a Streaming job.

Author: zsxwing <zsxwing@gmail.com>

Closes #8958 from zsxwing/output-operation-events.
parent a609eb20
No related branches found
No related tags found
No related merge requests found
......@@ -111,7 +111,11 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)
val jobs = this.synchronized {
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
outputStreams.flatMap { outputStream =>
val jobOption = outputStream.generateJob(time)
jobOption.foreach(_.setCallSite(outputStream.creationSite.longForm))
jobOption
}
}
logDebug("Generated " + jobs.length + " jobs for time " + time)
jobs
......
......@@ -29,6 +29,7 @@ class Job(val time: Time, func: () => _) {
private var _outputOpId: Int = _
private var isSet = false
private var _result: Try[_] = null
private var _callSite: String = "Unknown"
def run() {
_result = Try(func())
......@@ -70,5 +71,11 @@ class Job(val time: Time, func: () => _) {
_outputOpId = outputOpId
}
def setCallSite(callSite: String): Unit = {
_callSite = callSite
}
def callSite: String = _callSite
override def toString: String = id
}
......@@ -30,8 +30,8 @@ import org.apache.spark.util.{EventLoop, ThreadUtils}
private[scheduler] sealed trait JobSchedulerEvent
private[scheduler] case class JobStarted(job: Job) extends JobSchedulerEvent
private[scheduler] case class JobCompleted(job: Job) extends JobSchedulerEvent
private[scheduler] case class JobStarted(job: Job, startTime: Long) extends JobSchedulerEvent
private[scheduler] case class JobCompleted(job: Job, completedTime: Long) extends JobSchedulerEvent
private[scheduler] case class ErrorReported(msg: String, e: Throwable) extends JobSchedulerEvent
/**
......@@ -143,8 +143,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
private def processEvent(event: JobSchedulerEvent) {
try {
event match {
case JobStarted(job) => handleJobStart(job)
case JobCompleted(job) => handleJobCompletion(job)
case JobStarted(job, startTime) => handleJobStart(job, startTime)
case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
case ErrorReported(m, e) => handleError(m, e)
}
} catch {
......@@ -153,7 +153,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
}
private def handleJobStart(job: Job) {
private def handleJobStart(job: Job, startTime: Long) {
val jobSet = jobSets.get(job.time)
val isFirstJobOfJobSet = !jobSet.hasStarted
jobSet.handleJobStart(job)
......@@ -162,12 +162,16 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// correct "jobSet.processingStartTime".
listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
}
listenerBus.post(StreamingListenerOutputOperationStarted(
OutputOperationInfo(job.time, job.outputOpId, job.callSite, Some(startTime), None)))
logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
}
private def handleJobCompletion(job: Job) {
private def handleJobCompletion(job: Job, completedTime: Long) {
val jobSet = jobSets.get(job.time)
jobSet.handleJobCompletion(job)
listenerBus.post(StreamingListenerOutputOperationCompleted(
OutputOperationInfo(job.time, job.outputOpId, job.callSite, None, Some(completedTime))))
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
......@@ -210,7 +214,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// it's possible that when `post` is called, `eventLoop` happens to null.
var _eventLoop = eventLoop
if (_eventLoop != null) {
_eventLoop.post(JobStarted(job))
_eventLoop.post(JobStarted(job, clock.getTimeMillis()))
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details.
......@@ -219,7 +223,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
_eventLoop = eventLoop
if (_eventLoop != null) {
_eventLoop.post(JobCompleted(job))
_eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
}
} else {
// JobScheduler has been stopped.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.scheduler
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.streaming.Time
/**
* :: DeveloperApi ::
* Class having information on output operations.
* @param batchTime Time of the batch
* @param id Id of this output operation. Different output operations have different ids in a batch.
* @param description The description of this output operation.
* @param startTime Clock time of when the output operation started processing
* @param endTime Clock time of when the output operation started processing
*/
@DeveloperApi
case class OutputOperationInfo(
batchTime: Time,
id: Int,
description: String,
startTime: Option[Long],
endTime: Option[Long]) {
/**
* Return the duration of this output operation.
*/
def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
}
......@@ -38,6 +38,14 @@ case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends Streami
@DeveloperApi
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
@DeveloperApi
case class StreamingListenerOutputOperationStarted(outputOperationInfo: OutputOperationInfo)
extends StreamingListenerEvent
@DeveloperApi
case class StreamingListenerOutputOperationCompleted(outputOperationInfo: OutputOperationInfo)
extends StreamingListenerEvent
@DeveloperApi
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent
......@@ -75,6 +83,14 @@ trait StreamingListener {
/** Called when processing of a batch of jobs has completed. */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
/** Called when processing of a job of a batch has started. */
def onOutputOperationStarted(
outputOperationStarted: StreamingListenerOutputOperationStarted) { }
/** Called when processing of a job of a batch has completed. */
def onOutputOperationCompleted(
outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}
......
......@@ -43,6 +43,10 @@ private[spark] class StreamingListenerBus
listener.onBatchStarted(batchStarted)
case batchCompleted: StreamingListenerBatchCompleted =>
listener.onBatchCompleted(batchCompleted)
case outputOperationStarted: StreamingListenerOutputOperationStarted =>
listener.onOutputOperationStarted(outputOperationStarted)
case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
listener.onOutputOperationCompleted(outputOperationCompleted)
case _ =>
}
}
......
......@@ -140,6 +140,27 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}
}
test("output operation reporting") {
ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD(_.count())
inputStream.foreachRDD(_.collect())
inputStream.foreachRDD(_.count())
val collector = new OutputOperationInfoCollector
ssc.addStreamingListener(collector)
ssc.start()
try {
eventually(timeout(30 seconds), interval(20 millis)) {
collector.startedOutputOperationIds.take(3) should be (Seq(0, 1, 2))
collector.completedOutputOperationIds.take(3) should be (Seq(0, 1, 2))
}
} finally {
ssc.stop()
}
}
test("onBatchCompleted with successful batch") {
ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
......@@ -254,6 +275,22 @@ class ReceiverInfoCollector extends StreamingListener {
}
}
/** Listener that collects information on processed output operations */
class OutputOperationInfoCollector extends StreamingListener {
val startedOutputOperationIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
val completedOutputOperationIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
override def onOutputOperationStarted(
outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
startedOutputOperationIds += outputOperationStarted.outputOperationInfo.id
}
override def onOutputOperationCompleted(
outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
completedOutputOperationIds += outputOperationCompleted.outputOperationInfo.id
}
}
class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_ONLY) with Logging {
def onStart() {
Future {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment