Skip to content
Snippets Groups Projects
Commit c3b881a7 authored by Chuan Shao's avatar Chuan Shao Committed by Marcelo Vanzin
Browse files

[SPARK-7336] [HISTORYSERVER] Fix bug that applications status incorrect on JobHistory UI.

Author: ArcherShao <shaochuan@huawei.com>

Closes #5886 from ArcherShao/SPARK-7336.
parent 00d9af5e
No related branches found
No related tags found
No related merge requests found
......@@ -18,6 +18,7 @@
package org.apache.spark.deploy.history
import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
import java.util.UUID
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}
......@@ -73,7 +74,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// The modification time of the newest log detected during the last scan. This is used
// to ignore logs that are older during subsequent scans, to avoid processing data that
// is already known.
private var lastModifiedTime = -1L
private var lastScanTime = -1L
// Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
// into the map in order, so the LinkedHashMap maintains the correct ordering.
......@@ -179,15 +180,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
*/
private[history] def checkForLogs(): Unit = {
try {
val newLastScanTime = getNewLastScanTime()
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
.getOrElse(Seq[FileStatus]())
var newLastModifiedTime = lastModifiedTime
val logInfos: Seq[FileStatus] = statusList
.filter { entry =>
try {
getModificationTime(entry).map { time =>
newLastModifiedTime = math.max(newLastModifiedTime, time)
time >= lastModifiedTime
time >= lastScanTime
}.getOrElse(false)
} catch {
case e: AccessControlException =>
......@@ -224,12 +224,29 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
lastModifiedTime = newLastModifiedTime
lastScanTime = newLastScanTime
} catch {
case e: Exception => logError("Exception in checking for event log updates", e)
}
}
private def getNewLastScanTime(): Long = {
val fileName = "." + UUID.randomUUID().toString
val path = new Path(logDir, fileName)
val fos = fs.create(path)
try {
fos.close()
fs.getFileStatus(path).getModificationTime
} catch {
case e: Exception =>
logError("Exception encountered when attempting to update last scan time", e)
lastScanTime
} finally {
fs.delete(path)
}
}
override def writeEventLogs(
appId: String,
attemptId: Option[String],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment