diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 09701abdb0574d2de704bdf0c941290e9928e70a..3bd16bf60c81861052e7a2e2ca1778c8b448be36 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -371,6 +371,14 @@ If you need a reference to the proper location to put log files in the YARN so t
   This feature is not enabled if not configured, and only supported in Hadoop 2.6+.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.executor.failuresValidityInterval</code></td>
+  <td>(none)</td>
+  <td>
+  Defines the validity interval for executor failure tracking.
+  Executor failures which are older than the validity interval will be ignored.
+  </td>
+</tr>
 <tr>
   <td><code>spark.yarn.submit.waitAppCompletion</code></td>
   <td><code>true</code></td>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8b07dc3af40209dfb4eb05588a3616c0d76a40ee..6184ad591c1d2abd78082aa82ddcbad903fb95da 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -231,14 +231,14 @@ private[spark] class Client(
           "Cluster's default value will be used.")
     }
 
-    sparkConf.get(ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
+    sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
       try {
         val method = appContext.getClass().getMethod(
           "setAttemptFailuresValidityInterval", classOf[Long])
         method.invoke(appContext, interval: java.lang.Long)
       } catch {
         case e: NoSuchMethodException =>
-          logWarning(s"Ignoring ${ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
+          logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
             "the version of YARN does not support it")
       }
     }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index b59e6cff2f75c48da304ce40253e053be63cb4a0..066c6659543a61f2f2811fffbcb818dc92d2b7f7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -22,7 +22,7 @@ import java.util.concurrent._
 import java.util.regex.Pattern
 
 import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
@@ -41,7 +41,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 
 /**
  * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -102,7 +102,13 @@ private[yarn] class YarnAllocator(
   private var executorIdCounter: Int =
     driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
 
-  @volatile private var numExecutorsFailed = 0
+  // Queue to store the timestamp of failed executors
+  private val failedExecutorsTimeStamps = new Queue[Long]()
+
+  private var clock: Clock = new SystemClock
+
+  private val executorFailuresValidityInterval =
+    sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
 
   @volatile private var targetNumExecutors =
     YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
@@ -166,9 +172,26 @@ private[yarn] class YarnAllocator(
   private[yarn] val containerPlacementStrategy =
     new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)
 
+  /**
+   * Use a different clock for YarnAllocator. This is mainly used for testing.
+   */
+  def setClock(newClock: Clock): Unit = {
+    clock = newClock
+  }
+
   def getNumExecutorsRunning: Int = numExecutorsRunning
 
-  def getNumExecutorsFailed: Int = numExecutorsFailed
+  def getNumExecutorsFailed: Int = synchronized {
+    val endTime = clock.getTimeMillis()
+
+    while (executorFailuresValidityInterval > 0
+      && failedExecutorsTimeStamps.nonEmpty
+      && failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) {
+      failedExecutorsTimeStamps.dequeue()
+    }
+
+    failedExecutorsTimeStamps.size
+  }
 
   /**
    * A sequence of pending container requests that have not yet been fulfilled.
@@ -527,7 +550,8 @@ private[yarn] class YarnAllocator(
               completedContainer.getDiagnostics,
               PMEM_EXCEEDED_PATTERN))
           case _ =>
-            numExecutorsFailed += 1
+            // Enqueue the timestamp of failed executor
+            failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
             (true, "Container marked as failed: " + containerId + onHostStr +
               ". Exit status: " + completedContainer.getExitStatus +
               ". Diagnostics: " + completedContainer.getDiagnostics)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 3816a84ab2e7c94c85ce1606b9afde653b8cfa36..c4dd3202f0a15cddbdd15fa32671b5bff81a03db 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -33,13 +33,20 @@ package object config {
     .toSequence
     .createOptional
 
-  private[spark] val ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+  private[spark] val AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
     ConfigBuilder("spark.yarn.am.attemptFailuresValidityInterval")
       .doc("Interval after which AM failures will be considered independent and " +
         "not accumulate towards the attempt count.")
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+    ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
+      .doc("Interval after which Executor failures will be considered independent and not " +
+        "accumulate towards the attempt count.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
   private[spark] val MAX_APP_ATTEMPTS = ConfigBuilder("spark.yarn.maxAppAttempts")
     .doc("Maximum number of AM attempts before failing the app.")
     .intConf
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 6a861d6f66edf56e8dfe41d01ac48af9056551e6..f4f8bd435d5f21cdf6f04e934990f2d348896ea9 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.deploy.yarn.YarnAllocator._
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.ManualClock
 
 class MockResolver extends DNSToSwitchMapping {
 
@@ -275,4 +276,49 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
     assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
   }
+
+  test("window based failure executor counting") {
+    sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
+    val handler = createAllocator(4)
+    val clock = new ManualClock(0L)
+    handler.setClock(clock)
+
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getPendingAllocate.size should be (4)
+
+    val containers = Seq(
+      createContainer("host1"),
+      createContainer("host2"),
+      createContainer("host3"),
+      createContainer("host4")
+    )
+    handler.handleAllocatedContainers(containers)
+
+    val failedStatuses = containers.map { c =>
+      ContainerStatus.newInstance(c.getId, ContainerState.COMPLETE, "Failed", -1)
+    }
+
+    handler.getNumExecutorsFailed should be (0)
+
+    clock.advance(100 * 1000L)
+    handler.processCompletedContainers(failedStatuses.slice(0, 1))
+    handler.getNumExecutorsFailed should be (1)
+
+    clock.advance(101 * 1000L)
+    handler.getNumExecutorsFailed should be (0)
+
+    handler.processCompletedContainers(failedStatuses.slice(1, 3))
+    handler.getNumExecutorsFailed should be (2)
+
+    clock.advance(50 * 1000L)
+    handler.processCompletedContainers(failedStatuses.slice(3, 4))
+    handler.getNumExecutorsFailed should be (3)
+
+    clock.advance(51 * 1000L)
+    handler.getNumExecutorsFailed should be (1)
+
+    clock.advance(50 * 1000L)
+    handler.getNumExecutorsFailed should be (0)
+  }
 }