diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 97c22fe724abda0877aa086e38c57eefe1a4a467..55024ecd55e61a78d8389f744a90fd44868c733d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -506,13 +506,64 @@ private[spark] class TaskSetManager(
    * Get the level we can launch tasks according to delay scheduling, based on current wait time.
    */
   private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
-    while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
-        currentLocalityIndex < myLocalityLevels.length - 1)
-    {
-      // Jump to the next locality level, and remove our waiting time for the current one since
-      // we don't want to count it again on the next one
-      lastLaunchTime += localityWaits(currentLocalityIndex)
-      currentLocalityIndex += 1
+    // Remove the scheduled or finished tasks lazily
+    def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
+      var indexOffset = pendingTaskIds.size
+      while (indexOffset > 0) {
+        indexOffset -= 1
+        val index = pendingTaskIds(indexOffset)
+        if (copiesRunning(index) == 0 && !successful(index)) {
+          return true
+        } else {
+          pendingTaskIds.remove(indexOffset)
+        }
+      }
+      false
+    }
+    // Walk through the list of tasks that can be scheduled at each location and returns true
+    // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
+    // already been scheduled.
+    def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
+      val emptyKeys = new ArrayBuffer[String]
+      val hasTasks = pendingTasks.exists {
+        case (id: String, tasks: ArrayBuffer[Int]) =>
+          if (tasksNeedToBeScheduledFrom(tasks)) {
+            true
+          } else {
+            emptyKeys += id
+            false
+          }
+      }
+      // The key could be executorId, host or rackId
+      emptyKeys.foreach(id => pendingTasks.remove(id))
+      hasTasks
+    }
+
+    while (currentLocalityIndex < myLocalityLevels.length - 1) {
+      val moreTasks = myLocalityLevels(currentLocalityIndex) match {
+        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
+        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
+        case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
+        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
+      }
+      if (!moreTasks) {
+        // This is a performance optimization: if there are no more tasks that can
+        // be scheduled at a particular locality level, there is no point in waiting
+        // for the locality wait timeout (SPARK-4939).
+        lastLaunchTime = curTime
+        logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
+          s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
+        currentLocalityIndex += 1
+      } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
+        // Jump to the next locality level, and reset lastLaunchTime so that the next locality
+        // wait timer doesn't immediately expire
+        lastLaunchTime += localityWaits(currentLocalityIndex)
+        currentLocalityIndex += 1
+        logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
+          s"${localityWaits(currentLocalityIndex)}ms")
+      } else {
+        return myLocalityLevels(currentLocalityIndex)
+      }
     }
     myLocalityLevels(currentLocalityIndex)
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 84b9b788237bfa83161485ee86c56d214d713f1a..59580561cb45a78e4248f770767c374a61df4969 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -314,7 +314,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("delay scheduling with failed hosts") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
+      ("exec3", "host3"))
     val taskSet = FakeTask.createTaskSet(3,
       Seq(TaskLocation("host1")),
       Seq(TaskLocation("host2")),
@@ -649,6 +650,47 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2)
   }
 
+  test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+    val taskSet = FakeTask.createTaskSet(4,
+      Seq(TaskLocation("host1")),
+      Seq(TaskLocation("host2")),
+      Seq(ExecutorCacheTaskLocation("host1", "execA")),
+      Seq(ExecutorCacheTaskLocation("host2", "execB")))
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+    // process-local tasks are scheduled first
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 2)
+    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 3)
+    // node-local tasks are scheduled without delay
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0)
+    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 1)
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
+    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL) == None)
+  }
+
+  test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+    val taskSet = FakeTask.createTaskSet(3,
+      Seq(),
+      Seq(ExecutorCacheTaskLocation("host1", "execA")),
+      Seq(ExecutorCacheTaskLocation("host2", "execB")))
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+    // process-local tasks are scheduled first
+    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 1)
+    assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 2)
+    // no-pref tasks are scheduled without delay
+    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) == None)
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
+    assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 0)
+    assert(manager.resourceOffer("execA", "host1", ANY) == None)
+  }
+
   test("Ensure TaskSetManager is usable after addition of levels") {
     // Regression test for SPARK-2931
     sc = new SparkContext("local", "test")