diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index fbe39b27649f636fb638b90f79d90c6501ae4521..553bf3cb945abca902a7806a4b4a153a69edd0cb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -25,7 +25,8 @@ private[spark] case class ApplicationHistoryInfo( startTime: Long, endTime: Long, lastUpdated: Long, - sparkUser: String) + sparkUser: String, + completed: Boolean = false) private[spark] abstract class ApplicationHistoryProvider { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 792d15b99ea0d307566d265f27819ebb377aad87..2b084a2d73b782cb465e83ef08a89ea3c48a8b71 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -173,20 +173,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val logInfos = statusList .filter { entry => try { - val isFinishedApplication = - if (isLegacyLogDirectory(entry)) { - fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE)) - } else { - !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS) - } - - if (isFinishedApplication) { - val modTime = getModificationTime(entry) - newLastModifiedTime = math.max(newLastModifiedTime, modTime) - modTime >= lastModifiedTime - } else { - false - } + val modTime = getModificationTime(entry) + newLastModifiedTime = math.max(newLastModifiedTime, modTime) + modTime >= lastModifiedTime } catch { case e: AccessControlException => // Do not use "logInfo" since these messages can get pretty noisy if printed on @@ -204,7 +193,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis None } } - .sortBy { info => -info.endTime } + .sortBy { info => (-info.endTime, -info.startTime) } lastModifiedTime = newLastModifiedTime @@ -261,7 +250,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis appListener.startTime.getOrElse(-1L), appListener.endTime.getOrElse(-1L), getModificationTime(eventLog), - appListener.sparkUser.getOrElse(NOT_STARTED)) + appListener.sparkUser.getOrElse(NOT_STARTED), + isApplicationCompleted(eventLog)) } finally { logInput.close() } @@ -329,6 +319,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis /** Returns the system's mononotically increasing time. */ private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000) + /** + * Return true when the application has completed. + */ + private def isApplicationCompleted(entry: FileStatus): Boolean = { + if (isLegacyLogDirectory(entry)) { + fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE)) + } else { + !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS) + } + } + } private object FsHistoryProvider { @@ -342,5 +343,6 @@ private class FsApplicationHistoryInfo( startTime: Long, endTime: Long, lastUpdated: Long, - sparkUser: String) - extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser) + sparkUser: String, + completed: Boolean = true) + extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 0d5dcfb1ddffe8bedbb271f83668bbcf99978bef..e4e7bc2216014a5713cfd952f971380aea2be51f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -31,8 +31,10 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { def render(request: HttpServletRequest): Seq[Node] = { val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt val requestedFirst = (requestedPage - 1) * pageSize + val requestedIncomplete = + Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean - val allApps = parent.getApplicationList() + val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete) val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0 val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size)) @@ -65,25 +67,26 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { <h4> Showing {actualFirst + 1}-{last + 1} of {allApps.size} - <span style="float: right"> - { - if (actualPage > 1) { - <a href={"/?page=" + (actualPage - 1)}>< </a> - <a href={"/?page=1"}>1</a> - } + {if (requestedIncomplete) "(Incomplete applications)"} + <span style="float: right"> + { + if (actualPage > 1) { + <a href={makePageLink(actualPage - 1, requestedIncomplete)}>< </a> + <a href={makePageLink(1, requestedIncomplete)}>1</a> } - {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "} - {leftSideIndices} - {actualPage} - {rightSideIndices} - {if (actualPage + plusOrMinus < secondPageFromRight) " ... "} - { - if (actualPage < pageCount) { - <a href={"/?page=" + pageCount}>{pageCount}</a> - <a href={"/?page=" + (actualPage + 1)}> ></a> - } + } + {if (actualPage - plusOrMinus > secondPageFromLeft) " ... "} + {leftSideIndices} + {actualPage} + {rightSideIndices} + {if (actualPage + plusOrMinus < secondPageFromRight) " ... "} + { + if (actualPage < pageCount) { + <a href={makePageLink(pageCount, requestedIncomplete)}>{pageCount}</a> + <a href={makePageLink(actualPage + 1, requestedIncomplete)}> ></a> } - </span> + } + </span> </h4> ++ appTable } else { @@ -96,6 +99,15 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { </p> } } + <a href={makePageLink(actualPage, !requestedIncomplete)}> + { + if (requestedIncomplete) { + "Back to completed applications" + } else { + "Show incomplete applications" + } + } + </a> </div> </div> UIUtils.basicSparkPage(content, "History Server") @@ -117,8 +129,9 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}" val startTime = UIUtils.formatDate(info.startTime) - val endTime = UIUtils.formatDate(info.endTime) - val duration = UIUtils.formatDuration(info.endTime - info.startTime) + val endTime = if (info.endTime > 0) UIUtils.formatDate(info.endTime) else "-" + val duration = + if (info.endTime > 0) UIUtils.formatDuration(info.endTime - info.startTime) else "-" val lastUpdated = UIUtils.formatDate(info.lastUpdated) <tr> <td><a href={uiAddress}>{info.id}</a></td> @@ -130,4 +143,11 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { <td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td> </tr> } + + private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = { + "/?" + Array( + "page=" + linkPage, + "showIncomplete=" + showIncomplete + ).mkString("&") + } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index d719e9301f4fdebb953256d128c4f4e618c02a4d..8379883e065e72b820b2c7daab3236f66feff17f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -64,7 +64,8 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers ) // Write an unfinished app, new-style. - writeFile(new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS), true, None, + val logFile2 = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS) + writeFile(logFile2, true, None, SparkListenerApplicationStart("app2-2", None, 1L, "test") ) @@ -92,12 +93,17 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers val list = provider.getListing().toSeq list should not be (null) - list.size should be (2) + list.size should be (4) + list.count(e => e.completed) should be (2) list(0) should be (ApplicationHistoryInfo(oldLog.getName(), "app3", 2L, 3L, - oldLog.lastModified(), "test")) + oldLog.lastModified(), "test", true)) list(1) should be (ApplicationHistoryInfo(logFile1.getName(), "app1-1", 1L, 2L, - logFile1.lastModified(), "test")) + logFile1.lastModified(), "test", true)) + list(2) should be (ApplicationHistoryInfo(oldLog2.getName(), "app4", 2L, -1L, + oldLog2.lastModified(), "test", false)) + list(3) should be (ApplicationHistoryInfo(logFile2.getName(), "app2-2", 1L, -1L, + logFile2.lastModified(), "test", false)) // Make sure the UI can be rendered. list.foreach { case info =>