diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index bae7a3f307f52944532766b420aaf48a39b9691e..9cc321af4bde249d77efad3c605c2e89ad8d745e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -28,6 +28,7 @@ import scala.util.control.NonFatal
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -353,6 +354,34 @@ class SparkHadoopUtil extends Logging {
     }
     buffer.toString
   }
+
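+  /**
+   * Checks whether the current user (per UserGroupInformation) may perform
+   * `mode` on the file or directory described by `status`. Mirrors the
+   * POSIX/HDFS rules: owner first, then group, then other, with no
+   * fall-through past the first class that matches the caller.
+   */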
+  private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
+    val perm = status.getPermission
+    val ugi = UserGroupInformation.getCurrentUser
+
+    if (ugi.getShortUserName == status.getOwner) {
+      if (perm.getUserAction.implies(mode)) {
+        return true
+      }
+    } else if (ugi.getGroupNames.contains(status.getGroup)) {
+      if (perm.getGroupAction.implies(mode)) {
+        return true
+      }
+    } else if (perm.getOtherAction.implies(mode)) {
+      return true
+    }
+
+    logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
+      s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
+      s"${if (status.isDirectory) "d" else "-"}$perm")
+    false
+  }
 }
 
 object SparkHadoopUtil {
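
The permission check added above mirrors the POSIX/HDFS evaluation order: the
owner class is consulted first, then the group class, then "other", and there
is no fall-through past the first class that matches the caller (a file owned
by the caller with user bits "---" is denied even if the "other" bits grant
everything). The check relies on FsAction being a read/write/execute bit mask
whose implies() tests bit containment. A minimal sketch of those semantics,
not part of the patch:

    import org.apache.hadoop.fs.permission.FsAction._

    assert(READ_WRITE.implies(READ))   // rw- grants r--
    assert(!READ.implies(WRITE))       // r-- does not grant -w-
    assert(!READ.implies(READ_WRITE))  // r-- does not grant rw-
    assert(ALL.implies(NONE))          // every action implies NONE
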
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 9012736bc2745925757305a479629bf062293b2d..f4235df2451282b8324ae5e439384ce93dbcb11a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -27,7 +27,8 @@ import scala.xml.Node
 
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
@@ -318,21 +319,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       // scan for modified applications, replay and merge them
       val logInfos: Seq[FileStatus] = statusList
         .filter { entry =>
-          try {
-            val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
-            !entry.isDirectory() &&
-              // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
-              // reading a garbage file is safe, but we would log an error which can be scary to
-              // the end-user.
-              !entry.getPath().getName().startsWith(".") &&
-              prevFileSize < entry.getLen()
-          } catch {
-            case e: AccessControlException =>
-              // Do not use "logInfo" since these messages can get pretty noisy if printed on
-              // every poll.
-              logDebug(s"No permission to read $entry, ignoring.")
-              false
-          }
+          val prevFileSize = fileToAppInfo.get(entry.getPath()).map(_.fileSize).getOrElse(0L)
+          !entry.isDirectory() &&
+            // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
+            // reading a garbage file is safe, but we would log an error which can be scary to
+            // the end-user.
+            !entry.getPath().getName().startsWith(".") &&
+            prevFileSize < entry.getLen() &&
+            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
         }
         .flatMap { entry => Some(entry) }
         .sortWith { case (entry1, entry2) =>
@@ -445,7 +439,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replay the log files in the list and merge the list of old applications with new ones
    */
-  private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+  protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
     val newAttempts = try {
       val eventsFilter: ReplayEventsFilter = { eventString =>
         eventString.startsWith(APPL_START_EVENT_PREFIX) ||
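
Dropping the try/catch here is a correctness fix as much as a cleanup: nothing
in the filter body touches the filesystem (entry is a FileStatus already
returned by the directory listing, and getPath(), isDirectory() and getLen()
are local accessors), so the AccessControlException handler could not fire at
this point. Unreadable files slipped through the filter and only failed later,
during replay, with an error that can be scary to the end-user. The new
checkAccessPermission call screens them out up front using only the
already-fetched FileStatus, so no extra namenode round-trip is added per
entry. A hedged usage sketch (the path is hypothetical, and the caller must
live under org.apache.spark because the method is private[spark]):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.fs.permission.FsAction

    import org.apache.spark.deploy.SparkHadoopUtil

    val fs = FileSystem.get(new Configuration())
    val status = fs.getFileStatus(new Path("/tmp/spark-events/app-0001"))
    if (SparkHadoopUtil.get.checkAccessPermission(status, FsAction.READ)) {
      val in = fs.open(status.getPath)   // should not hit AccessControlException now
      try { /* replay the event log */ } finally { in.close() }
    }

Note that the check inspects only the classic permission bits carried by the
FileStatus; HDFS ACL entries are not consulted, so it approximates what
open() would enforce rather than reproducing it exactly.
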
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ab24a76e20a3023a07fc8d3956cce1b8d3a3e70a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.security.PrivilegedExceptionAction
+
+import scala.util.Random
+
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+import org.apache.hadoop.security.UserGroupInformation
+import org.scalatest.Matchers
+
+import org.apache.spark.SparkFunSuite
+
+class SparkHadoopUtilSuite extends SparkFunSuite with Matchers {
+  test("check file permission") {
+    import FsAction._
+    val testUser = s"user-${Random.nextInt(100)}"
+    val testGroups = Array(s"group-${Random.nextInt(100)}")
+    val testUgi = UserGroupInformation.createUserForTesting(testUser, testGroups)
+
+    testUgi.doAs(new PrivilegedExceptionAction[Void] {
+      override def run(): Void = {
+        val sparkHadoopUtil = new SparkHadoopUtil
+
+        // Owned by the user; the user bits grant access
+        var status = fileStatus(testUser, testGroups.head, READ_WRITE, READ_WRITE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(true)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true)
+
+        // Owned by the user; the user bits deny, with no fall-through to the group/other bits
+        status = fileStatus(testUser, testGroups.head, NONE, READ_WRITE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(false)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false)
+
+        val otherUser = s"test-${Random.nextInt(100)}"
+        val otherGroup = s"test-${Random.nextInt(100)}"
+
+        // Owned by the user's group; the group bits grant access
+        status = fileStatus(otherUser, testGroups.head, NONE, READ_WRITE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(true)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true)
+
+        // Owned by the user's group; the group bits deny (user bits apply to the owner only)
+        status = fileStatus(otherUser, testGroups.head, READ_WRITE, NONE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(false)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false)
+
+        // Owned by another user and group; the "other" bits grant access
+        status = fileStatus(otherUser, otherGroup, READ_WRITE, READ_WRITE, READ_WRITE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(true)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(true)
+
+        // Owned by another user and group; the "other" bits deny access
+        status = fileStatus(otherUser, otherGroup, READ_WRITE, READ_WRITE, NONE)
+        sparkHadoopUtil.checkAccessPermission(status, READ) should be(false)
+        sparkHadoopUtil.checkAccessPermission(status, WRITE) should be(false)
+
+        null
+      }
+    })
+  }
+
+  private def fileStatus(
+      owner: String,
+      group: String,
+      userAction: FsAction,
+      groupAction: FsAction,
+      otherAction: FsAction): FileStatus = {
+    new FileStatus(0L,
+      false,
+      0,
+      0L,
+      0L,
+      0L,
+      new FsPermission(userAction, groupAction, otherAction),
+      owner,
+      group,
+      null)
+  }
+}
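
The suite leans on two Hadoop testing hooks: UserGroupInformation.createUserForTesting
fabricates a user-to-groups mapping without touching the OS account database,
and doAs() makes that identity the one returned by
UserGroupInformation.getCurrentUser inside the action, which is exactly the
identity checkAccessPermission consults. A stripped-down sketch of the
pattern, with placeholder names:

    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.security.UserGroupInformation

    val ugi = UserGroupInformation.createUserForTesting("alice", Array("devs"))
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        val current = UserGroupInformation.getCurrentUser
        assert(current.getShortUserName == "alice")
        assert(current.getGroupNames.contains("devs"))
      }
    })
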
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index ec580a44b8e76ddbe16b7ead3719425d10490a14..456158d41b93f5d8a67ac5fb11a5170e8332ecb5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.json4s.jackson.JsonMethods._
 import org.mockito.Matchers.any
@@ -130,9 +131,21 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
-  test("SPARK-3697: ignore directories that cannot be read.") {
+  test("SPARK-3697: ignore files that cannot be read.") {
     // setReadable(...) does not work on Windows. Please refer JDK-6728842.
     assume(!Utils.isWindows)
+
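+    // Counts calls to mergeApplicationListing so the test can assert that the
+    // unreadable file is filtered out before replay rather than during it.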
+    class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) {
+      var mergeApplicationListingCall = 0
+      override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+        super.mergeApplicationListing(fileStatus)
+        mergeApplicationListingCall += 1
+      }
+    }
+    val provider = new TestFsHistoryProvider
+
     val logFile1 = newLogFile("new1", None, inProgress = false)
     writeFile(logFile1, true, None,
       SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
@@ -145,10 +158,11 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       )
     logFile2.setReadable(false, false)
 
-    val provider = new FsHistoryProvider(createTestConf())
     updateAndCheck(provider) { list =>
       list.size should be (1)
     }
+
+    provider.mergeApplicationListingCall should be (1)
   }
 
   test("history file is renamed from inprogress to completed") {