Skip to content
Snippets Groups Projects
Commit 8f5c827b authored by Kousuke Saruta's avatar Kousuke Saruta Committed by Andrew Or
Browse files

[SPARK-5344][WebUI] HistoryServer cannot recognize that inprogress file was...

[SPARK-5344][WebUI] HistoryServer cannot recognize that inprogress file was renamed to completed file

`FsHistoryProvider` tries to update application status but if `checkForLogs` is called before `.inprogress` file is renamed to completed file, the file is not recognized as completed.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #4132 from sarutak/SPARK-5344 and squashes the following commits:

9658008 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5344
d2c72b6 [Kousuke Saruta] Fixed update issue of FsHistoryProvider
parent 9f643576
No related branches found
No related tags found
No related merge requests found
......@@ -203,7 +203,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
if (!logInfos.isEmpty) {
val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
def addIfAbsent(info: FsApplicationHistoryInfo) = {
if (!newApps.contains(info.id)) {
if (!newApps.contains(info.id) ||
newApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
!info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
newApps += (info.id -> info)
}
}
......
......@@ -167,6 +167,29 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
list.size should be (1)
}
test("history file is renamed from inprogress to completed") {
val conf = new SparkConf()
.set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
.set("spark.testing", "true")
val provider = new FsHistoryProvider(conf)
val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
SparkListenerApplicationEnd(2L)
)
provider.checkForLogs()
val appListBeforeRename = provider.getListing()
appListBeforeRename.size should be (1)
appListBeforeRename.head.logPath should endWith(EventLoggingListener.IN_PROGRESS)
logFile1.renameTo(new File(testDir, "app1"))
provider.checkForLogs()
val appListAfterRename = provider.getListing()
appListAfterRename.size should be (1)
appListAfterRename.head.logPath should not endWith(EventLoggingListener.IN_PROGRESS)
}
private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
events: SparkListenerEvent*) = {
val out =
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment