Skip to content
Snippets Groups Projects
Commit 83de71c4 authored by Davies Liu's avatar Davies Liu Committed by Kay Ousterhout
Browse files

[SPARK-4939] revive offers periodically in LocalBackend

The locality timeout assume that the SchedulerBackend can revive offers periodically, but currently LocalBackend did do that, then some job with mixed locality levels in local mode will hang forever.

This PR let LocalBackend revive offers periodically, just like in cluster mode.

Author: Davies Liu <davies@databricks.com>

Closes #4147 from davies/revive and squashes the following commits:

2acdf9d [Davies Liu] Update LocalBackend.scala
3c8ca7c [Davies Liu] Update LocalBackend.scala
d1b60d2 [Davies Liu] address comments from Kay
33ac9bb [Davies Liu] fix build
d0da0d5 [Davies Liu] Merge branch 'master' of github.com:apache/spark into revive
6cf5972 [Davies Liu] fix thread-safety
ed62a31 [Davies Liu] fix scala style
df9008b [Davies Liu] fix typo
bfc1396 [Davies Liu] revive offers periodically in LocalBackend
parent 242b4f02
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,8 @@ package org.apache.spark.scheduler.local
import java.nio.ByteBuffer
import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef, Props}
import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
......@@ -46,6 +48,8 @@ private[spark] class LocalActor(
private val totalCores: Int)
extends Actor with ActorLogReceive with Logging {
import context.dispatcher // to use Akka's scheduler.scheduleOnce()
private var freeCores = totalCores
private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
......@@ -74,11 +78,16 @@ private[spark] class LocalActor(
def reviveOffers() {
val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
for (task <- scheduler.resourceOffers(offers).flatten) {
val tasks = scheduler.resourceOffers(offers).flatten
for (task <- tasks) {
freeCores -= scheduler.CPUS_PER_TASK
executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
task.name, task.serializedTask)
}
if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
// Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment