Skip to content
Snippets Groups Projects
Commit 5c9117a3 authored by Shixiong Zhu's avatar Shixiong Zhu
Browse files

[SPARK-15395][CORE] Use getHostString to create RpcAddress

## What changes were proposed in this pull request?

Right now the netty RPC uses `InetSocketAddress.getHostName` to create `RpcAddress` for network events. If we use an IP address to connect, then the RpcAddress's host will be a host name (if the reverse lookup successes) instead of the IP address. However, some places need to compare the original IP address and the RpcAddress in `onDisconnect` (e.g., CoarseGrainedExecutorBackend), and this behavior will make the check incorrect.

This PR uses `getHostString` to resolve the issue.

## How was this patch tested?

Jenkins unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13185 from zsxwing/host-string.
parent b1bc5ebd
No related branches found
No related tags found
No related merge requests found
......@@ -574,7 +574,7 @@ private[netty] class NettyRpcHandler(
private def internalReceive(client: TransportClient, message: ByteBuffer): RequestMessage = {
val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
assert(addr != null)
val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
val requestMessage = nettyEnv.deserialize[RequestMessage](client, message)
if (requestMessage.senderAddress == null) {
// Create a new message with the socket address of the client as the sender.
......@@ -595,7 +595,7 @@ private[netty] class NettyRpcHandler(
override def exceptionCaught(cause: Throwable, client: TransportClient): Unit = {
val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
if (addr != null) {
val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr))
// If the remove RpcEnv listens to some address, we should also fire a
// RemoteProcessConnectionError for the remote RpcEnv listening address
......@@ -614,14 +614,14 @@ private[netty] class NettyRpcHandler(
override def channelActive(client: TransportClient): Unit = {
val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
assert(addr != null)
val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
dispatcher.postToAll(RemoteProcessConnected(clientAddr))
}
override def channelInactive(client: TransportClient): Unit = {
val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
if (addr != null) {
val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
nettyEnv.removeOutbox(clientAddr)
dispatcher.postToAll(RemoteProcessDisconnected(clientAddr))
val remoteEnvAddress = remoteAddresses.remove(clientAddr)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment