From a81e336f1eddc2c6245d807aae2c81ddc60eabf9 Mon Sep 17 00:00:00 2001
From: uncleGen <hustyugm@gmail.com>
Date: Wed, 18 Jan 2017 10:55:31 -0800
Subject: [PATCH] [SPARK-19182][DSTREAM] Optimize the lock in
 StreamingJobProgressListener to not block UI when generating Streaming jobs

## What changes were proposed in this pull request?

When DStreamGraph is generating a job, it will hold a lock and block other APIs. Because StreamingJobProgressListener (numInactiveReceivers, streamName(streamId: Int), streamIds) needs to call DStreamGraph's methods to access some information, the UI may hang if generating a job is very slow (e.g., talking to the slow Kafka cluster to fetch metadata).
It's better to optimize the locks in DStreamGraph and StreamingJobProgressListener to make the UI not block by job generation.

## How was this patch tested?
existing ut

cc zsxwing

Author: uncleGen <hustyugm@gmail.com>

Closes #16601 from uncleGen/SPARK-19182.
---
 .../org/apache/spark/streaming/DStreamGraph.scala   | 13 +++++++++----
 .../streaming/ui/StreamingJobProgressListener.scala |  8 ++++----
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 54d736ee51..dce2028b48 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   private val inputStreams = new ArrayBuffer[InputDStream[_]]()
   private val outputStreams = new ArrayBuffer[DStream[_]]()
 
+  @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil
+
   var rememberDuration: Duration = null
   var checkpointInProgress = false
 
   var zeroTime: Time = null
   var startTime: Time = null
   var batchDuration: Duration = null
+  @volatile private var numReceivers: Int = 0
 
   def start(time: Time) {
     this.synchronized {
@@ -45,7 +48,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       startTime = time
       outputStreams.foreach(_.initialize(zeroTime))
       outputStreams.foreach(_.remember(rememberDuration))
-      outputStreams.foreach(_.validateAtStart)
+      outputStreams.foreach(_.validateAtStart())
+      numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]])
+      inputStreamNameAndID = inputStreams.map(is => (is.name, is.id))
       inputStreams.par.foreach(_.start())
     }
   }
@@ -106,9 +111,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       .toArray
   }
 
-  def getInputStreamName(streamId: Int): Option[String] = synchronized {
-    inputStreams.find(_.id == streamId).map(_.name)
-  }
+  def getNumReceivers: Int = numReceivers
+
+  def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID
 
   def generateJobs(time: Time): Seq[Job] = {
     logDebug("Generating jobs for time " + time)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 95f582106c..ed4c1e484e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def numInactiveReceivers: Int = {
-    ssc.graph.getReceiverInputStreams().length - numActiveReceivers
+    ssc.graph.getNumReceivers - numActiveReceivers
   }
 
   def numTotalCompletedBatches: Long = synchronized {
@@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
-    completedBatchUIData.toSeq
+    completedBatchUIData.toIndexedSeq
   }
 
   def streamName(streamId: Int): Option[String] = {
-    ssc.graph.getInputStreamName(streamId)
+    ssc.graph.getInputStreamNameAndID.find(_._2 == streamId).map(_._1)
   }
 
   /**
    * Return all InputDStream Ids
    */
-  def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
+  def streamIds: Seq[Int] = ssc.graph.getInputStreamNameAndID.map(_._2)
 
   /**
    * Return all of the record rates for each InputDStream in each batch. The key of the return value
-- 
GitLab