Commit a81e336f authored by uncleGen, committed by Shixiong Zhu

[SPARK-19182][DSTREAM] Optimize the lock in StreamingJobProgressListener to not block UI when generating Streaming jobs

## What changes were proposed in this pull request?

When DStreamGraph is generating a job, it holds a lock that blocks its other synchronized APIs. Because StreamingJobProgressListener (via numInactiveReceivers, streamName(streamId: Int), and streamIds) calls DStreamGraph's methods to read stream metadata, the UI may hang if job generation is slow (e.g., while talking to a slow Kafka cluster to fetch metadata).
It's better to optimize the locking in DStreamGraph and StreamingJobProgressListener so that job generation does not block the UI.
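
The fix follows a simple pattern: the values the UI needs are precomputed while the graph lock is already held in `start()` and published through `@volatile` fields, so the UI-facing getters no longer synchronize at all. Here is a minimal sketch of that pattern, using simplified stand-in types rather than the actual Spark classes:

```scala
import scala.collection.mutable.ArrayBuffer

// Stripped-down stand-in for DStreamGraph, illustrating the patch's pattern.
final class Graph {
  private val inputStreams = new ArrayBuffer[(String, Int)]() // (name, id) pairs

  // Immutable snapshot: written only while holding the lock,
  // read lock-free by the UI thread.
  @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil

  def start(): Unit = this.synchronized {
    // ... other startup work ...
    inputStreamNameAndID = inputStreams.toList // publish the snapshot once
  }

  def generateJobs(): Unit = this.synchronized {
    Thread.sleep(5000) // e.g. a slow Kafka metadata fetch while holding the lock
  }

  // UI read path: no `synchronized`, so it never waits on generateJobs().
  def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID
}
```

This is safe because the set of input streams is fixed once the StreamingContext starts, so a snapshot taken in `start()` never goes stale.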

## How was this patch tested?
Existing unit tests.

cc zsxwing

Author: uncleGen <hustyugm@gmail.com>

Closes #16601 from uncleGen/SPARK-19182.
parent 569e5068
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   private val inputStreams = new ArrayBuffer[InputDStream[_]]()
   private val outputStreams = new ArrayBuffer[DStream[_]]()
 
+  @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil
+
   var rememberDuration: Duration = null
   var checkpointInProgress = false
 
   var zeroTime: Time = null
   var startTime: Time = null
   var batchDuration: Duration = null
+  @volatile private var numReceivers: Int = 0
 
   def start(time: Time) {
     this.synchronized {
@@ -45,7 +48,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       startTime = time
       outputStreams.foreach(_.initialize(zeroTime))
       outputStreams.foreach(_.remember(rememberDuration))
-      outputStreams.foreach(_.validateAtStart)
+      outputStreams.foreach(_.validateAtStart())
+      numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]])
+      inputStreamNameAndID = inputStreams.map(is => (is.name, is.id))
       inputStreams.par.foreach(_.start())
     }
   }
@@ -106,9 +111,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       .toArray
   }
 
-  def getInputStreamName(streamId: Int): Option[String] = synchronized {
-    inputStreams.find(_.id == streamId).map(_.name)
-  }
+  def getNumReceivers: Int = numReceivers
+
+  def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID
 
   def generateJobs(time: Time): Seq[Job] = {
     logDebug("Generating jobs for time " + time)
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def numInactiveReceivers: Int = {
-    ssc.graph.getReceiverInputStreams().length - numActiveReceivers
+    ssc.graph.getNumReceivers - numActiveReceivers
   }
 
   def numTotalCompletedBatches: Long = synchronized {
@@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
-    completedBatchUIData.toSeq
+    completedBatchUIData.toIndexedSeq
   }
 
   def streamName(streamId: Int): Option[String] = {
-    ssc.graph.getInputStreamName(streamId)
+    ssc.graph.getInputStreamNameAndID.find(_._2 == streamId).map(_._1)
   }
 
   /**
    * Return all InputDStream Ids
    */
-  def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
+  def streamIds: Seq[Int] = ssc.graph.getInputStreamNameAndID.map(_._2)
 
   /**
    * Return all of the record rates for each InputDStream in each batch. The key of the return value
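
To see the effect of the change, here is a small hypothetical demo (not part of the patch; assumes Scala 2.12+ so a lambda can be passed as a `Runnable`). One thread holds the object lock the way a slow `generateJobs` would; one read path goes through `synchronized` (the pre-patch behavior) and another through a `@volatile` field (the post-patch behavior):

```scala
object LockDemo {
  @volatile private var snapshot: Seq[(String, Int)] = Seq(("kafka-stream", 0))

  // Pre-patch style: takes the lock, so it queues behind job generation.
  private def lockedRead(): Seq[(String, Int)] = this.synchronized(snapshot)

  // Post-patch style: a plain volatile read that never blocks.
  private def volatileRead(): Seq[(String, Int)] = snapshot

  def main(args: Array[String]): Unit = {
    val generator = new Thread(() => this.synchronized {
      Thread.sleep(5000) // simulate slow job generation holding the lock
    })
    generator.start()
    Thread.sleep(100) // let the "generator" acquire the lock first

    println(volatileRead()) // returns immediately
    println(lockedRead())   // stalls ~5 seconds, like the pre-patch UI
    generator.join()
  }
}
```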