Commit 7eb9cbc2 authored by Thomas Graves

[SPARK-3072] YARN - Exit when reach max number failed executors

In some cases on Hadoop 2.x the Spark application master doesn't properly exit and hangs around for 10 minutes after it's really done. We should make sure it exits properly and stops the driver.

Author: Thomas Graves <tgraves@apache.org>

Closes #2022 from tgravescs/SPARK-3072 and squashes the following commits:

665701d [Thomas Graves] Exit when reach max number failed executors
parent cd0720ca
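The hunks below make the same two-part change in each affected class (ApplicationMaster and ExecutorLauncher each appear twice, presumably once for the yarn/alpha and once for the yarn/stable module): every allocation and reporter loop gains an !isFinished guard, and a checkNumExecutorsFailed() helper finishes the application as FAILED and stops the SparkContext once maxNumExecutorFailures is reached, so the user thread can end and the loops can exit. The following is a minimal, self-contained Scala sketch of that shutdown pattern, not Spark code; FailureTracker and FakeContext are hypothetical stand-ins for the yarnAllocator and the SparkContext.

import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

object MaxFailuresDemo {
  // Hypothetical stand-in for the YARN allocator's failed-executor counter.
  class FailureTracker { val numExecutorsFailed = new AtomicInteger(0) }
  // Hypothetical stand-in for SparkContext: stopping it lets the user thread finish.
  class FakeContext { def stop(): Unit = println("context stopped") }

  val maxNumExecutorFailures = 3                       // failure cap, as in the patch
  val isFinished = new AtomicBoolean(false)            // flag every loop now checks
  val contextRef = new AtomicReference[FakeContext](new FakeContext)
  val allocator = new FailureTracker

  // Mirrors the role of checkNumExecutorsFailed() in the patch: once the cap is hit,
  // report the failure, flip the finished flag, and stop the context so the app exits.
  def checkNumExecutorsFailed(): Unit = {
    if (allocator.numExecutorsFailed.get() >= maxNumExecutorFailures) {
      println("max number of executor failures reached")  // finishApplicationMaster(FAILED, ...) in the real code
      isFinished.set(true)
      val ctx = contextRef.get()
      if (ctx != null) ctx.stop() else println("context is null when should shutdown")
    }
  }

  def main(args: Array[String]): Unit = {
    // Reporter-style loop: without the !isFinished guard it would keep spinning
    // even after the failure cap is reached.
    while (!isFinished.get()) {
      checkNumExecutorsFailed()
      allocator.numExecutorsFailed.incrementAndGet()    // simulate one executor failing
      Thread.sleep(100)
    }
    println("application master exiting")
  }
}

Without the isFinished guard and the sparkContext.stop() call, a loop like this keeps running after the failure status is recorded, which is consistent with the 10-minute hang described in the commit message.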
@@ -267,12 +267,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     // TODO: This is a bit ugly. Can we make it nicer?
     // TODO: Handle container failure
-    // Exists the loop if the user thread exits.
-    while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
-      if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-        finishApplicationMaster(FinalApplicationStatus.FAILED,
-          "max number of executor failures reached")
-      }
+    // Exits the loop if the user thread exits.
+    while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
+        && !isFinished) {
+      checkNumExecutorsFailed()
       yarnAllocator.allocateContainers(
         math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
       Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
@@ -303,11 +301,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     val t = new Thread {
       override def run() {
-        while (userThread.isAlive) {
-          if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-            finishApplicationMaster(FinalApplicationStatus.FAILED,
-              "max number of executor failures reached")
-          }
+        while (userThread.isAlive && !isFinished) {
+          checkNumExecutorsFailed()
           val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
           if (missingExecutorCount > 0) {
             logInfo("Allocating %d containers to make up for (potentially) lost containers".
@@ -327,6 +322,22 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     t
   }
 
+  private def checkNumExecutorsFailed() {
+    if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      logInfo("max number of executor failures reached")
+      finishApplicationMaster(FinalApplicationStatus.FAILED,
+        "max number of executor failures reached")
+      // make sure to stop the user thread
+      val sparkContext = ApplicationMaster.sparkContextRef.get()
+      if (sparkContext != null) {
+        logInfo("Invoking sc stop from checkNumExecutorsFailed")
+        sparkContext.stop()
+      } else {
+        logError("sparkContext is null when should shutdown")
+      }
+    }
+  }
+
   private def sendProgress() {
     logDebug("Sending progress")
     // Simulated with an allocate request with no nodes requested ...
@@ -249,7 +249,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Wait until all containers have finished
     // TODO: This is a bit ugly. Can we make it nicer?
     // TODO: Handle container failure
-    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
+        !isFinished) {
       yarnAllocator.allocateContainers(
         math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
       checkNumExecutorsFailed()
@@ -271,7 +272,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val t = new Thread {
       override def run() {
-        while (!driverClosed) {
+        while (!driverClosed && !isFinished) {
           checkNumExecutorsFailed()
           val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
           if (missingExecutorCount > 0) {
@@ -247,13 +247,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       yarnAllocator.allocateResources()
       // Exits the loop if the user thread exits.
-      var iters = 0
-      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
+      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive
+          && !isFinished) {
         checkNumExecutorsFailed()
         allocateMissingExecutor()
         yarnAllocator.allocateResources()
         Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
-        iters += 1
       }
     }
     logInfo("All executors have launched.")
@@ -271,8 +270,17 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private def checkNumExecutorsFailed() {
     if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+      logInfo("max number of executor failures reached")
       finishApplicationMaster(FinalApplicationStatus.FAILED,
         "max number of executor failures reached")
+      // make sure to stop the user thread
+      val sparkContext = ApplicationMaster.sparkContextRef.get()
+      if (sparkContext != null) {
+        logInfo("Invoking sc stop from checkNumExecutorsFailed")
+        sparkContext.stop()
+      } else {
+        logError("sparkContext is null when should shutdown")
+      }
     }
   }
@@ -289,7 +297,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     val t = new Thread {
       override def run() {
-        while (userThread.isAlive) {
+        while (userThread.isAlive && !isFinished) {
           checkNumExecutorsFailed()
           allocateMissingExecutor()
           logDebug("Sending progress")
@@ -217,7 +217,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Wait until all containers have launched
     yarnAllocator.addResourceRequests(args.numExecutors)
     yarnAllocator.allocateResources()
-    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
+    while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed) &&
+        !isFinished) {
       checkNumExecutorsFailed()
       allocateMissingExecutor()
       yarnAllocator.allocateResources()
@@ -249,7 +250,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val t = new Thread {
       override def run() {
-        while (!driverClosed) {
+        while (!driverClosed && !isFinished) {
          checkNumExecutorsFailed()
          allocateMissingExecutor()
          logDebug("Sending progress")