Commit 9966d1a8 authored by Dale, committed by Reynold Xin

SPARK-CORE [SPARK-3651] Group common CoarseGrainedSchedulerBackend variables together

from [SPARK-3651]
In CoarseGrainedSchedulerBackend, we have:

    private val executorActor = new HashMap[String, ActorRef]
    private val executorAddress = new HashMap[String, Address]
    private val executorHost = new HashMap[String, String]
    private val freeCores = new HashMap[String, Int]
    private val totalCores = new HashMap[String, Int]

We only ever put entries into, and remove entries from, these maps together. The code would be simpler if we consolidated them into a single map, as was done in JobProgressListener for https://issues.apache.org/jira/browse/SPARK-2299.
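For illustration, a minimal standalone sketch of the consolidation pattern follows. `ExecutorRecord`, `ConsolidationSketch`, and the field names are hypothetical stand-ins, not the actual Spark classes; the real change introduces `ExecutorData`, shown in the diff below.

```scala
import scala.collection.mutable.HashMap

// Hypothetical record holding everything previously spread across parallel maps.
class ExecutorRecord(
    val host: String,
    var freeCores: Int,
    val totalCores: Int)

object ConsolidationSketch extends App {
  // One map with one entry per executor, instead of five maps updated in lockstep.
  val executors = new HashMap[String, ExecutorRecord]

  // Registration touches a single entry ...
  executors("exec-1") = new ExecutorRecord("host-1", freeCores = 4, totalCores = 4)

  // ... and so does removal, so the per-executor fields can never drift out of sync.
  executors.remove("exec-1").foreach(r => println(s"released ${r.totalCores} cores"))
}
```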

Author: Dale <tigerquoll@outlook.com>

Closes #2533 from tigerquoll/SPARK-3651 and squashes the following commits:

d1be0a9 [Dale] [SPARK-3651]  implemented suggested changes. Changed a reference from executorInfo to executorData to be consistent with other usages
6890663 [Dale] [SPARK-3651]  implemented suggested changes
7d671cf [Dale] [SPARK-3651]  Grouped variables under a ExecutorDataObject, and reference them via a map entry as they are all retrieved under the same key
parent 24823293
CoarseGrainedSchedulerBackend.scala
@@ -62,15 +62,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   val createTime = System.currentTimeMillis()

   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {
     override protected def log = CoarseGrainedSchedulerBackend.this.log

-    private val executorActor = new HashMap[String, ActorRef]
-    private val executorAddress = new HashMap[String, Address]
-    private val executorHost = new HashMap[String, String]
-    private val freeCores = new HashMap[String, Int]
-    private val totalCores = new HashMap[String, Int]
     private val addressToExecutorId = new HashMap[Address, String]
+    private val executorDataMap = new HashMap[String, ExecutorData]

     override def preStart() {
       // Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -85,16 +79,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     def receiveWithLogging = {
       case RegisterExecutor(executorId, hostPort, cores) =>
         Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
-        if (executorActor.contains(executorId)) {
+        if (executorDataMap.contains(executorId)) {
           sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
         } else {
           logInfo("Registered executor: " + sender + " with ID " + executorId)
           sender ! RegisteredExecutor
-          executorActor(executorId) = sender
-          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
-          totalCores(executorId) = cores
-          freeCores(executorId) = cores
-          executorAddress(executorId) = sender.path.address
+          executorDataMap.put(executorId, new ExecutorData(sender, sender.path.address,
+            Utils.parseHostPort(hostPort)._1, cores, cores))
           addressToExecutorId(sender.path.address) = executorId
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
@@ -104,13 +96,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       case StatusUpdate(executorId, taskId, state, data) =>
         scheduler.statusUpdate(taskId, state, data.value)
         if (TaskState.isFinished(state)) {
-          if (executorActor.contains(executorId)) {
-            freeCores(executorId) += scheduler.CPUS_PER_TASK
-            makeOffers(executorId)
-          } else {
-            // Ignoring the update since we don't know about the executor.
-            val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
-            logWarning(msg.format(taskId, state, sender, executorId))
+          executorDataMap.get(executorId) match {
+            case Some(executorInfo) =>
+              executorInfo.freeCores += scheduler.CPUS_PER_TASK
+              makeOffers(executorId)
+            case None =>
+              // Ignoring the update since we don't know about the executor.
+              logWarning(s"Ignored task status update ($taskId state $state) " +
+                s"from unknown executor $sender with ID $executorId")
           }
         }
@@ -118,7 +111,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         makeOffers()

       case KillTask(taskId, executorId, interruptThread) =>
-        executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)
+        executorDataMap(executorId).executorActor ! KillTask(taskId, executorId, interruptThread)

       case StopDriver =>
         sender ! true
@@ -126,8 +119,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       case StopExecutors =>
         logInfo("Asking each executor to shut down")
-        for (executor <- executorActor.values) {
-          executor ! StopExecutor
+        for ((_, executorData) <- executorDataMap) {
+          executorData.executorActor ! StopExecutor
         }
         sender ! true
@@ -138,6 +131,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)
         sender ! true
+
       case DisassociatedEvent(_, address, _) =>
         addressToExecutorId.get(address).foreach(removeExecutor(_,
           "remote Akka client disassociated"))
@@ -149,13 +143,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     // Make fake resource offers on all executors
     def makeOffers() {
       launchTasks(scheduler.resourceOffers(
-        executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+        executorDataMap.map {case (id, executorData) =>
+          new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toSeq))
     }

     // Make fake resource offers on just one executor
     def makeOffers(executorId: String) {
+      val executorData = executorDataMap(executorId)
       launchTasks(scheduler.resourceOffers(
-        Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
+        Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
     }

     // Launch tasks returned by a set of resource offers
@@ -179,25 +175,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
          }
        }
        else {
-          freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
-          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
+          val executorData = executorDataMap(task.executorId)
+          executorData.freeCores -= scheduler.CPUS_PER_TASK
+          executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
        }
      }
    }

     // Remove a disconnected slave from the cluster
     def removeExecutor(executorId: String, reason: String) {
-      if (executorActor.contains(executorId)) {
-        logInfo("Executor " + executorId + " disconnected, so removing it")
-        val numCores = totalCores(executorId)
-        executorActor -= executorId
-        executorHost -= executorId
-        addressToExecutorId -= executorAddress(executorId)
-        executorAddress -= executorId
-        totalCores -= executorId
-        freeCores -= executorId
-        totalCoreCount.addAndGet(-numCores)
-        scheduler.executorLost(executorId, SlaveLost(reason))
+      executorDataMap.get(executorId) match {
+        case Some(executorInfo) =>
+          executorDataMap -= executorId
+          totalCoreCount.addAndGet(-executorInfo.totalCores)
+          scheduler.executorLost(executorId, SlaveLost(reason))
+        case None => logError(s"Asked to remove non-existent executor $executorId")
       }
     }
   }
ExecutorData.scala (new file)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster

import akka.actor.{Address, ActorRef}

/**
 * Grouping of data that is accessed by a CoarseGrainedSchedulerBackend. This class
 * is stored in a Map keyed by an executor ID.
 *
 * @param executorActor The ActorRef representing this executor
 * @param executorAddress The network address of this executor
 * @param executorHost The hostname that this executor is running on
 * @param freeCores The current number of cores available for work on the executor
 * @param totalCores The total number of cores available to the executor
 */
private[cluster] class ExecutorData(
    val executorActor: ActorRef,
    val executorAddress: Address,
    val executorHost: String,
    var freeCores: Int,
    val totalCores: Int
)
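As a rough usage sketch (hypothetical, not part of the commit): since ExecutorData is private[cluster], the example assumes it sits in the same package, and it borrows the actor system's deadLetters ref as a stand-in for a real executor ActorRef.

```scala
package org.apache.spark.scheduler.cluster

import scala.collection.mutable.HashMap

import akka.actor.ActorSystem

object ExecutorDataSketch extends App {
  val system = ActorSystem("sketch")
  // deadLetters stands in for a real executor ActorRef in this sketch.
  val ref = system.deadLetters

  // All per-executor bookkeeping lives in a single entry keyed by executor ID.
  val executorDataMap = new HashMap[String, ExecutorData]
  executorDataMap.put("executor-1",
    new ExecutorData(ref, ref.path.address, "host-1", freeCores = 4, totalCores = 4))

  // Claiming a core for a task updates one record rather than a separate freeCores map.
  executorDataMap.get("executor-1").foreach(_.freeCores -= 1)
  println(executorDataMap("executor-1").freeCores)  // prints 3

  system.shutdown()
}
```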