From 8e8de0073d71bb00baeb24c612d7841b6274f652 Mon Sep 17 00:00:00 2001
From: Sean Owen <sowen@cloudera.com>
Date: Tue, 4 Oct 2016 10:29:22 +0100
Subject: [PATCH] [SPARK-17671][WEBUI] Spark 2.0 history server summary page is
 slow even set spark.history.ui.maxApplications

## What changes were proposed in this pull request?

Return an Iterator of applications internally in the history server, for consistency and performance. See https://github.com/apache/spark/pull/15248 for some back-story.

The code both called by and calling HistoryServer.getApplicationList wants an Iterator, but this method currently materializes an Iterable, which is a potential performance problem when many applications are listed. It is also simpler to have this internal method pass an Iterator straight through.
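
For illustration only, here is a standalone sketch (not code from this patch; AppInfo, ListingSketch, and the sample data are hypothetical) of the pattern the change adopts: expose the listing as an Iterator and filter/count it lazily, rather than materializing an intermediate collection first, as the old HistoryPage code did with filter followed by size.

```scala
// Standalone sketch of the Iterable-vs-Iterator listing pattern.
case class AppInfo(id: String, completed: Boolean)

object ListingSketch {
  // Hypothetical backing store, standing in for the provider's application map.
  private val apps = Map(
    "app-1" -> AppInfo("app-1", completed = true),
    "app-2" -> AppInfo("app-2", completed = false),
    "app-3" -> AppInfo("app-3", completed = true)
  )

  // Before: returns a full collection even when the caller only needs a count.
  def getListingEager(): Iterable[AppInfo] = apps.values

  // After: a lazy, single-pass view over the same data; nothing is copied up front.
  def getListingLazy(): Iterator[AppInfo] = apps.values.iterator

  def main(args: Array[String]): Unit = {
    // Old style: filter builds another collection, then .size walks it again.
    val completedEager = getListingEager().filter(_.completed).size

    // New style: count consumes the iterator once, with no intermediate collection.
    val completedLazy = getListingLazy().count(_.completed)

    println(s"eager=$completedEager lazy=$completedLazy") // eager=2 lazy=2
  }
}
```

The same idea motivates the ApplicationListResource rewrite below: filtering and take(limit) compose lazily on the Iterator instead of building and then truncating a list.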

## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #15321 from srowen/SPARK-17671.
---
 .../history/ApplicationHistoryProvider.scala  |  2 +-
 .../deploy/history/FsHistoryProvider.scala    |  2 +-
 .../spark/deploy/history/HistoryPage.scala    |  5 +--
 .../spark/deploy/history/HistoryServer.scala  |  4 +-
 .../api/v1/ApplicationListResource.scala      | 38 +++++++------------
 .../deploy/history/HistoryServerSuite.scala   |  4 +-
 project/MimaExcludes.scala                    |  2 +
 7 files changed, 22 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index ba42b4862a..ad7a0972ef 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -77,7 +77,7 @@ private[history] abstract class ApplicationHistoryProvider {
    *
   * @return List of all known applications.
    */
-  def getListing(): Iterable[ApplicationHistoryInfo]
+  def getListing(): Iterator[ApplicationHistoryInfo]
 
   /**
    * Returns the Spark UI for a specific application.
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c5740e4737..3c2d169f32 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -222,7 +222,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values
+  override def getListing(): Iterator[FsApplicationHistoryInfo] = applications.values.iterator
 
   override def getApplicationInfo(appId: String): Option[FsApplicationHistoryInfo] = {
     applications.get(appId)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index b4f5a6114f..95b72224e0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -29,10 +29,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
     val requestedIncomplete =
       Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
 
-    val allApps = parent.getApplicationList()
-      .filter(_.completed != requestedIncomplete)
-    val allAppsSize = allApps.size
-
+    val allAppsSize = parent.getApplicationList().count(_.completed != requestedIncomplete)
     val providerConfig = parent.getProviderConfig()
     val content =
       <div>
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 735aa43cfc..087c69e648 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -174,12 +174,12 @@ class HistoryServer(
    *
    * @return List of all known applications.
    */
-  def getApplicationList(): Iterable[ApplicationHistoryInfo] = {
+  def getApplicationList(): Iterator[ApplicationHistoryInfo] = {
     provider.getListing()
   }
 
   def getApplicationInfoList: Iterator[ApplicationInfo] = {
-    getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
+    getApplicationList().map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
   }
 
   def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
index 075b9ba37d..76779290d4 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.status.api.v1
 
-import java.util.{Arrays, Date, List => JList}
+import java.util.{Date, List => JList}
 import javax.ws.rs.{DefaultValue, GET, Produces, QueryParam}
 import javax.ws.rs.core.MediaType
 
@@ -32,33 +32,21 @@ private[v1] class ApplicationListResource(uiRoot: UIRoot) {
       @DefaultValue("3000-01-01") @QueryParam("maxDate") maxDate: SimpleDateParam,
       @QueryParam("limit") limit: Integer)
   : Iterator[ApplicationInfo] = {
-    val allApps = uiRoot.getApplicationInfoList
-    val adjStatus = {
-      if (status.isEmpty) {
-        Arrays.asList(ApplicationStatus.values(): _*)
-      } else {
-        status
-      }
-    }
-    val includeCompleted = adjStatus.contains(ApplicationStatus.COMPLETED)
-    val includeRunning = adjStatus.contains(ApplicationStatus.RUNNING)
-    val appList = allApps.filter { app =>
+
+    val numApps = Option(limit).map(_.toInt).getOrElse(Integer.MAX_VALUE)
+    val includeCompleted = status.isEmpty || status.contains(ApplicationStatus.COMPLETED)
+    val includeRunning = status.isEmpty || status.contains(ApplicationStatus.RUNNING)
+
+    uiRoot.getApplicationInfoList.filter { app =>
       val anyRunning = app.attempts.exists(!_.completed)
-      // if any attempt is still running, we consider the app to also still be running
-      val statusOk = (!anyRunning && includeCompleted) ||
-        (anyRunning && includeRunning)
+      // if any attempt is still running, we consider the app to also still be running;
       // keep the app if *any* attempts fall in the right time window
-      val dateOk = app.attempts.exists { attempt =>
-        attempt.startTime.getTime >= minDate.timestamp &&
-          attempt.startTime.getTime <= maxDate.timestamp
+      ((!anyRunning && includeCompleted) || (anyRunning && includeRunning)) &&
+      app.attempts.exists { attempt =>
+        val start = attempt.startTime.getTime
+        start >= minDate.timestamp && start <= maxDate.timestamp
       }
-      statusOk && dateOk
-    }
-    if (limit != null) {
-      appList.take(limit)
-    } else {
-      appList
-    }
+    }.take(numApps)
   }
 }
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index ae3f5d9c01..5b316b2f6b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -447,7 +447,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
       assert(4 === getNumJobsRestful(), s"two jobs back-to-back not updated, server=$server\n")
     }
     val jobcount = getNumJobs("/jobs")
-    assert(!provider.getListing().head.completed)
+    assert(!provider.getListing().next.completed)
 
     listApplications(false) should contain(appId)
 
@@ -455,7 +455,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     resetSparkContext()
     // check the app is now found as completed
     eventually(stdTimeout, stdInterval) {
-      assert(provider.getListing().head.completed,
+      assert(provider.getListing().next.completed,
         s"application never completed, server=$server\n")
     }
 
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 7362041428..163e3f2fde 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -37,6 +37,8 @@ object MimaExcludes {
   // Exclude rules for 2.1.x
   lazy val v21excludes = v20excludes ++ {
     Seq(
+      // [SPARK-17671] Spark 2.0 history server summary page is slow even set spark.history.ui.maxApplications
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.deploy.history.HistoryServer.getApplicationList"),
       // [SPARK-14743] Improve delegation token handling in secure cluster
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getTimeFromNowToRenewal"),
       // [SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter
-- 
GitLab