diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 53c18ca3ff50c6802af45eb70efca52e3f59a3a8..e573ff16c50a36279e9e9cddc9f084edebe86e30 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -126,11 +126,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // Disable the background thread during tests.
     if (!conf.contains("spark.testing")) {
       // A task that periodically checks for event log updates on disk.
-      pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
+      pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
 
       if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
         // A task that periodically cleans event logs on disk.
-        pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
+        pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
       }
     }
   }
@@ -204,11 +204,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           mod1 >= mod2
       }
 
-      logInfos.sliding(20, 20).foreach { batch =>
-        replayExecutor.submit(new Runnable {
-          override def run(): Unit = mergeApplicationListing(batch)
-        })
-      }
+      logInfos.grouped(20)
+        .map { batch =>
+          replayExecutor.submit(new Runnable {
+            override def run(): Unit = mergeApplicationListing(batch)
+          })
+        }
+        .foreach { task =>
+          try {
+            // Wait for all tasks to finish. This makes sure that checkForLogs
+            // is not scheduled again while some tasks are already running in
+            // the replayExecutor.
+            task.get()
+          } catch {
+            case e: InterruptedException =>
+              throw e
+            case e: Exception =>
+              logError("Exception while merging application listings", e)
+          }
+        }
 
       lastModifiedTime = newLastModifiedTime
     } catch {