From 2bda1c1d376afd8abe6a04be345461752f3fb1b6 Mon Sep 17 00:00:00 2001
From: huangzhaowei <carlmartinmax@gmail.com>
Date: Fri, 6 Feb 2015 14:35:29 -0800
Subject: [PATCH] [SPARK-5444][Network]Add a retry to deal with the conflict
 port in netty server.

If the `spark.blockMnager.port` had conflicted with a specific port, Spark will throw an exception and exit.
So add a retry to avoid this situation.

Author: huangzhaowei <carlmartinmax@gmail.com>

Closes #4240 from SaintBacchus/NettyPortConflict and squashes the following commits:

cc926d2 [huangzhaowei] Add a retry to deal with the conflict port in netty server.
---
 .../spark/network/server/TransportServer.java | 36 +++++++++++++++++--
 .../spark/network/util/TransportConf.java     |  7 ++++
 2 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index 625c3257d7..ef20999180 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -100,8 +100,7 @@ public class TransportServer implements Closeable {
       }
     });
 
-    channelFuture = bootstrap.bind(new InetSocketAddress(portToBind));
-    channelFuture.syncUninterruptibly();
+    bindRightPort(portToBind);
 
     port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
     logger.debug("Shuffle server started on port :" + port);
@@ -123,4 +122,37 @@ public class TransportServer implements Closeable {
     bootstrap = null;
   }
 
+  /**
+   * Attempt to bind to the specified port up to a fixed number of retries.
+   * If all attempts fail after the max number of retries, exit.
+   */
+  private void bindRightPort(int portToBind) {
+    int maxPortRetries = conf.portMaxRetries();
+
+    for (int i = 0; i <= maxPortRetries; i++) {
+      int tryPort = -1;
+      if (0 == portToBind) {
+        // Do not increment port if tryPort is 0, which is treated as a special port
+        tryPort = 0;
+      } else {
+        // If the new port wraps around, do not try a privilege port
+        tryPort = ((portToBind + i - 1024) % (65536 - 1024)) + 1024;
+      }
+      try {
+        channelFuture = bootstrap.bind(new InetSocketAddress(tryPort));
+        channelFuture.syncUninterruptibly();
+        return;
+      } catch (Exception e) {
+        logger.warn("Netty service could not bind on port " + tryPort +
+          ". Attempting the next port.");
+        if (i >= maxPortRetries) {
+          logger.error(e.getMessage() + ": Netty server failed after "
+            + maxPortRetries + " retries.");
+
+          // If it can't find a right port, it should exit directly.
+          System.exit(-1);
+        }
+      }
+    }
+  }
 }
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 6c91786886..2eaf3b71d9 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -98,4 +98,11 @@ public class TransportConf {
   public boolean lazyFileDescriptor() {
     return conf.getBoolean("spark.shuffle.io.lazyFD", true);
   }
+
+  /**
+   * Maximum number of retries when binding to a port before giving up.
+   */
+  public int portMaxRetries() {
+    return conf.getInt("spark.port.maxRetries", 16);
+  }
 }
-- 
GitLab