Skip to content
Snippets Groups Projects
Commit ea88b1a5 authored by Andrew Or's avatar Andrew Or
Browse files

Revert "[SPARK-8372] History server shows incorrect information for application not started"

This reverts commit 2837e067.
parent 715f084c
No related branches found
No related tags found
No related merge requests found
...@@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) ...@@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
replayBus.addListener(appListener) replayBus.addListener(appListener)
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus) val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") } ui.setAppName(s"${appInfo.name} ($appId)")
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled) ui.getSecurityManager.setAcls(uiAclsEnabled)
...@@ -282,12 +282,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) ...@@ -282,12 +282,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newAttempts = logs.flatMap { fileStatus => val newAttempts = logs.flatMap { fileStatus =>
try { try {
val res = replay(fileStatus, bus) val res = replay(fileStatus, bus)
res match { logInfo(s"Application log ${res.logPath} loaded successfully.")
case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.") Some(res)
case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
"The application may have not started.")
}
res
} catch { } catch {
case e: Exception => case e: Exception =>
logError( logError(
...@@ -433,11 +429,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) ...@@ -433,11 +429,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
/** /**
* Replays the events in the specified log file and returns information about the associated * Replays the events in the specified log file and returns information about the associated
* application. Return `None` if the application ID cannot be located. * application.
*/ */
private def replay( private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
eventLog: FileStatus,
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
val logPath = eventLog.getPath() val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath") logInfo(s"Replaying log path: $logPath")
val logInput = val logInput =
...@@ -451,18 +445,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) ...@@ -451,18 +445,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appCompleted = isApplicationCompleted(eventLog) val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener) bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted) bus.replay(logInput, logPath.toString, !appCompleted)
appListener.appId.map { appId => new FsApplicationAttemptInfo(
new FsApplicationAttemptInfo( logPath.getName(),
logPath.getName(), appListener.appName.getOrElse(NOT_STARTED),
appListener.appName.getOrElse(NOT_STARTED), appListener.appId.getOrElse(logPath.getName()),
appId, appListener.appAttemptId,
appListener.appAttemptId, appListener.startTime.getOrElse(-1L),
appListener.startTime.getOrElse(-1L), appListener.endTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L), getModificationTime(eventLog).get,
getModificationTime(eventLog).get, appListener.sparkUser.getOrElse(NOT_STARTED),
appListener.sparkUser.getOrElse(NOT_STARTED), appCompleted)
appCompleted)
}
} finally { } finally {
logInput.close() logInput.close()
} }
......
...@@ -67,8 +67,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ...@@ -67,8 +67,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
// Write a new-style application log. // Write a new-style application log.
val newAppComplete = newLogFile("new1", None, inProgress = false) val newAppComplete = newLogFile("new1", None, inProgress = false)
writeFile(newAppComplete, true, None, writeFile(newAppComplete, true, None,
SparkListenerApplicationStart( SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
"new-app-complete", Some("new-app-complete"), 1L, "test", None),
SparkListenerApplicationEnd(5L) SparkListenerApplicationEnd(5L)
) )
...@@ -76,15 +75,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ...@@ -76,15 +75,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false, val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
Some("lzf")) Some("lzf"))
writeFile(newAppCompressedComplete, true, None, writeFile(newAppCompressedComplete, true, None,
SparkListenerApplicationStart( SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
"new-app-compressed-complete", Some("new-app-compressed-complete"), 1L, "test", None),
SparkListenerApplicationEnd(4L)) SparkListenerApplicationEnd(4L))
// Write an unfinished app, new-style. // Write an unfinished app, new-style.
val newAppIncomplete = newLogFile("new2", None, inProgress = true) val newAppIncomplete = newLogFile("new2", None, inProgress = true)
writeFile(newAppIncomplete, true, None, writeFile(newAppIncomplete, true, None,
SparkListenerApplicationStart( SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
"new-app-incomplete", Some("new-app-incomplete"), 1L, "test", None)
) )
// Write an old-style application log. // Write an old-style application log.
...@@ -92,8 +89,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ...@@ -92,8 +89,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
oldAppComplete.mkdir() oldAppComplete.mkdir()
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0")) createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None, writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart( SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
"old-app-complete", Some("old-app-complete"), 2L, "test", None),
SparkListenerApplicationEnd(3L) SparkListenerApplicationEnd(3L)
) )
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE)) createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
...@@ -107,8 +103,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ...@@ -107,8 +103,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
oldAppIncomplete.mkdir() oldAppIncomplete.mkdir()
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0")) createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None, writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart( SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
"old-app-incomplete", Some("old-app-incomplete"), 2L, "test", None)
) )
// Force a reload of data from the log directory, and check that both logs are loaded. // Force a reload of data from the log directory, and check that both logs are loaded.
...@@ -129,16 +124,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ...@@ -129,16 +124,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed))) List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
} }
list(0) should be (makeAppInfo("new-app-complete", "new-app-complete", 1L, 5L, list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
newAppComplete.lastModified(), "test", true)) newAppComplete.lastModified(), "test", true))
list(1) should be (makeAppInfo("new-app-compressed-complete", list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
"new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
true)) true))
list(2) should be (makeAppInfo("old-app-complete", "old-app-complete", 2L, 3L, list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
oldAppComplete.lastModified(), "test", true)) oldAppComplete.lastModified(), "test", true))
list(3) should be (makeAppInfo("old-app-incomplete", "old-app-incomplete", 2L, -1L, list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
oldAppIncomplete.lastModified(), "test", false)) oldAppIncomplete.lastModified(), "test", false))
list(4) should be (makeAppInfo("new-app-incomplete", "new-app-incomplete", 1L, -1L, list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
newAppIncomplete.lastModified(), "test", false)) newAppIncomplete.lastModified(), "test", false))
// Make sure the UI can be rendered. // Make sure the UI can be rendered.
...@@ -162,7 +157,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ...@@ -162,7 +157,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
logDir.mkdir() logDir.mkdir()
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0")) createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec), writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
SparkListenerApplicationStart("app2", Some("app2"), 2L, "test", None), SparkListenerApplicationStart("app2", None, 2L, "test", None),
SparkListenerApplicationEnd(3L) SparkListenerApplicationEnd(3L)
) )
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName)) createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
...@@ -185,12 +180,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ...@@ -185,12 +180,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
test("SPARK-3697: ignore directories that cannot be read.") { test("SPARK-3697: ignore directories that cannot be read.") {
val logFile1 = newLogFile("new1", None, inProgress = false) val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None, writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None), SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
SparkListenerApplicationEnd(2L) SparkListenerApplicationEnd(2L)
) )
val logFile2 = newLogFile("new2", None, inProgress = false) val logFile2 = newLogFile("new2", None, inProgress = false)
writeFile(logFile2, true, None, writeFile(logFile2, true, None,
SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None), SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
SparkListenerApplicationEnd(2L) SparkListenerApplicationEnd(2L)
) )
logFile2.setReadable(false, false) logFile2.setReadable(false, false)
...@@ -223,18 +218,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ...@@ -223,18 +218,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
} }
} }
test("Parse logs that application is not started") {
val provider = new FsHistoryProvider((createTestConf()))
val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
SparkListenerLogStart("1.4")
)
updateAndCheck(provider) { list =>
list.size should be (0)
}
}
test("SPARK-5582: empty log directory") { test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf()) val provider = new FsHistoryProvider(createTestConf())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment