diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 9939103bb0903978669fe41acf9984b9033ce81d..49329423dca7615cdf044e3d2bd39d92f61d0590 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -101,6 +101,9 @@ private[spark] class ExecutorAllocationManager(
   private val executorIdleTimeoutS = conf.getTimeAsSeconds(
     "spark.dynamicAllocation.executorIdleTimeout", "60s")
 
+  private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
+    "spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${2 * executorIdleTimeoutS}s")
+
   // During testing, the methods to actually kill and add executors are mocked out
   private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
 
@@ -459,9 +462,21 @@ private[spark] class ExecutorAllocationManager(
   private def onExecutorIdle(executorId: String): Unit = synchronized {
     if (executorIds.contains(executorId)) {
       if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
+        // Note that it is not necessary to query the executors, since all the cached
+        // blocks we are concerned with are reported to the driver. This does not
+        // include broadcast blocks, which executors do not report to the driver.
+        val hasCachedBlocks = SparkEnv.get.blockManager.master.hasCachedBlocks(executorId)
+        val now = clock.getTimeMillis()
+        val timeout = if (hasCachedBlocks) {
+          // Use the separate (by default longer) timeout for executors holding cached blocks.
+          now + cachedExecutorIdleTimeoutS * 1000
+        } else {
+          now + executorIdleTimeoutS * 1000
+        }
+        val realTimeout = if (timeout <= 0) Long.MaxValue else timeout // guard against overflow
+        removeTimes(executorId) = realTimeout
         logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
-          s"scheduled to run on the executor (to expire in $executorIdleTimeoutS seconds)")
-        removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeoutS * 1000
+          s"scheduled to run on the executor (to expire in ${(realTimeout - now)/1000} seconds)")
       }
     } else {
       logWarning(s"Attempted to mark unknown executor $executorId idle")
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index abcad9438bf28dc790c6e7e962a1df7e3cd2ec09..7cdae22b0e253505619a2c0b38534e800ff7e920 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -202,6 +202,14 @@ class BlockManagerMaster(
     Await.result(future, timeout)
   }
 
+  /**
+   * Find out if the executor has cached blocks. This method does not consider
+   * broadcast blocks, since they are not reported to the master.
+   */
+  def hasCachedBlocks(executorId: String): Boolean = {
+    driverEndpoint.askWithRetry[Boolean](HasCachedBlocks(executorId))
+  }
+
   /** Stop the driver endpoint, called only on the Spark driver node */
   def stop() {
     if (driverEndpoint != null && isDriver) {
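Note that the new method is a blocking `askWithRetry` round trip to the driver endpoint, issued once per executor going idle. A hedged sketch of reaching it from driver-side code (the object name is illustrative; `BlockManager` and `BlockManagerMaster` are `private[spark]`, hence the package declaration):

```scala
package org.apache.spark // required: these types are private[spark]

object HasCachedBlocksSketch {
  def executorHoldsCache(executorId: String): Boolean = {
    // Same path the allocation-manager change above uses: one blocking
    // RPC to the BlockManagerMasterEndpoint per call.
    SparkEnv.get.blockManager.master.hasCachedBlocks(executorId)
  }
}
```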
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 2cd8c5297b741ca90aaffa67585f6fa8c03482a8..68ed9096731c513c57336f7e199eaf481e62b061 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -112,6 +112,17 @@ class BlockManagerMasterEndpoint(
     case BlockManagerHeartbeat(blockManagerId) =>
       context.reply(heartbeatReceived(blockManagerId))
 
+    case HasCachedBlocks(executorId) =>
+      blockManagerIdByExecutor.get(executorId) match {
+        case Some(bm) =>
+          if (blockManagerInfo.contains(bm)) {
+            val bmInfo = blockManagerInfo(bm)
+            context.reply(bmInfo.cachedBlocks.nonEmpty)
+          } else {
+            context.reply(false)
+          }
+        case None => context.reply(false)
+      }
   }
 
   private def removeRdd(rddId: Int): Future[Seq[Int]] = {
@@ -418,6 +429,9 @@ private[spark] class BlockManagerInfo(
   // Mapping from block id to its status.
   private val _blocks = new JHashMap[BlockId, BlockStatus]
 
+  // Cached blocks held by this BlockManager. This does not include broadcast blocks.
+  private val _cachedBlocks = new mutable.HashSet[BlockId]
+
   def getStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks.get(blockId))
 
   def updateLastSeenMs() {
@@ -451,27 +465,35 @@
        * and the diskSize here indicates the data size in or dropped to disk.
        * They can be both larger than 0, when a block is dropped from memory to disk.
        * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
+      var blockStatus: BlockStatus = null
       if (storageLevel.useMemory) {
-        _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0, 0))
+        blockStatus = BlockStatus(storageLevel, memSize, 0, 0)
+        _blocks.put(blockId, blockStatus)
         _remainingMem -= memSize
         logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
           Utils.bytesToString(_remainingMem)))
       }
       if (storageLevel.useDisk) {
-        _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize, 0))
+        blockStatus = BlockStatus(storageLevel, 0, diskSize, 0)
+        _blocks.put(blockId, blockStatus)
         logInfo("Added %s on disk on %s (size: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
       }
       if (storageLevel.useOffHeap) {
-        _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, externalBlockStoreSize))
+        blockStatus = BlockStatus(storageLevel, 0, 0, externalBlockStoreSize)
+        _blocks.put(blockId, blockStatus)
         logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
       }
+      if (!blockId.isBroadcast && blockStatus.isCached) {
+        _cachedBlocks += blockId
+      }
     } else if (_blocks.containsKey(blockId)) {
       // If isValid is not true, drop the block.
       val blockStatus: BlockStatus = _blocks.get(blockId)
       _blocks.remove(blockId)
+      _cachedBlocks -= blockId
       if (blockStatus.storageLevel.useMemory) {
         logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
@@ -494,6 +516,7 @@
       _remainingMem += _blocks.get(blockId).memSize
       _blocks.remove(blockId)
     }
+    _cachedBlocks -= blockId
   }
 
   def remainingMem: Long = _remainingMem
@@ -502,6 +525,9 @@
 
   def blocks: JHashMap[BlockId, BlockStatus] = _blocks
 
+  // This does not include broadcast blocks.
+  def cachedBlocks: collection.Set[BlockId] = _cachedBlocks
+
   override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
 
   def clear() {
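The invariant being maintained: `_cachedBlocks` holds exactly the non-broadcast block ids whose latest reported status is cached, so the endpoint can answer `HasCachedBlocks` with `cachedBlocks.nonEmpty` instead of scanning `_blocks` (and `cachedBlocks` returns the set as a read-only `collection.Set`). A toy model of that bookkeeping, with every name a simplified stand-in rather than a Spark type:

```scala
import scala.collection.mutable

final case class Status(useMemory: Boolean, useDisk: Boolean) {
  def isCached: Boolean = useMemory || useDisk
}

class BlockBook {
  private val blocks = mutable.Map.empty[String, Status]
  private val cached = mutable.Set.empty[String]

  // Mirrors BlockManagerInfo.updateBlockInfo: a cached status inserts or
  // refreshes the block; anything else drops it from both structures.
  def update(id: String, status: Status): Unit = {
    if (status.isCached) {
      blocks(id) = status
      if (!id.startsWith("broadcast_")) cached += id // the !blockId.isBroadcast filter
    } else {
      blocks -= id
      cached -= id
    }
  }

  // Mirrors BlockManagerInfo.removeBlock: always clear the cached entry too.
  def remove(id: String): Unit = {
    blocks -= id
    cached -= id
  }

  def hasCachedBlocks: Boolean = cached.nonEmpty
}
```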
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 1683576067fe8c5587a5c9e0c4c3d79ff885fe77..376e9eb48843d0ced9357cbef6ecad4a4d77e91d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -42,7 +42,6 @@ private[spark] object BlockManagerMessages {
   case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
     extends ToBlockManagerSlave
 
-
   //////////////////////////////////////////////////////////////////////////////////
   // Messages from slaves to the master.
   //////////////////////////////////////////////////////////////////////////////////
@@ -108,4 +107,6 @@ private[spark] object BlockManagerMessages {
     extends ToBlockManagerMaster
 
   case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+  case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster
 }
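`HasCachedBlocks` follows the existing convention in this file: each driver-bound request is a case class extending `ToBlockManagerMaster`, which the master endpoint pattern-matches in `receiveAndReply`. A minimal sketch of that pattern with stand-in names (this is not Spark's RpcEndpoint machinery):

```scala
object MessageSketch {
  sealed trait ToMaster
  final case class HasCachedBlocks(executorId: String) extends ToMaster

  // A sealed trait plus case classes gives an exhaustively checked match
  // over every master-bound message.
  def handle(msg: ToMaster, reply: Any => Unit): Unit = msg match {
    case HasCachedBlocks(id) =>
      reply(false) // a real endpoint consults per-executor block state for `id` here
  }
}
```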
diff --git a/docs/configuration.md b/docs/configuration.md
index 3a48da4592dd92c133886f2a7a18fe9a5b5b15e3..9667cebe0b87cbec78a4c7fc1e07ab49a7861e29 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1201,6 +1201,15 @@ Apart from these, the following properties are also available, and may be useful
     <a href="job-scheduling.html#resource-allocation-policy">description</a>.
   </td>
 </tr>
+<tr>
+  <td><code>spark.dynamicAllocation.cachedExecutorIdleTimeout</code></td>
+  <td><code>2 * executorIdleTimeout</code></td>
+  <td>
+    If dynamic allocation is enabled and an executor which has cached data blocks has been idle
+    for more than this duration, the executor will be removed. For more details, see this
+    <a href="job-scheduling.html#resource-allocation-policy">description</a>.
+  </td>
+</tr>
 <tr>
   <td><code>spark.dynamicAllocation.initialExecutors</code></td>
   <td><code>spark.dynamicAllocation.minExecutors</code></td>
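A hedged configuration example for the new setting, e.g. in application setup code. The `300s` value is arbitrary; any duration format that `getTimeAsSeconds` accepts (such as `"5min"`) works:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  // Executors holding cached RDD blocks now survive 5 minutes of idleness
  // instead of the default 2 * executorIdleTimeout = 120s.
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s")
```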