Skip to content
Snippets Groups Projects
Commit 9b234b55 authored by Liwei Lin's avatar Liwei Lin Committed by Reynold Xin
Browse files

[SPARK-15518][CORE][FOLLOW-UP] Rename LocalSchedulerBackendEndpoint -> LocalSchedulerBackend

## What changes were proposed in this pull request?

This patch is a follow-up to https://github.com/apache/spark/pull/13288 completing the renaming:
 - LocalScheduler -> LocalSchedulerBackend~~Endpoint~~

## How was this patch tested?

Updated test cases to reflect the name change.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #13683 from lw-lin/rename-backend.
parent e1585cc7
No related branches found
No related tags found
No related merge requests found
......@@ -58,7 +58,7 @@ import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
import org.apache.spark.scheduler.local.LocalSchedulerBackendEndpoint
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
......@@ -2429,7 +2429,7 @@ object SparkContext extends Logging {
master match {
case "local" =>
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackendEndpoint(sc.getConf, scheduler, 1)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
scheduler.initialize(backend)
(backend, scheduler)
......@@ -2441,7 +2441,7 @@ object SparkContext extends Logging {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackendEndpoint(sc.getConf, scheduler, threadCount)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
......@@ -2451,7 +2451,7 @@ object SparkContext extends Logging {
// local[N, M] means exactly N threads with M failures
val threadCount = if (threads == "*") localCpuCount else threads.toInt
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
val backend = new LocalSchedulerBackendEndpoint(sc.getConf, scheduler, threadCount)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
......
......@@ -33,13 +33,13 @@ import org.apache.spark.TaskState.TaskState
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.scheduler.local.LocalSchedulerBackendEndpoint
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
/**
* Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
* It can also work with a local setup by using a [[LocalSchedulerBackendEndpoint]] and setting
* It can also work with a local setup by using a [[LocalSchedulerBackend]] and setting
* isLocal to true. It handles common logic, like determining a scheduling order across jobs, waking
* up to launch speculative tasks, etc.
*
......
......@@ -39,15 +39,15 @@ private case class KillTask(taskId: Long, interruptThread: Boolean)
private case class StopExecutor()
/**
* Calls to [[LocalSchedulerBackendEndpoint]] are all serialized through LocalEndpoint. Using an
* RpcEndpoint makes the calls on [[LocalSchedulerBackendEndpoint]] asynchronous, which is necessary
* to prevent deadlock between [[LocalSchedulerBackendEndpoint]] and the [[TaskSchedulerImpl]].
* Calls to [[LocalSchedulerBackend]] are all serialized through LocalEndpoint. Using an
* RpcEndpoint makes the calls on [[LocalSchedulerBackend]] asynchronous, which is necessary
* to prevent deadlock between [[LocalSchedulerBackend]] and the [[TaskSchedulerImpl]].
*/
private[spark] class LocalEndpoint(
override val rpcEnv: RpcEnv,
userClassPath: Seq[URL],
scheduler: TaskSchedulerImpl,
executorBackend: LocalSchedulerBackendEndpoint,
executorBackend: LocalSchedulerBackend,
private val totalCores: Int)
extends ThreadSafeRpcEndpoint with Logging {
......@@ -93,9 +93,9 @@ private[spark] class LocalEndpoint(
/**
* Used when running a local version of Spark where the executor, backend, and master all run in
* the same JVM. It sits behind a [[TaskSchedulerImpl]] and handles launching tasks on a single
* Executor (created by the [[LocalSchedulerBackendEndpoint]]) running locally.
* Executor (created by the [[LocalSchedulerBackend]]) running locally.
*/
private[spark] class LocalSchedulerBackendEndpoint(
private[spark] class LocalSchedulerBackend(
conf: SparkConf,
scheduler: TaskSchedulerImpl,
val totalCores: Int)
......
......@@ -23,7 +23,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
import org.apache.spark.scheduler.local.LocalSchedulerBackendEndpoint
import org.apache.spark.scheduler.local.LocalSchedulerBackend
class SparkContextSchedulerCreationSuite
......@@ -58,7 +58,7 @@ class SparkContextSchedulerCreationSuite
test("local") {
val sched = createTaskScheduler("local")
sched.backend match {
case s: LocalSchedulerBackendEndpoint => assert(s.totalCores === 1)
case s: LocalSchedulerBackend => assert(s.totalCores === 1)
case _ => fail()
}
}
......@@ -66,7 +66,7 @@ class SparkContextSchedulerCreationSuite
test("local-*") {
val sched = createTaskScheduler("local[*]")
sched.backend match {
case s: LocalSchedulerBackendEndpoint =>
case s: LocalSchedulerBackend =>
assert(s.totalCores === Runtime.getRuntime.availableProcessors())
case _ => fail()
}
......@@ -76,7 +76,7 @@ class SparkContextSchedulerCreationSuite
val sched = createTaskScheduler("local[5]")
assert(sched.maxTaskFailures === 1)
sched.backend match {
case s: LocalSchedulerBackendEndpoint => assert(s.totalCores === 5)
case s: LocalSchedulerBackend => assert(s.totalCores === 5)
case _ => fail()
}
}
......@@ -85,7 +85,7 @@ class SparkContextSchedulerCreationSuite
val sched = createTaskScheduler("local[* ,2]")
assert(sched.maxTaskFailures === 2)
sched.backend match {
case s: LocalSchedulerBackendEndpoint =>
case s: LocalSchedulerBackend =>
assert(s.totalCores === Runtime.getRuntime.availableProcessors())
case _ => fail()
}
......@@ -95,7 +95,7 @@ class SparkContextSchedulerCreationSuite
val sched = createTaskScheduler("local[4, 2]")
assert(sched.maxTaskFailures === 2)
sched.backend match {
case s: LocalSchedulerBackendEndpoint => assert(s.totalCores === 4)
case s: LocalSchedulerBackend => assert(s.totalCores === 4)
case _ => fail()
}
}
......@@ -119,7 +119,7 @@ class SparkContextSchedulerCreationSuite
val sched = createTaskScheduler("local", "client", conf)
sched.backend match {
case s: LocalSchedulerBackendEndpoint => assert(s.defaultParallelism() === 16)
case s: LocalSchedulerBackend => assert(s.defaultParallelism() === 16)
case _ => fail()
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment