Skip to content
Snippets Groups Projects
Commit 8af2f8c6 authored by Shixiong Zhu's avatar Shixiong Zhu
Browse files

[SPARK-12267][CORE] Store the remote RpcEnv address to send the correct disconnetion message

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10261 from zsxwing/SPARK-12267.
parent 98b212d3
No related branches found
No related tags found
No related merge requests found
...@@ -66,6 +66,7 @@ private[spark] class ApplicationInfo( ...@@ -66,6 +66,7 @@ private[spark] class ApplicationInfo(
nextExecutorId = 0 nextExecutorId = 0
removedExecutors = new ArrayBuffer[ExecutorDesc] removedExecutors = new ArrayBuffer[ExecutorDesc]
executorLimit = Integer.MAX_VALUE executorLimit = Integer.MAX_VALUE
appUIUrlAtHistoryServer = None
} }
private def newExecutorId(useID: Option[Int] = None): Int = { private def newExecutorId(useID: Option[Int] = None): Int = {
......
...@@ -690,7 +690,7 @@ private[deploy] object Worker extends Logging { ...@@ -690,7 +690,7 @@ private[deploy] object Worker extends Logging {
val conf = new SparkConf val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf) val args = new WorkerArguments(argStrings, conf)
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores, val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir) args.memory, args.masters, args.workDir, conf = conf)
rpcEnv.awaitTermination() rpcEnv.awaitTermination()
} }
......
...@@ -553,6 +553,9 @@ private[netty] class NettyRpcHandler( ...@@ -553,6 +553,9 @@ private[netty] class NettyRpcHandler(
// A variable to track whether we should dispatch the RemoteProcessConnected message. // A variable to track whether we should dispatch the RemoteProcessConnected message.
private val clients = new ConcurrentHashMap[TransportClient, JBoolean]() private val clients = new ConcurrentHashMap[TransportClient, JBoolean]()
// A variable to track the remote RpcEnv addresses of all clients
private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()
override def receive( override def receive(
client: TransportClient, client: TransportClient,
message: ByteBuffer, message: ByteBuffer,
...@@ -580,6 +583,12 @@ private[netty] class NettyRpcHandler( ...@@ -580,6 +583,12 @@ private[netty] class NettyRpcHandler(
// Create a new message with the socket address of the client as the sender. // Create a new message with the socket address of the client as the sender.
RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content) RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content)
} else { } else {
// The remote RpcEnv listens to some port, we should also fire a RemoteProcessConnected for
// the listening address
val remoteEnvAddress = requestMessage.senderAddress
if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddress) == null) {
dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress))
}
requestMessage requestMessage
} }
} }
...@@ -591,6 +600,12 @@ private[netty] class NettyRpcHandler( ...@@ -591,6 +600,12 @@ private[netty] class NettyRpcHandler(
if (addr != null) { if (addr != null) {
val clientAddr = RpcAddress(addr.getHostName, addr.getPort) val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr)) dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr))
// If the remove RpcEnv listens to some address, we should also fire a
// RemoteProcessConnectionError for the remote RpcEnv listening address
val remoteEnvAddress = remoteAddresses.get(clientAddr)
if (remoteEnvAddress != null) {
dispatcher.postToAll(RemoteProcessConnectionError(cause, remoteEnvAddress))
}
} else { } else {
// If the channel is closed before connecting, its remoteAddress will be null. // If the channel is closed before connecting, its remoteAddress will be null.
// See java.net.Socket.getRemoteSocketAddress // See java.net.Socket.getRemoteSocketAddress
...@@ -606,6 +621,12 @@ private[netty] class NettyRpcHandler( ...@@ -606,6 +621,12 @@ private[netty] class NettyRpcHandler(
val clientAddr = RpcAddress(addr.getHostName, addr.getPort) val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
nettyEnv.removeOutbox(clientAddr) nettyEnv.removeOutbox(clientAddr)
dispatcher.postToAll(RemoteProcessDisconnected(clientAddr)) dispatcher.postToAll(RemoteProcessDisconnected(clientAddr))
val remoteEnvAddress = remoteAddresses.remove(clientAddr)
// If the remove RpcEnv listens to some address, we should also fire a
// RemoteProcessDisconnected for the remote RpcEnv listening address
if (remoteEnvAddress != null) {
dispatcher.postToAll(RemoteProcessDisconnected(remoteEnvAddress))
}
} else { } else {
// If the channel is closed before connecting, its remoteAddress will be null. In this case, // If the channel is closed before connecting, its remoteAddress will be null. In this case,
// we can ignore it since we don't fire "Associated". // we can ignore it since we don't fire "Associated".
......
...@@ -545,6 +545,48 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { ...@@ -545,6 +545,48 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
} }
} }
test("network events between non-client-mode RpcEnvs") {
val events = new mutable.ArrayBuffer[(Any, Any)] with mutable.SynchronizedBuffer[(Any, Any)]
env.setupEndpoint("network-events-non-client", new ThreadSafeRpcEndpoint {
override val rpcEnv = env
override def receive: PartialFunction[Any, Unit] = {
case "hello" =>
case m => events += "receive" -> m
}
override def onConnected(remoteAddress: RpcAddress): Unit = {
events += "onConnected" -> remoteAddress
}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
events += "onDisconnected" -> remoteAddress
}
override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
events += "onNetworkError" -> remoteAddress
}
})
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = false)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef(
"local", env.address, "network-events-non-client")
val remoteAddress = anotherEnv.address
rpcEndpointRef.send("hello")
eventually(timeout(5 seconds), interval(5 millis)) {
assert(events.contains(("onConnected", remoteAddress)))
}
anotherEnv.shutdown()
anotherEnv.awaitTermination()
eventually(timeout(5 seconds), interval(5 millis)) {
assert(events.contains(("onConnected", remoteAddress)))
assert(events.contains(("onDisconnected", remoteAddress)))
}
}
test("sendWithReply: unserializable error") { test("sendWithReply: unserializable error") {
env.setupEndpoint("sendWithReply-unserializable-error", new RpcEndpoint { env.setupEndpoint("sendWithReply-unserializable-error", new RpcEndpoint {
override val rpcEnv = env override val rpcEnv = env
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment