Skip to content
Snippets Groups Projects
Commit ff6e4cbd authored by Kishor Patil's avatar Kishor Patil Committed by Tom Graves
Browse files

[SPARK-17511] Yarn Dynamic Allocation: Avoid marking released container as Failed

## What changes were proposed in this pull request?

Due to race conditions, the ` assert(numExecutorsRunning <= targetNumExecutors)` can fail causing `AssertionError`. So removed the assertion, instead moved the conditional check before launching new container:
```
java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:156)
        at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$runAllocatedContainers$1.org$apache$spark$deploy$yarn$YarnAllocator$$anonfun$$updateInternalState$1(YarnAllocator.scala:489)
        at org.apache.spark.deploy.yarn.YarnAllocator$$anonfun$runAllocatedContainers$1$$anon$1.run(YarnAllocator.scala:519)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
```
## How was this patch tested?
This was manually tested using a large ForkAndJoin job with Dynamic Allocation enabled to validate the failing job succeeds, without any such exception.

Author: Kishor Patil <kpatil@yahoo-inc.com>

Closes #15069 from kishorvpatil/SPARK-17511.
parent 040e4697
No related branches found
No related tags found
No related merge requests found
......@@ -496,7 +496,6 @@ private[yarn] class YarnAllocator(
def updateInternalState(): Unit = synchronized {
numExecutorsRunning += 1
assert(numExecutorsRunning <= targetNumExecutors)
executorIdToContainer(executorId) = container
containerIdToExecutorId(container.getId) = executorId
......@@ -506,36 +505,41 @@ private[yarn] class YarnAllocator(
allocatedContainerToHostMap.put(containerId, executorHostname)
}
if (launchContainers) {
launcherPool.execute(new Runnable {
override def run(): Unit = {
try {
new ExecutorRunnable(
Some(container),
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources
).run()
updateInternalState()
} catch {
case NonFatal(e) =>
logError(s"Failed to launch executor $executorId on container $containerId", e)
// Assigned container should be released immediately to avoid unnecessary resource
// occupation.
amClient.releaseAssignedContainer(containerId)
if (numExecutorsRunning < targetNumExecutors) {
if (launchContainers) {
launcherPool.execute(new Runnable {
override def run(): Unit = {
try {
new ExecutorRunnable(
Some(container),
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources
).run()
updateInternalState()
} catch {
case NonFatal(e) =>
logError(s"Failed to launch executor $executorId on container $containerId", e)
// Assigned container should be released immediately to avoid unnecessary resource
// occupation.
amClient.releaseAssignedContainer(containerId)
}
}
}
})
})
} else {
// For test only
updateInternalState()
}
} else {
// For test only
updateInternalState()
logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
"reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
}
}
}
......
......@@ -136,6 +136,25 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
size should be (0)
}
test("container should not be created if requested number if met") {
// request a single container and receive it
val handler = createAllocator(1)
handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getPendingAllocate.size should be (1)
val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))
handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
val container2 = createContainer("host2")
handler.handleAllocatedContainers(Array(container2))
handler.getNumExecutorsRunning should be (1)
}
test("some containers allocated") {
// request a few containers and receive some of them
val handler = createAllocator(4)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment