diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 7f2192e1f5a70fc0e004558ec4d3431ae4013177..7d7b4c82fa3923d1016380e3bf0e1d3a68174bf6 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -287,15 +287,15 @@ private[netty] class NettyRpcEnv( if (timeoutScheduler != null) { timeoutScheduler.shutdownNow() } + if (dispatcher != null) { + dispatcher.stop() + } if (server != null) { server.close() } if (clientFactory != null) { clientFactory.close() } - if (dispatcher != null) { - dispatcher.stop() - } if (clientConnectionExecutor != null) { clientConnectionExecutor.shutdownNow() } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala index 56499c639f292273867cf7a7382fff26b50efc1f..6c090ada5ae9d0191f0093d53e782f0182c06760 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala @@ -241,10 +241,7 @@ private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) { } private def closeClient(): Unit = synchronized { - // Not sure if `client.close` is idempotent. Just for safety. - if (client != null) { - client.close() - } + // Just set client to null. Don't close it in order to reuse the connection. client = null } diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 73803ec21a5678db0e3b4d84c9ebb9311ab3181c..505cd476ff881df9a7a148e3c659d4b72dedd667 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -29,7 +29,8 @@ import scala.concurrent.duration._ import scala.language.postfixOps import com.google.common.io.Files -import org.mockito.Mockito.{mock, when} +import org.mockito.Matchers.any +import org.mockito.Mockito.{mock, never, verify, when} import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually._ @@ -844,6 +845,31 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } } + test("SPARK-14699: RpcEnv.shutdown should not fire onDisconnected events") { + env.setupEndpoint("SPARK-14699", new RpcEndpoint { + override val rpcEnv: RpcEnv = env + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case m => context.reply(m) + } + }) + + val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0) + val endpoint = mock(classOf[RpcEndpoint]) + anotherEnv.setupEndpoint("SPARK-14699", endpoint) + + val ref = anotherEnv.setupEndpointRef(env.address, "SPARK-14699") + // Make sure the connect is set up + assert(ref.askWithRetry[String]("hello") === "hello") + anotherEnv.shutdown() + anotherEnv.awaitTermination() + + env.stop(ref) + + verify(endpoint).onStop() + verify(endpoint, never()).onDisconnected(any()) + verify(endpoint, never()).onNetworkError(any(), any()) + } } class UnserializableClass