Commit e5353465 authored by Mridul Muralidharan, committed by Marcelo Vanzin
[SPARK-8297] [YARN] Scheduler backend is not notified in case node fails in YARN

This change adds code to notify the scheduler backend when a container dies in YARN.

Author: Mridul Muralidharan <mridulm@yahoo-inc.com>
Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #7431 from vanzin/SPARK-8297 and squashes the following commits:

471e4a0 [Marcelo Vanzin] Fix unit test after merge.
d4adf4e [Marcelo Vanzin] Merge branch 'master' into SPARK-8297
3b262e8 [Marcelo Vanzin] Merge branch 'master' into SPARK-8297
537da6f [Marcelo Vanzin] Make an expected log less scary.
04dc112 [Marcelo Vanzin] Use driver <-> AM communication to send "remove executor" request.
8855b97 [Marcelo Vanzin] Merge remote-tracking branch 'mridul/fix_yarn_scheduler_bug' into SPARK-8297
687790f [Mridul Muralidharan] Merge branch 'fix_yarn_scheduler_bug' of github.com:mridulm/spark into fix_yarn_scheduler_bug
e1b0067 [Mridul Muralidharan] Fix failing testcase, fix merge issue from our 1.3 -> master
9218fcc [Mridul Muralidharan] Fix failing testcase
362d64a [Mridul Muralidharan] Merge branch 'fix_yarn_scheduler_bug' of github.com:mridulm/spark into fix_yarn_scheduler_bug
62ad0cc [Mridul Muralidharan] Merge branch 'fix_yarn_scheduler_bug' of github.com:mridulm/spark into fix_yarn_scheduler_bug
bbf8811 [Mridul Muralidharan] Merge branch 'fix_yarn_scheduler_bug' of github.com:mridulm/spark into fix_yarn_scheduler_bug
9ee1307 [Mridul Muralidharan] Fix SPARK-8297
a3a0f01 [Mridul Muralidharan] Fix SPARK-8297
parent 5363ed71
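The diff below threads a reference to the driver's RPC endpoint from the ApplicationMaster down into the YARN allocator, which already learns from the ResourceManager when containers complete. When a completed container is not one Spark itself released, the allocator now sends a RemoveExecutor message back to the driver so the scheduler backend can drop the executor immediately instead of waiting for a network timeout. As a rough orientation, here is a minimal, self-contained sketch of that pattern in plain Scala; ContainerId, DriverEndpoint, Allocator, and the Demo object are simplified stand-ins invented for illustration, not the actual Spark or YARN classes.

    import scala.collection.mutable

    // Stand-in for a YARN container id.
    final case class ContainerId(value: String)

    // Message sent to the driver when an executor's container is lost unexpectedly.
    final case class RemoveExecutor(executorId: String, reason: String)

    // Stand-in for the driver-side RPC endpoint (the YarnSchedulerBackend endpoint in Spark).
    trait DriverEndpoint {
      def send(msg: Any): Unit
    }

    // Stand-in for the allocator that runs inside the ApplicationMaster.
    class Allocator(driverRef: DriverEndpoint) {
      private val containerIdToExecutorId = mutable.HashMap.empty[ContainerId, String]
      private val releasedContainers = mutable.HashSet.empty[ContainerId]
      private var numUnexpectedContainerRelease = 0L

      // Remember which container backs which executor when it is launched.
      def registerContainer(containerId: ContainerId, executorId: String): Unit = {
        containerIdToExecutorId(containerId) = executorId
      }

      // Spark asked for this container to go away, so its completion is expected.
      def releaseContainer(containerId: ContainerId): Unit = {
        releasedContainers += containerId
      }

      // Called with the completed containers reported by the ResourceManager.
      def processCompletedContainers(completed: Seq[ContainerId]): Unit = {
        completed.foreach { containerId =>
          val alreadyReleased = releasedContainers.remove(containerId)
          containerIdToExecutorId.remove(containerId).foreach { eid =>
            if (!alreadyReleased) {
              // The container died without Spark asking for it (node failure, lost NM, ...):
              // tell the driver so the scheduler forgets the executor right away.
              numUnexpectedContainerRelease += 1
              driverRef.send(RemoveExecutor(eid,
                s"Yarn deallocated the executor $eid (container $containerId)"))
            }
          }
        }
      }

      def unexpectedReleases: Long = numUnexpectedContainerRelease
    }

    object Demo extends App {
      val received = mutable.Buffer.empty[Any]
      val allocator = new Allocator(new DriverEndpoint {
        def send(msg: Any): Unit = { received += msg }
      })
      allocator.registerContainer(ContainerId("c1"), executorId = "1")
      allocator.processCompletedContainers(Seq(ContainerId("c1"))) // not released by Spark
      println(received) // one RemoveExecutor message for executor "1"
    }

In the real change, the same bookkeeping lives in YarnAllocator, the message is CoarseGrainedClusterMessages.RemoveExecutor, and the driver reference comes from runAMEndpoint / waitForSparkDriver, as the hunks below show.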
@@ -241,7 +241,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           scheduler.executorLost(executorId, SlaveLost(reason))
           listenerBus.post(
             SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason))
-        case None => logError(s"Asked to remove non-existent executor $executorId")
+        case None => logInfo(s"Asked to remove non-existent executor $executorId")
       }
     }
@@ -109,6 +109,8 @@ private[spark] abstract class YarnSchedulerBackend(
       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)
+      case RemoveExecutor(executorId, reason) =>
+        removeExecutor(executorId, reason)
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -229,7 +229,11 @@ private[spark] class ApplicationMaster(
     sparkContextRef.compareAndSet(sc, null)
   }
 
-  private def registerAM(_rpcEnv: RpcEnv, uiAddress: String, securityMgr: SecurityManager) = {
+  private def registerAM(
+      _rpcEnv: RpcEnv,
+      driverRef: RpcEndpointRef,
+      uiAddress: String,
+      securityMgr: SecurityManager) = {
     val sc = sparkContextRef.get()
     val appId = client.getAttemptId().getApplicationId().toString()
@@ -246,6 +250,7 @@ private[spark] class ApplicationMaster(
       RpcAddress(_sparkConf.get("spark.driver.host"), _sparkConf.get("spark.driver.port").toInt),
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
     allocator = client.register(driverUrl,
+      driverRef,
       yarnConf,
       _sparkConf,
       if (sc != null) sc.preferredNodeLocationData else Map(),
@@ -262,17 +267,20 @@ private[spark] class ApplicationMaster(
    *
    * In cluster mode, the AM and the driver belong to same process
    * so the AMEndpoint need not monitor lifecycle of the driver.
+   *
+   * @return A reference to the driver's RPC endpoint.
    */
   private def runAMEndpoint(
       host: String,
       port: String,
-      isClusterMode: Boolean): Unit = {
+      isClusterMode: Boolean): RpcEndpointRef = {
     val driverEndpoint = rpcEnv.setupEndpointRef(
       SparkEnv.driverActorSystemName,
       RpcAddress(host, port.toInt),
       YarnSchedulerBackend.ENDPOINT_NAME)
     amEndpoint =
       rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
+    driverEndpoint
   }
 
   private def runDriver(securityMgr: SecurityManager): Unit = {
@@ -290,11 +298,11 @@ private[spark] class ApplicationMaster(
         "Timed out waiting for SparkContext.")
     } else {
       rpcEnv = sc.env.rpcEnv
-      runAMEndpoint(
+      val driverRef = runAMEndpoint(
         sc.getConf.get("spark.driver.host"),
         sc.getConf.get("spark.driver.port"),
         isClusterMode = true)
-      registerAM(rpcEnv, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
+      registerAM(rpcEnv, driverRef, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
       userClassThread.join()
     }
   }
@@ -302,9 +310,9 @@ private[spark] class ApplicationMaster(
   private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
     val port = sparkConf.getInt("spark.yarn.am.port", 0)
     rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr)
-    waitForSparkDriver()
+    val driverRef = waitForSparkDriver()
     addAmIpFilter()
-    registerAM(rpcEnv, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
+    registerAM(rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
 
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()
@@ -428,7 +436,7 @@ private[spark] class ApplicationMaster(
     }
   }
 
-  private def waitForSparkDriver(): Unit = {
+  private def waitForSparkDriver(): RpcEndpointRef = {
     logInfo("Waiting for Spark driver to be reachable.")
     var driverUp = false
     val hostport = args.userArgs(0)
@@ -36,6 +36,9 @@ import org.apache.log4j.{Level, Logger}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 
 /**
  * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -52,6 +55,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
  */
 private[yarn] class YarnAllocator(
     driverUrl: String,
+    driverRef: RpcEndpointRef,
     conf: Configuration,
     sparkConf: SparkConf,
     amClient: AMRMClient[ContainerRequest],
@@ -88,6 +92,9 @@ private[yarn] class YarnAllocator(
   // Visible for testing.
   private[yarn] val executorIdToContainer = new HashMap[String, Container]
 
+  private var numUnexpectedContainerRelease = 0L
+  private val containerIdToExecutorId = new HashMap[ContainerId, String]
+
   // Executor memory in MB.
   protected val executorMemory = args.executorMemory
   // Additional memory overhead.
@@ -184,6 +191,7 @@ private[yarn] class YarnAllocator(
   def killExecutor(executorId: String): Unit = synchronized {
     if (executorIdToContainer.contains(executorId)) {
       val container = executorIdToContainer.remove(executorId).get
+      containerIdToExecutorId.remove(container.getId)
       internalReleaseContainer(container)
       numExecutorsRunning -= 1
     } else {
@@ -383,6 +391,7 @@ private[yarn] class YarnAllocator(
       logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
       executorIdToContainer(executorId) = container
+      containerIdToExecutorId(container.getId) = executorId
 
       val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
         new HashSet[ContainerId])
@@ -413,12 +422,8 @@ private[yarn] class YarnAllocator(
   private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
-      if (releasedContainers.contains(containerId)) {
-        // Already marked the container for release, so remove it from
-        // `releasedContainers`.
-        releasedContainers.remove(containerId)
-      } else {
+      val alreadyReleased = releasedContainers.remove(containerId)
+      if (!alreadyReleased) {
         // Decrement the number of executors running. The next iteration of
         // the ApplicationMaster's reporting thread will take care of allocating.
         numExecutorsRunning -= 1
@@ -460,6 +465,18 @@ private[yarn] class YarnAllocator(
         allocatedContainerToHostMap.remove(containerId)
       }
 
+      containerIdToExecutorId.remove(containerId).foreach { eid =>
+        executorIdToContainer.remove(eid)
+
+        if (!alreadyReleased) {
+          // The executor could have gone away (like no route to host, node failure, etc)
+          // Notify backend about the failure of the executor
+          numUnexpectedContainerRelease += 1
+          driverRef.send(RemoveExecutor(eid,
+            s"Yarn deallocated the executor $eid (container $containerId)"))
+        }
+      }
     }
   }
@@ -467,6 +484,9 @@ private[yarn] class YarnAllocator(
     releasedContainers.add(container.getId())
     amClient.releaseAssignedContainer(container.getId())
   }
 
+  private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
+
 }
 
 private object YarnAllocator {
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.util.Utils
@@ -56,6 +57,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
    */
   def register(
       driverUrl: String,
+      driverRef: RpcEndpointRef,
       conf: YarnConfiguration,
       sparkConf: SparkConf,
       preferredNodeLocations: Map[String, Set[SplitInfo]],
@@ -73,7 +75,8 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
       amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
       registered = true
     }
-    new YarnAllocator(driverUrl, conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
+    new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), args,
+      securityMgr)
   }
 
   /**
@@ -27,10 +27,14 @@ import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.scalatest.{BeforeAndAfterEach, Matchers}
 
+import org.mockito.Mockito._
+
 import org.apache.spark.{SecurityManager, SparkFunSuite}
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.YarnAllocator._
+import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
 
 class MockResolver extends DNSToSwitchMapping {
@@ -90,6 +94,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       "--class", "SomeClass")
     new YarnAllocator(
       "not used",
+      mock(classOf[RpcEndpointRef]),
       conf,
       sparkConf,
       rmClient,
@@ -230,6 +235,30 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.getNumPendingAllocate should be (1)
   }
 
+  test("lost executor removed from backend") {
+    val handler = createAllocator(4)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (4)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container1, container2))
+
+    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map())
+    val statuses = Seq(container1, container2).map { c =>
+      ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
+    }
+    handler.updateResourceRequests()
+    handler.processCompletedContainers(statuses.toSeq)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (2)
+    handler.getNumExecutorsFailed should be (2)
+    handler.getNumUnexpectedContainerRelease should be (2)
+  }
+
   test("memory exceeded diagnostic regexes") {
     val diagnostics =
       "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +