Skip to content
Snippets Groups Projects
Commit 00c9c7d9 authored by Vinayak's avatar Vinayak Committed by Tom Graves
Browse files

[SPARK-17843][WEB UI] Indicate event logs pending for processing on history server UI

## What changes were proposed in this pull request?

History Server UI's application listing to display information on currently under process event logs so a user knows that pending this processing an application may not list on the UI.

When there are no event logs under process, the application list page has a "Last Updated" date-time at the top indicating the date-time of the last _completed_ scan of the event logs. The value is displayed to the user in his/her local time zone.
## How was this patch tested?

All unit tests pass. Particularly all the suites under org.apache.spark.deploy.history.\* were run to test changes.
- Very first startup - Pending logs - no logs processed yet:

<img width="1280" alt="screen shot 2016-10-24 at 3 07 04 pm" src="https://cloud.githubusercontent.com/assets/12079825/19640981/b8d2a96a-99fc-11e6-9b1f-2d736fe90e48.png">
- Very first startup - Pending logs - some logs processed:

<img width="1280" alt="screen shot 2016-10-24 at 3 18 42 pm" src="https://cloud.githubusercontent.com/assets/12079825/19641087/3f8e3bae-99fd-11e6-9ef1-e0e70d71d8ef.png">
- Last updated - No currently pending logs:

<img width="1280" alt="screen shot 2016-10-17 at 8 34 37 pm" src="https://cloud.githubusercontent.com/assets/12079825/19443100/4d13946c-94a9-11e6-8ee2-c442729bb206.png">
- Last updated - With some currently pending logs:

<img width="1280" alt="screen shot 2016-10-24 at 3 09 31 pm" src="https://cloud.githubusercontent.com/assets/12079825/19640903/7323ba3a-99fc-11e6-8359-6a45753dbb28.png">
- No applications found and No currently pending logs:

<img width="1280" alt="screen shot 2016-10-24 at 3 24 26 pm" src="https://cloud.githubusercontent.com/assets/12079825/19641364/03a2cb04-99fe-11e6-87d6-d09587fc6201.png

">

Author: Vinayak <vijoshi5@in.ibm.com>

Closes #15410 from vijoshi/SAAS-608_master.

(cherry picked from commit a531fe1a)
Signed-off-by: default avatarTom Graves <tgraves@yahoo-inc.com>
parent 51dca614
No related branches found
No related tags found
No related merge requests found
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
$(document).ready(function() {
if ($('#last-updated').length) {
var lastUpdatedMillis = Number($('#last-updated').text());
var updatedDate = new Date(lastUpdatedMillis);
$('#last-updated').text(updatedDate.toLocaleDateString()+", "+updatedDate.toLocaleTimeString())
}
});
......@@ -74,6 +74,30 @@ private[history] case class LoadedAppUI(
private[history] abstract class ApplicationHistoryProvider {
/**
* Returns the count of application event logs that the provider is currently still processing.
* History Server UI can use this to indicate to a user that the application listing on the UI
* can be expected to list additional known applications once the processing of these
* application event logs completes.
*
* A History Provider that does not have a notion of count of event logs that may be pending
* for processing need not override this method.
*
* @return Count of application event logs that are currently under process
*/
def getEventLogsUnderProcess(): Int = {
return 0;
}
/**
* Returns the time the history provider last updated the application history information
*
* @return 0 if this is undefined or unsupported, otherwise the last updated time in millis
*/
def getLastUpdatedTime(): Long = {
return 0;
}
/**
* Returns a list of applications available for the history server to show.
*
......
......@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
import java.io.{FileNotFoundException, IOException, OutputStream}
import java.util.UUID
import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.mutable
......@@ -108,7 +108,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// The modification time of the newest log detected during the last scan. Currently only
// used for logging msgs (logs are re-scanned based on file size, rather than modtime)
private var lastScanTime = -1L
private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1)
// Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
// into the map in order, so the LinkedHashMap maintains the correct ordering.
......@@ -120,6 +120,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// List of application logs to be deleted by event log cleaner.
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
/**
* Return a runnable that performs the given operation on the event logs.
* This operation is expected to be executed periodically.
......@@ -226,6 +228,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
applications.get(appId)
}
override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get()
override def getLastUpdatedTime(): Long = lastScanTime.get()
override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
try {
applications.get(appId).flatMap { appInfo =>
......@@ -329,26 +335,43 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
if (logInfos.nonEmpty) {
logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
}
logInfos.map { file =>
replayExecutor.submit(new Runnable {
var tasks = mutable.ListBuffer[Future[_]]()
try {
for (file <- logInfos) {
tasks += replayExecutor.submit(new Runnable {
override def run(): Unit = mergeApplicationListing(file)
})
}
.foreach { task =>
try {
// Wait for all tasks to finish. This makes sure that checkForLogs
// is not scheduled again while some tasks are already running in
// the replayExecutor.
task.get()
} catch {
case e: InterruptedException =>
throw e
case e: Exception =>
logError("Exception while merging application listings", e)
}
} catch {
// let the iteration over logInfos break, since an exception on
// replayExecutor.submit (..) indicates the ExecutorService is unable
// to take any more submissions at this time
case e: Exception =>
logError(s"Exception while submitting event log for replay", e)
}
pendingReplayTasksCount.addAndGet(tasks.size)
tasks.foreach { task =>
try {
// Wait for all tasks to finish. This makes sure that checkForLogs
// is not scheduled again while some tasks are already running in
// the replayExecutor.
task.get()
} catch {
case e: InterruptedException =>
throw e
case e: Exception =>
logError("Exception while merging application listings", e)
} finally {
pendingReplayTasksCount.decrementAndGet()
}
}
lastScanTime = newLastScanTime
lastScanTime.set(newLastScanTime)
} catch {
case e: Exception => logError("Exception in checking for event log updates", e)
}
......@@ -365,7 +388,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
} catch {
case e: Exception =>
logError("Exception encountered when attempting to update last scan time", e)
lastScanTime
lastScanTime.get()
} finally {
if (!fs.delete(path, true)) {
logWarning(s"Error deleting ${path}")
......
......@@ -30,13 +30,30 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
val allAppsSize = parent.getApplicationList().count(_.completed != requestedIncomplete)
val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess()
val lastUpdatedTime = parent.getLastUpdatedTime()
val providerConfig = parent.getProviderConfig()
val content =
<script src={UIUtils.prependBaseUri("/static/historypage-common.js")}></script>
<div>
<div class="span12">
<ul class="unstyled">
{providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
</ul>
{
if (eventLogsUnderProcessCount > 0) {
<p>There are {eventLogsUnderProcessCount} event log(s) currently being
processed which may result in additional applications getting listed on this page.
Refresh the page to view updates. </p>
}
}
{
if (lastUpdatedTime > 0) {
<p>Last updated: <span id="last-updated">{lastUpdatedTime}</span></p>
}
}
{
if (allAppsSize > 0) {
<script src={UIUtils.prependBaseUri("/static/dataTables.rowsGroup.js")}></script> ++
......@@ -46,6 +63,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
<script>setAppLimit({parent.maxApplications})</script>
} else if (requestedIncomplete) {
<h4>No incomplete applications found!</h4>
} else if (eventLogsUnderProcessCount > 0) {
<h4>No completed applications found!</h4>
} else {
<h4>No completed applications found!</h4> ++ parent.emptyListingHtml
}
......
......@@ -179,6 +179,14 @@ class HistoryServer(
provider.getListing()
}
def getEventLogsUnderProcess(): Int = {
provider.getEventLogsUnderProcess()
}
def getLastUpdatedTime(): Long = {
provider.getLastUpdatedTime()
}
def getApplicationInfoList: Iterator[ApplicationInfo] = {
getApplicationList().map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment