diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index fcd76ec52742a2809d0044a47970f758321d5ceb..49059de50b42b770bc0eea1308742985c457c87e 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -110,6 +110,11 @@ private[spark] class CoarseGrainedExecutorBackend( case StopExecutor => logInfo("Driver commanded a shutdown") + // Cannot shutdown here because an ack may need to be sent back to the caller. So send + // a message to self to actually do the shutdown. + self.send(Shutdown) + + case Shutdown => executor.stop() stop() rpcEnv.shutdown() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index d94743677783f6d14fa98b7e0e81c6ab6f6b1cae..e0d25dc50c988d7396fcdc7247bd495a3f67ed54 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -100,4 +100,11 @@ private[spark] object CoarseGrainedClusterMessages { case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage + // Used internally by executors to shut themselves down. + case object Shutdown extends CoarseGrainedClusterMessage + + // SPARK-10987: workaround for netty RPC issue; forces a connection from the driver back + // to the AM.
+ case object DriverHello extends CoarseGrainedClusterMessage + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index e0107f9d3dd19fb5cfbf0cf0a964f80cea68b521..38218b9c08fd863cbca8d40ce766d4bd590866a0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -170,6 +170,8 @@ private[spark] abstract class YarnSchedulerBackend( case RegisterClusterManager(am) => logInfo(s"ApplicationMaster registered as $am") amEndpoint = Option(am) + // See SPARK-10987. + am.send(DriverHello) case AddWebUIFilter(filterName, filterParams, proxyBase) => addWebUIFilter(filterName, filterParams, proxyBase) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index a2ccdc05d73c888894b2181a9da3f34c508ad22c..3791eea5bf178a967d7f0047da841e90ac0ec78f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -564,6 +564,9 @@ private[spark] class ApplicationMaster( case x: AddWebUIFilter => logInfo(s"Add WebUI Filter. $x") driver.send(x) + + case DriverHello => + // SPARK-10987: no action needed for this message. } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {