Skip to content
Snippets Groups Projects
Commit ed092a06 authored by Rohit Agarwal, committed by Marcelo Vanzin
Browse files

[SPARK-9924] [WEB UI] Don't schedule checkForLogs while some of them are already running.

Author: Rohit Agarwal <rohita@qubole.com>

Closes #8153 from mindprince/SPARK-9924.
parent 76c155dd
No related branches found
No related tags found
No related merge requests found
...@@ -126,11 +126,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) ...@@ -126,11 +126,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// Disable the background thread during tests. // Disable the background thread during tests.
if (!conf.contains("spark.testing")) { if (!conf.contains("spark.testing")) {
// A task that periodically checks for event log updates on disk. // A task that periodically checks for event log updates on disk.
pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS) pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) { if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
// A task that periodically cleans event logs on disk. // A task that periodically cleans event logs on disk.
pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS) pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
} }
} }
} }
...@@ -204,11 +204,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) ...@@ -204,11 +204,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
mod1 >= mod2 mod1 >= mod2
} }
logInfos.sliding(20, 20).foreach { batch => logInfos.grouped(20)
replayExecutor.submit(new Runnable { .map { batch =>
override def run(): Unit = mergeApplicationListing(batch) replayExecutor.submit(new Runnable {
}) override def run(): Unit = mergeApplicationListing(batch)
} })
}
.foreach { task =>
try {
// Wait for all tasks to finish. This makes sure that checkForLogs
// is not scheduled again while some tasks are already running in
// the replayExecutor.
task.get()
} catch {
case e: InterruptedException =>
throw e
case e: Exception =>
logError("Exception while merging application listings", e)
}
}
lastModifiedTime = newLastModifiedTime lastModifiedTime = newLastModifiedTime
} catch { } catch {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment