From 83de71c45bb9f22049243dd7518b679c4e13c2df Mon Sep 17 00:00:00 2001
From: Davies Liu <davies@databricks.com>
Date: Tue, 3 Feb 2015 22:30:23 -0800
Subject: [PATCH] [SPARK-4939] revive offers periodically in LocalBackend

The locality timeout assume that the SchedulerBackend can revive offers periodically, but currently LocalBackend did do that, then some job with mixed locality levels in local mode will hang forever.

This PR let LocalBackend revive offers periodically, just like in cluster mode.

Author: Davies Liu <davies@databricks.com>

Closes #4147 from davies/revive and squashes the following commits:

2acdf9d [Davies Liu] Update LocalBackend.scala
3c8ca7c [Davies Liu] Update LocalBackend.scala
d1b60d2 [Davies Liu] address comments from Kay
33ac9bb [Davies Liu] fix build
d0da0d5 [Davies Liu] Merge branch 'master' of github.com:apache/spark into revive
6cf5972 [Davies Liu] fix thread-safety
ed62a31 [Davies Liu] fix scala style
df9008b [Davies Liu] fix typo
bfc1396 [Davies Liu] revive offers periodically in LocalBackend
---
 .../apache/spark/scheduler/local/LocalBackend.scala   | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 05b6fa5456..4676b828d3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler.local
 
 import java.nio.ByteBuffer
 
+import scala.concurrent.duration._
+
 import akka.actor.{Actor, ActorRef, Props}
 
 import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
@@ -46,6 +48,8 @@ private[spark] class LocalActor(
     private val totalCores: Int)
   extends Actor with ActorLogReceive with Logging {
 
+  import context.dispatcher   // to use Akka's scheduler.scheduleOnce()
+
   private var freeCores = totalCores
 
   private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
@@ -74,11 +78,16 @@ private[spark] class LocalActor(
 
   def reviveOffers() {
     val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
-    for (task <- scheduler.resourceOffers(offers).flatten) {
+    val tasks = scheduler.resourceOffers(offers).flatten
+    for (task <- tasks) {
       freeCores -= scheduler.CPUS_PER_TASK
       executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
         task.name, task.serializedTask)
     }
+    if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
+      // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
+      context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
+    }
   }
 }
 
-- 
GitLab