diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 391b97d73e0260501c434a9960899ab63d1b6d54..7eec4ae64f29699d782e499887ec33772384c723 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -31,7 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.internal.Logging import org.apache.spark.rpc._ -import org.apache.spark.scheduler.TaskDescription +import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.serializer.SerializerInstance import org.apache.spark.util.{ThreadUtils, Utils} @@ -65,7 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend( case Success(msg) => // Always receive `true`. Just ignore it case Failure(e) => - exitExecutor(1, s"Cannot register with driver: $driverUrl", e) + exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false) }(ThreadUtils.sameThread) } @@ -129,7 +129,8 @@ private[spark] class CoarseGrainedExecutorBackend( if (stopping.get()) { logInfo(s"Driver from $remoteAddress disconnected during shutdown") } else if (driver.exists(_.address == remoteAddress)) { - exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.") + exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.", null, + notifyDriver = false) } else { logWarning(s"An unknown ($remoteAddress) driver disconnected.") } @@ -148,12 +149,25 @@ private[spark] class CoarseGrainedExecutorBackend( * executor exits differently. For e.g. when an executor goes down, * back-end may not want to take the parent process down. */ - protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = { + protected def exitExecutor(code: Int, + reason: String, + throwable: Throwable = null, + notifyDriver: Boolean = true) = { + val message = "Executor self-exiting due to : " + reason if (throwable != null) { - logError(reason, throwable) + logError(message, throwable) } else { - logError(reason) + logError(message) } + + if (notifyDriver && driver.nonEmpty) { + driver.get.ask[Boolean]( + RemoveExecutor(executorId, new ExecutorLossReason(reason)) + ).onFailure { case e => + logWarning(s"Unable to notify the driver due to " + e.getMessage, e) + }(ThreadUtils.sameThread) + } + System.exit(code) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a724fdf009789bc9f8939b6965767f81c6d314ce..c172ac2cdc0e30a4118ff61c03f686c65304f182 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -199,6 +199,9 @@ private[spark] class BlockManager( logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}" + s" more times after waiting $SLEEP_TIME_SECS seconds...", e) Thread.sleep(SLEEP_TIME_SECS * 1000) + case NonFatal(e) => + throw new SparkException("Unable to register with external shuffle server due to : " + + e.getMessage, e) } } }