diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 0ae45f4ad91302e090c17dd37259568fca3d1d60..92125f2df7d10d08f656dc85054d647e4c3fa9cc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -173,9 +173,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       val logInfos = statusList
         .filter { entry =>
           try {
-            val modTime = getModificationTime(entry)
-            newLastModifiedTime = math.max(newLastModifiedTime, modTime)
-            modTime >= lastModifiedTime
+            getModificationTime(entry).map { time =>
+              newLastModifiedTime = math.max(newLastModifiedTime, time)
+              time >= lastModifiedTime
+            }.getOrElse(false)
           } catch {
             case e: AccessControlException =>
               // Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -251,7 +252,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         appListener.appName.getOrElse(NOT_STARTED),
         appListener.startTime.getOrElse(-1L),
         appListener.endTime.getOrElse(-1L),
-        getModificationTime(eventLog),
+        getModificationTime(eventLog).get,
         appListener.sparkUser.getOrElse(NOT_STARTED),
         isApplicationCompleted(eventLog))
     } finally {
@@ -310,11 +311,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()
 
-  private def getModificationTime(fsEntry: FileStatus): Long = {
-    if (fsEntry.isDir) {
-      fs.listStatus(fsEntry.getPath).map(_.getModificationTime()).max
+  /**
+   * Returns the modification time of the given event log. If the status points at an empty
+   * directory, `None` is returned, indicating that there isn't an event log at that location.
+   */
+  private def getModificationTime(fsEntry: FileStatus): Option[Long] = {
+    if (isLegacyLogDirectory(fsEntry)) {
+      val statusList = fs.listStatus(fsEntry.getPath)
+      if (!statusList.isEmpty) Some(statusList.map(_.getModificationTime()).max) else None
     } else {
-      fsEntry.getModificationTime()
+      Some(fsEntry.getModificationTime())
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 3fbc1a21d10ed56a1d8ada0cf5424acd689fa270..1d9543225811133be65a9b402603962a41265808 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -190,6 +190,24 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     appListAfterRename.head.logPath should not endWith(EventLoggingListener.IN_PROGRESS)
   }
 
+  test("SPARK-5582: empty log directory") {
+    val conf = new SparkConf()
+      .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
+    val provider = new FsHistoryProvider(conf)
+
+    val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
+    writeFile(logFile1, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+      SparkListenerApplicationEnd(2L))
+
+    val oldLog = new File(testDir, "old1")
+    oldLog.mkdir()
+
+    provider.checkForLogs()
+    val appListAfterRename = provider.getListing()
+    appListAfterRename.size should be (1)
+  }
+
   private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
     events: SparkListenerEvent*) = {
     val out =