Commit 16906ef2 authored by Ryan Williams, committed by Andrew Or

[SPARK-11120] Allow sane default number of executor failures when dynamically allocating in YARN

I also added information to container-failure error messages about which host the container failed on, which would have helped me identify the problem that led me to this JIRA and PR sooner.

Author: Ryan Williams <ryan.blake.williams@gmail.com>

Closes #9147 from ryan-williams/dyn-exec-failures.
parent fc26f32c
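For context, the first hunk below registers the old spark.yarn.max.worker.failures key as a deprecated alternate of spark.yarn.max.executor.failures, so jobs that still set the old name keep working. A minimal sketch of how that fallback looks from the user's side (the value is illustrative):

    import org.apache.spark.SparkConf

    // Setting the deprecated key still takes effect: SparkConf translates it to
    // spark.yarn.max.executor.failures and logs a deprecation warning on set().
    val conf = new SparkConf()
      .set("spark.yarn.max.worker.failures", "10") // old name, deprecated since 1.5

    // Reads through the new key resolve the registered alternate automatically.
    val maxFailures = conf.getInt("spark.yarn.max.executor.failures", 3) // 10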
@@ -595,7 +595,9 @@ private[spark] object SparkConf extends Logging {
     "spark.rpc.lookupTimeout" -> Seq(
       AlternateConfig("spark.akka.lookupTimeout", "1.4")),
     "spark.streaming.fileStream.minRememberDuration" -> Seq(
-      AlternateConfig("spark.streaming.minRememberDuration", "1.5"))
+      AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
+    "spark.yarn.max.executor.failures" -> Seq(
+      AlternateConfig("spark.yarn.max.worker.failures", "1.5"))
   )
 
   /**
...
@@ -62,10 +62,21 @@ private[spark] class ApplicationMaster(
     .asInstanceOf[YarnConfiguration]
   private val isClusterMode = args.userClass != null
 
-  // Default to numExecutors * 2, with minimum of 3
-  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
-    sparkConf.getInt("spark.yarn.max.worker.failures",
-      math.max(sparkConf.getInt("spark.executor.instances", 0) * 2, 3)))
+  // Default to twice the number of executors (twice the maximum number of executors if dynamic
+  // allocation is enabled), with a minimum of 3.
+  private val maxNumExecutorFailures = {
+    val defaultKey =
+      if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+        "spark.dynamicAllocation.maxExecutors"
+      } else {
+        "spark.executor.instances"
+      }
+    val effectiveNumExecutors = sparkConf.getInt(defaultKey, 0)
+    val defaultMaxNumExecutorFailures = math.max(3, 2 * effectiveNumExecutors)
+
+    sparkConf.getInt("spark.yarn.max.executor.failures", defaultMaxNumExecutorFailures)
+  }
 
   @volatile private var exitCode = 0
   @volatile private var unregistered = false
...
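The new default above is a pure function of three settings; read one way (an illustrative restatement, not code from the patch):

    // Sketch: compute the default cap on executor failures.
    def defaultMaxExecutorFailures(
        dynamicAllocationEnabled: Boolean,
        numExecutorInstances: Int,  // spark.executor.instances
        dynAllocMaxExecutors: Int   // spark.dynamicAllocation.maxExecutors
      ): Int = {
      val effectiveNumExecutors =
        if (dynamicAllocationEnabled) dynAllocMaxExecutors else numExecutorInstances
      math.max(3, 2 * effectiveNumExecutors)
    }

    defaultMaxExecutorFailures(dynamicAllocationEnabled = false, 0, 0)   // 3 (old floor)
    defaultMaxExecutorFailures(dynamicAllocationEnabled = false, 8, 0)   // 16
    defaultMaxExecutorFailures(dynamicAllocationEnabled = true, 0, 100)  // 200

Before this change, a dynamically allocated application typically left spark.executor.instances unset and so always got the floor of 3, meaning a long-running job could be killed after just three container failures.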
@@ -430,17 +430,20 @@ private[yarn] class YarnAllocator(
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
       val alreadyReleased = releasedContainers.remove(containerId)
+      val hostOpt = allocatedContainerToHostMap.get(containerId)
+      val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")
       val exitReason = if (!alreadyReleased) {
         // Decrement the number of executors running. The next iteration of
         // the ApplicationMaster's reporting thread will take care of allocating.
         numExecutorsRunning -= 1
-        logInfo("Completed container %s (state: %s, exit status: %s)".format(
+        logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
           containerId,
+          onHostStr,
           completedContainer.getState,
           completedContainer.getExitStatus))
         // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
         // there are some exit status' we shouldn't necessarily count against us, but for
-        // now I think its ok as none of the containers are expected to exit
+        // now I think its ok as none of the containers are expected to exit.
         val exitStatus = completedContainer.getExitStatus
         val (isNormalExit, containerExitReason) = exitStatus match {
           case ContainerExitStatus.SUCCESS =>
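The onHostStr helper introduced above turns the optional host lookup into a log-ready suffix. In isolation (host name invented for illustration):

    // Sketch of the suffix-building pattern used above.
    def onHostSuffix(hostOpt: Option[String]): String =
      hostOpt.map(host => s" on host: $host").getOrElse("")

    onHostSuffix(Some("node7.example.com"))  // " on host: node7.example.com"
    onHostSuffix(None)                       // ""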
@@ -449,7 +452,7 @@ private[yarn] class YarnAllocator(
             // Preemption should count as a normal exit, since YARN preempts containers merely
             // to do resource sharing, and tasks that fail due to preempted executors could
             // just as easily finish on any other executor. See SPARK-8167.
-            (true, s"Container $containerId was preempted.")
+            (true, s"Container ${containerId}${onHostStr} was preempted.")
           // Should probably still count memory exceeded exit codes towards task failures
           case VMEM_EXCEEDED_EXIT_CODE =>
             (false, memLimitExceededLogMessage(
@@ -461,7 +464,7 @@ private[yarn] class YarnAllocator(
               PMEM_EXCEEDED_PATTERN))
           case unknown =>
             numExecutorsFailed += 1
-            (false, "Container marked as failed: " + containerId +
+            (false, "Container marked as failed: " + containerId + onHostStr +
               ". Exit status: " + completedContainer.getExitStatus +
               ". Diagnostics: " + completedContainer.getDiagnostics)
@@ -479,10 +482,10 @@ private[yarn] class YarnAllocator(
           s"Container $containerId exited from explicit termination request.")
       }
 
-      if (allocatedContainerToHostMap.contains(containerId)) {
-        val host = allocatedContainerToHostMap.get(containerId).get
-        val containerSet = allocatedHostToContainersMap.get(host).get
-
+      for {
+        host <- hostOpt
+        containerSet <- allocatedHostToContainersMap.get(host)
+      } {
         containerSet.remove(containerId)
         if (containerSet.isEmpty) {
           allocatedHostToContainersMap.remove(host)
...
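The last hunk also replaces two unguarded Option.get calls with a for-comprehension over Options: if either lookup misses, the cleanup body is simply skipped instead of throwing NoSuchElementException. A minimal standalone illustration (maps and keys are made up):

    import scala.collection.mutable

    // Hypothetical stand-ins for the allocator's bookkeeping maps.
    val containerToHost = mutable.Map("c1" -> "host-a")
    val hostToContainers = mutable.Map("host-a" -> mutable.Set("c1", "c2"))

    // Desugars to nested foreach calls over the Options: the body runs only
    // when both lookups succeed, so a missing entry is skipped, not a crash.
    for {
      host <- containerToHost.get("c1")
      containers <- hostToContainers.get(host)
    } {
      containers.remove("c1")
      if (containers.isEmpty) hostToContainers.remove(host)
    }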