Skip to content
Snippets Groups Projects
Commit c439d88e authored by Reynold Xin
Browse files

[SPARK-14547] Avoid DNS resolution for reusing connections

## What changes were proposed in this pull request?
This patch changes the connection creation logic in the network client module to avoid DNS resolution when reusing connections.

## How was this patch tested?
Testing in production. This is too difficult to test in isolation (for high fidelity unit tests, we'd need to change the DNS resolution behavior in the JVM).

Author: Reynold Xin <rxin@databricks.com>

Closes #12315 from rxin/SPARK-14547.
parent 1ef5f8cf
No related branches found
No related tags found
No related merge requests found
...@@ -123,16 +123,15 @@ public class TransportClientFactory implements Closeable { ...@@ -123,16 +123,15 @@ public class TransportClientFactory implements Closeable {
public TransportClient createClient(String remoteHost, int remotePort) throws IOException { public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
// Get connection from the connection pool first. // Get connection from the connection pool first.
// If it is not found or not active, create a new one. // If it is not found or not active, create a new one.
long preResolveHost = System.nanoTime(); // Use unresolved address here to avoid DNS resolution each time we creates a client.
final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort); final InetSocketAddress unresolvedAddress =
long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000; InetSocketAddress.createUnresolved(remoteHost, remotePort);
logger.info("Spent {} ms to resolve {}", hostResolveTimeMs, address);
// Create the ClientPool if we don't have it yet. // Create the ClientPool if we don't have it yet.
ClientPool clientPool = connectionPool.get(address); ClientPool clientPool = connectionPool.get(unresolvedAddress);
if (clientPool == null) { if (clientPool == null) {
connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer)); connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
clientPool = connectionPool.get(address); clientPool = connectionPool.get(unresolvedAddress);
} }
int clientIndex = rand.nextInt(numConnectionsPerPeer); int clientIndex = rand.nextInt(numConnectionsPerPeer);
...@@ -149,25 +148,35 @@ public class TransportClientFactory implements Closeable { ...@@ -149,25 +148,35 @@ public class TransportClientFactory implements Closeable {
} }
if (cachedClient.isActive()) { if (cachedClient.isActive()) {
logger.trace("Returning cached connection to {}: {}", address, cachedClient); logger.trace("Returning cached connection to {}: {}",
cachedClient.getSocketAddress(), cachedClient);
return cachedClient; return cachedClient;
} }
} }
// If we reach here, we don't have an existing connection open. Let's create a new one. // If we reach here, we don't have an existing connection open. Let's create a new one.
// Multiple threads might race here to create new connections. Keep only one of them active. // Multiple threads might race here to create new connections. Keep only one of them active.
final long preResolveHost = System.nanoTime();
final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
if (hostResolveTimeMs > 2000) {
logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
} else {
logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
}
synchronized (clientPool.locks[clientIndex]) { synchronized (clientPool.locks[clientIndex]) {
cachedClient = clientPool.clients[clientIndex]; cachedClient = clientPool.clients[clientIndex];
if (cachedClient != null) { if (cachedClient != null) {
if (cachedClient.isActive()) { if (cachedClient.isActive()) {
logger.trace("Returning cached connection to {}: {}", address, cachedClient); logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
return cachedClient; return cachedClient;
} else { } else {
logger.info("Found inactive connection to {}, creating a new one.", address); logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
} }
} }
clientPool.clients[clientIndex] = createClient(address); clientPool.clients[clientIndex] = createClient(resolvedAddress);
return clientPool.clients[clientIndex]; return clientPool.clients[clientIndex];
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment