Commit b4792781 authored by Tejas Patil, committed by Shixiong Zhu

[SPARK-17451][CORE] CoarseGrainedExecutorBackend should inform driver before self-kill

## What changes were proposed in this pull request?

Jira: https://issues.apache.org/jira/browse/SPARK-17451

`CoarseGrainedExecutorBackend` exits the JVM in some failure cases. While this is not a problem in itself, the driver UI does not capture any specific reason for the exit. In this PR, I am adding functionality to `exitExecutor` to notify the driver that the executor is exiting and why.
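For illustration, here is a minimal, self-contained sketch of the pattern this change introduces; `DriverRef` and `LossReason` are hypothetical stand-ins for Spark's `RpcEndpointRef` and `ExecutorLossReason`, and the method body is heavily abbreviated. The key point is that the notification is best-effort: a failure to reach the driver is logged but never blocks the exit path.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object SelfExitSketch {
  // Hypothetical stand-ins for Spark's ExecutorLossReason and RpcEndpointRef.
  final case class LossReason(message: String)
  trait DriverRef {
    def removeExecutor(executorId: String, reason: LossReason): Future[Boolean]
  }

  def exitExecutor(driver: Option[DriverRef],
                   executorId: String,
                   code: Int,
                   reason: String,
                   notifyDriver: Boolean = true)
                  (implicit ec: ExecutionContext): Unit = {
    val message = s"Executor self-exiting due to : $reason"
    System.err.println(message)
    if (notifyDriver && driver.nonEmpty) {
      // Best-effort notification: log a failure, but never block the exit on it.
      driver.get.removeExecutor(executorId, LossReason(reason)).onComplete {
        case Failure(e) => System.err.println(s"Unable to notify the driver due to ${e.getMessage}")
        case Success(_) => // the driver acknowledged the removal
      }
    }
    sys.exit(code)
  }
}
```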

## How was this patch tested?

Ran the change in a test environment and took down the shuffle service before the executor could register with it. In the driver logs, where the job failure reason is reported (i.e. `Job aborted due to stage ...`), the correct reason is now given:

Before:
`ExecutorLostFailure (executor ZZZZZZZZZ exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.`

After:
`ExecutorLostFailure (executor ZZZZZZZZZ exited caused by one of the running tasks) Reason: Unable to create executor due to java.util.concurrent.TimeoutException: Timeout waiting for task.`

Author: Tejas Patil <tejasp@fb.com>

Closes #15013 from tejasapatil/SPARK-17451_inform_driver.
parent 2ad27695
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
-import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -65,7 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       case Success(msg) =>
         // Always receive `true`. Just ignore it
       case Failure(e) =>
-        exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
+        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
     }(ThreadUtils.sameThread)
   }
@@ -129,7 +129,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     if (stopping.get()) {
       logInfo(s"Driver from $remoteAddress disconnected during shutdown")
     } else if (driver.exists(_.address == remoteAddress)) {
-      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.")
+      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.", null,
+        notifyDriver = false)
     } else {
       logWarning(s"An unknown ($remoteAddress) driver disconnected.")
     }
@@ -148,12 +149,25 @@
    * executor exits differently. For e.g. when an executor goes down,
    * back-end may not want to take the parent process down.
    */
-  protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = {
+  protected def exitExecutor(code: Int,
+                             reason: String,
+                             throwable: Throwable = null,
+                             notifyDriver: Boolean = true) = {
+    val message = "Executor self-exiting due to : " + reason
     if (throwable != null) {
-      logError(reason, throwable)
+      logError(message, throwable)
     } else {
-      logError(reason)
+      logError(message)
     }
+
+    if (notifyDriver && driver.nonEmpty) {
+      driver.get.ask[Boolean](
+        RemoveExecutor(executorId, new ExecutorLossReason(reason))
+      ).onFailure { case e =>
+        logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
+      }(ThreadUtils.sameThread)
+    }
+
     System.exit(code)
   }
 }
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -199,6 +199,9 @@ private[spark] class BlockManager(
           logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
             + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
           Thread.sleep(SLEEP_TIME_SECS * 1000)
+        case NonFatal(e) =>
+          throw new SparkException("Unable to register with external shuffle server due to : " +
+            e.getMessage, e)
       }
     }
   }
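As a side note, the `BlockManager` change above follows a retry-then-fail shape: transient registration failures are retried, while the final failure is rethrown with a descriptive message so that it reaches `exitExecutor` (and, with this PR, the driver) instead of only being logged. Below is a hedged, stand-alone restatement of that shape with illustrative names (`registerOnce`, `maxAttempts`, `sleepTimeSecs` are assumptions, not Spark's identifiers).

```scala
import scala.util.control.NonFatal

object ShuffleRegistrationSketch {
  // Retry transient failures, then surface the last failure with a clear reason.
  def registerWithRetries(registerOnce: () => Unit,
                          maxAttempts: Int = 3,
                          sleepTimeSecs: Long = 5): Unit = {
    for (i <- 1 to maxAttempts) {
      try {
        registerOnce()
        return
      } catch {
        case NonFatal(e) if i < maxAttempts =>
          // Transient failure: log and retry after a pause.
          System.err.println(s"Registration failed, will retry ${maxAttempts - i} more times " +
            s"after waiting $sleepTimeSecs seconds... (${e.getMessage})")
          Thread.sleep(sleepTimeSecs * 1000)
        case NonFatal(e) =>
          // Final failure: rethrow with a descriptive message instead of only logging it.
          throw new RuntimeException(
            "Unable to register with external shuffle server due to : " + e.getMessage, e)
      }
    }
  }
}
```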