diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
index 9023e1ac012b7f4053218723359ca05d14d6743a..dbb0ad8d5c6738129f1d3dbdcbb425bc81004b28 100644
--- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
@@ -70,11 +70,28 @@ private[memory] class ExecutionMemoryPool(
    * active tasks) before it is forced to spill. This can happen if the number of tasks increase
    * but an older task had a lot of memory already.
    *
+   * @param numBytes number of bytes to acquire
+   * @param taskAttemptId the task attempt acquiring memory
+   * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in
+   *                      one parameter (Long) that represents the desired amount of memory by
+   *                      which this pool should be expanded.
+   * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool
+   *                           at this given moment. This is not a field because the max pool
+   *                           size is variable in certain cases. For instance, in unified
+   *                           memory management, the execution pool can be expanded by evicting
+   *                           cached blocks, thereby shrinking the storage pool.
+   *
    * @return the number of bytes granted to the task.
    */
-  def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = lock.synchronized {
+  private[memory] def acquireMemory(
+      numBytes: Long,
+      taskAttemptId: Long,
+      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
+      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
     assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
 
+    // TODO: clean up this clunky method signature
+
     // Add this task to the taskMemory map just so we can keep an accurate count of the number
     // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
     if (!memoryForTask.contains(taskAttemptId)) {
@@ -91,25 +108,31 @@ private[memory] class ExecutionMemoryPool(
       val numActiveTasks = memoryForTask.keys.size
       val curMem = memoryForTask(taskAttemptId)
 
-      // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
-      // don't let it be negative
-      val maxToGrant =
-        math.min(numBytes, math.max(0, (poolSize / numActiveTasks) - curMem))
+      // In every iteration of this loop, we should first try to reclaim any borrowed execution
+      // space from storage. This is necessary because of the potential race condition where new
+      // storage blocks may steal the free execution memory that this task was waiting for.
+      maybeGrowPool(numBytes - memoryFree)
+
+      // Maximum size the pool would have after potentially growing the pool.
+      // This is used to compute the upper bound of how much memory each task can occupy. This
+      // must take into account potential free memory as well as the amount this pool currently
+      // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
+      // we did not take into account space that could have been freed by evicting cached blocks.
+      val maxPoolSize = computeMaxPoolSize()
+      val maxMemoryPerTask = maxPoolSize / numActiveTasks
+      val minMemoryPerTask = poolSize / (2 * numActiveTasks)
+
+      // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
+      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
       // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
       val toGrant = math.min(maxToGrant, memoryFree)
 
-      if (curMem < poolSize / (2 * numActiveTasks)) {
-        // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
-        // if we can't give it this much now, wait for other tasks to free up memory
-        // (this happens if older tasks allocated lots of memory before N grew)
-        if (memoryFree >= math.min(maxToGrant, poolSize / (2 * numActiveTasks) - curMem)) {
-          memoryForTask(taskAttemptId) += toGrant
-          return toGrant
-        } else {
-          logInfo(
-            s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
-          lock.wait()
-        }
+      // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
+      // if we can't give it this much now, wait for other tasks to free up memory
+      // (this happens if older tasks allocated lots of memory before N grew)
+      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
+        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
+        lock.wait()
       } else {
         memoryForTask(taskAttemptId) += toGrant
         return toGrant
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 0b9f6a9dc0525bddbd0955b418d3934509542ed7..829f054dba0e936f7390a0350d7012e9c34b933e 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -81,22 +81,51 @@ private[spark] class UnifiedMemoryManager private[memory] (
     assert(numBytes >= 0)
     memoryMode match {
       case MemoryMode.ON_HEAP =>
-        if (numBytes > onHeapExecutionMemoryPool.memoryFree) {
-          val extraMemoryNeeded = numBytes - onHeapExecutionMemoryPool.memoryFree
-          // There is not enough free memory in the execution pool, so try to reclaim memory from
-          // storage. We can reclaim any free memory from the storage pool. If the storage pool
-          // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
-          // the memory that storage has borrowed from execution.
-          val memoryReclaimableFromStorage =
-            math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
-          if (memoryReclaimableFromStorage > 0) {
-            // Only reclaim as much space as is necessary and available:
-            val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
-              math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
-            onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+
+        /**
+         * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
+         *
+         * When acquiring memory for a task, the execution pool may need to make multiple
+         * attempts. Each attempt must be able to evict storage in case another task jumps in
+         * and caches a large block between the attempts. This is called once per attempt.
+         */
+        def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
+          if (extraMemoryNeeded > 0) {
+            // There is not enough free memory in the execution pool, so try to reclaim memory from
+            // storage. We can reclaim any free memory from the storage pool. If the storage pool
+            // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
+            // the memory that storage has borrowed from execution.
+            val memoryReclaimableFromStorage =
+              math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
+            if (memoryReclaimableFromStorage > 0) {
+              // Only reclaim as much space as is necessary and available:
+              val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
+                math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
+              onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+            }
           }
         }
-        onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+
+        /**
+         * The size the execution pool would have after evicting storage memory.
+         *
+         * The execution memory pool divides this quantity among the active tasks evenly to cap
+         * the execution memory allocation for each task. It is important to keep this greater
+         * than the execution pool size, which doesn't take into account potential memory that
+         * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
+         *
+         * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
+         * in execution memory allocation across tasks, Otherwise, a task may occupy more than
+         * its fair share of execution memory, mistakenly thinking that other tasks can acquire
+         * the portion of storage memory that cannot be evicted.
+         */
+        def computeMaxExecutionPoolSize(): Long = {
+          maxMemory - math.min(storageMemoryUsed, storageRegionSize)
+        }
+
+        onHeapExecutionMemoryPool.acquireMemory(
+          numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
+
       case MemoryMode.OFF_HEAP =>
         // For now, we only support on-heap caching of data, so we do not need to interact with
         // the storage pool when allocating off-heap memory. This will change in the future, though.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index d4bc3a5c900f7c428d5db9b0f94e38fa4a102b4d..9f27eed626be33b1b3c14bd020c934d7d43c5eda 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -92,6 +92,12 @@ private[spark] abstract class Task[T](
         Utils.tryLogNonFatalError {
           // Release memory used by this thread for unrolling blocks
           SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
+          // Notify any tasks waiting for execution memory to be freed to wake up and try to
+          // acquire memory again. This makes impossible the scenario where a task sleeps forever
+          // because there are no other tasks left to notify it. Since this is safe to do but may
+          // not be strictly necessary, we should revisit whether we can remove this in the future.
+          val memoryManager = SparkEnv.get.memoryManager
+          memoryManager.synchronized { memoryManager.notifyAll() }
         }
       } finally {
         TaskContext.unset()
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index e21a028b7faec34932a84bff6e9d0c3004a35eae..6cc48597d38f90cdef6bd717aec7df76740e2c6e 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -230,4 +230,29 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     assert(exception.getMessage.contains("larger heap size"))
   }
 
+  test("execution can evict cached blocks when there are multiple active tasks (SPARK-12155)") {
+    val conf = new SparkConf()
+      .set("spark.memory.fraction", "1")
+      .set("spark.memory.storageFraction", "0")
+      .set("spark.testing.memory", "1000")
+    val mm = UnifiedMemoryManager(conf, numCores = 2)
+    val ms = makeMemoryStore(mm)
+    assert(mm.maxMemory === 1000)
+    // Have two tasks each acquire some execution memory so that the memory pool registers that
+    // there are two active tasks:
+    assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
+    assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L)
+    // Fill up all of the remaining memory with storage.
+    assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
+    assert(mm.storageMemoryUsed === 800)
+    assert(mm.executionMemoryUsed === 200)
+    // A task should still be able to allocate 100 bytes execution memory by evicting blocks
+    assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
+    assertEvictBlocksToFreeSpaceCalled(ms, 100L)
+    assert(mm.executionMemoryUsed === 300)
+    assert(mm.storageMemoryUsed === 700)
+    assert(evictedBlocks.nonEmpty)
+  }
+
 }