Skip to content
Snippets Groups Projects
Commit 4fb52f95 authored by Davies Liu's avatar Davies Liu Committed by Josh Rosen
Browse files

[SPARK-7624] Revert #4147

Author: Davies Liu <davies@databricks.com>

Closes #6172 from davies/revert_4147 and squashes the following commits:

3bfbbde [Davies Liu] Revert #4147
parent eb4632f2
No related branches found
No related tags found
No related merge requests found
......@@ -18,14 +18,12 @@
package org.apache.spark.scheduler.local
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.util.{ThreadUtils, Utils}
private case class ReviveOffers()
......@@ -47,9 +45,6 @@ private[spark] class LocalEndpoint(
private val totalCores: Int)
extends ThreadSafeRpcEndpoint with Logging {
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("local-revive-thread")
private var freeCores = totalCores
private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
......@@ -79,27 +74,13 @@ private[spark] class LocalEndpoint(
context.reply(true)
}
def reviveOffers() {
val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
val tasks = scheduler.resourceOffers(offers).flatten
for (task <- tasks) {
for (task <- scheduler.resourceOffers(offers).flatten) {
freeCores -= scheduler.CPUS_PER_TASK
executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
task.name, task.serializedTask)
}
if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
// Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
reviveThread.schedule(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReviveOffers))
}
}, 1000, TimeUnit.MILLISECONDS)
}
}
override def onStop(): Unit = {
reviveThread.shutdownNow()
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment