Skip to content
Snippets Groups Projects
Commit dc3ee6b6 authored by Tathagata Das's avatar Tathagata Das
Browse files

Added comments to BatchInfo and JobSet, based on Patrick's comment on PR 277.

parent 3ddbdbfb
No related branches found
No related tags found
No related merge requests found
......@@ -21,6 +21,12 @@ import org.apache.spark.streaming.Time
/**
* Class having information on completed batches.
* @param batchTime Time of the batch
* @param submissionTime Clock time of when jobs of this batch was submitted to
* the streaming scheduler queue
* @param processingStartTime Clock time of when the first job of this batch started processing
* @param processingEndTime Clock time of when the last job of this batch finished processing
*
*/
case class BatchInfo(
batchTime: Time,
......@@ -29,9 +35,22 @@ case class BatchInfo(
processingEndTime: Option[Long]
) {
/**
* Time taken for the first job of this batch to start processing from the time this batch
* was submitted to the streaming scheduler. Essentially, it is
* `processingStartTime` - `submissionTime`.
*/
def schedulingDelay = processingStartTime.map(_ - submissionTime)
/**
* Time taken for the all jobs of this batch to finish processing from the time they started
* processing. Essentially, it is `processingEndTime` - `processingStartTime`.
*/
def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
/**
* Time taken for all the jobs of this batch to finish processing from the time they
* were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
*/
def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
}
......@@ -27,9 +27,9 @@ private[streaming]
case class JobSet(time: Time, jobs: Seq[Job]) {
private val incompleteJobs = new HashSet[Job]()
var submissionTime = System.currentTimeMillis()
var processingStartTime = -1L
var processingEndTime = -1L
var submissionTime = System.currentTimeMillis() // when this jobset was submitted
var processingStartTime = -1L // when the first job of this jobset started processing
var processingEndTime = -1L // when the last job of this jobset finished processing
jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
incompleteJobs ++= jobs
......@@ -47,8 +47,12 @@ case class JobSet(time: Time, jobs: Seq[Job]) {
def hasCompleted() = incompleteJobs.isEmpty
// Time taken to process all the jobs from the time they started processing
// (i.e. not including the time they wait in the streaming scheduler queue)
def processingDelay = processingEndTime - processingStartTime
// Time taken to process all the jobs from the time they were submitted
// (i.e. including the time they wait in the streaming scheduler queue)
def totalDelay = {
processingEndTime - time.milliseconds
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment