From 829cd7b8b70e65a91aa66e6d626bd45f18e0ad97 Mon Sep 17 00:00:00 2001
From: jerryshao <sshao@hortonworks.com>
Date: Mon, 8 May 2017 14:27:56 -0700
Subject: [PATCH] [SPARK-20605][CORE][YARN][MESOS] Deprecate unused AM and
 executor port configurations

## What changes were proposed in this pull request?

After SPARK-10997, the client-mode Netty RpcEnv no longer needs to start a server, so these port configurations are no longer used. This PR therefore proposes to deprecate and remove the two configurations "spark.executor.port" and "spark.yarn.am.port".
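
For context, a minimal sketch of the behavior this relies on (illustrative only, not part of the patch; `RpcEnv` is a Spark-internal API, so this assumes code living inside the Spark source tree, e.g. a test):

```scala
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

val conf = new SparkConf()
// With clientMode = true the Netty RpcEnv only opens outbound connections
// and never binds a server socket, so the port argument is ignored; this
// patch passes -1 as an explicit "no port" placeholder at such call sites.
val rpcEnv = RpcEnv.create(
  "example", "localhost", -1, conf, new SecurityManager(conf), clientMode = true)
// No server is bound, so there is no listening address to report.
assert(rpcEnv.address == null)
rpcEnv.shutdown()
```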

## How was this patch tested?

Existing UTs.
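
In addition, as an illustration of the user-visible effect (a hedged sketch, not a test added by this patch), setting either key is still accepted but now only produces a deprecation warning:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
// SparkConf.set() routes these keys through logDeprecationWarning(), which
// logs that they are deprecated as of 2.0.0 and not used any more; they no
// longer affect which ports the AM or executors bind.
conf.set("spark.executor.port", "3000")
conf.set("spark.yarn.am.port", "3001")
```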

Author: jerryshao <sshao@hortonworks.com>

Closes #17866 from jerryshao/SPARK-20605.
---
 .../scala/org/apache/spark/SparkConf.scala    |  4 ++-
 .../scala/org/apache/spark/SparkEnv.scala     | 14 +++-----
 .../CoarseGrainedExecutorBackend.scala        |  5 ++-
 docs/running-on-mesos.md                      |  2 +-
 docs/running-on-yarn.md                       |  7 ----
 .../spark/executor/MesosExecutorBackend.scala |  3 +-
 .../cluster/mesos/MesosSchedulerUtils.scala   |  2 +-
 .../mesos/MesosSchedulerUtilsSuite.scala      | 34 +++++--------------
 .../spark/deploy/yarn/ApplicationMaster.scala |  3 +-
 .../org/apache/spark/deploy/yarn/config.scala |  5 ---
 10 files changed, 22 insertions(+), 57 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 2a2ce0504d..956724b14b 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -579,7 +579,9 @@ private[spark] object SparkConf extends Logging {
           "are no longer accepted. To specify the equivalent now, one may use '64k'."),
       DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
       DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
-        "Please use the new blacklisting options, spark.blacklist.*")
+        "Please use the new blacklisting options, spark.blacklist.*"),
+      DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
+      DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index f4a59f069a..3196c1ece1 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -177,7 +177,7 @@ object SparkEnv extends Logging {
       SparkContext.DRIVER_IDENTIFIER,
       bindAddress,
       advertiseAddress,
-      port,
+      Option(port),
       isLocal,
       numCores,
       ioEncryptionKey,
@@ -194,7 +194,6 @@ object SparkEnv extends Logging {
       conf: SparkConf,
       executorId: String,
       hostname: String,
-      port: Int,
       numCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
       isLocal: Boolean): SparkEnv = {
@@ -203,7 +202,7 @@ object SparkEnv extends Logging {
       executorId,
       hostname,
       hostname,
-      port,
+      None,
       isLocal,
       numCores,
       ioEncryptionKey
@@ -220,7 +219,7 @@ object SparkEnv extends Logging {
       executorId: String,
       bindAddress: String,
       advertiseAddress: String,
-      port: Int,
+      port: Option[Int],
       isLocal: Boolean,
       numUsableCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
@@ -243,17 +242,12 @@ object SparkEnv extends Logging {
     }
 
     val systemName = if (isDriver) driverSystemName else executorSystemName
-    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
+    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
       securityManager, clientMode = !isDriver)
 
     // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
-    // In the non-driver case, the RPC env's address may be null since it may not be listening
-    // for incoming connections.
     if (isDriver) {
       conf.set("spark.driver.port", rpcEnv.address.port.toString)
-    } else if (rpcEnv.address != null) {
-      conf.set("spark.executor.port", rpcEnv.address.port.toString)
-      logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
     }
 
     // Create an instance of the class with the given name, possibly initializing it with our conf
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b2b26ee107..a2f1aa22b0 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -191,11 +191,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
       // Bootstrap to fetch the driver's Spark properties.
       val executorConf = new SparkConf
-      val port = executorConf.getInt("spark.executor.port", 0)
       val fetcher = RpcEnv.create(
         "driverPropsFetcher",
         hostname,
-        port,
+        -1,
         executorConf,
         new SecurityManager(executorConf),
         clientMode = true)
@@ -221,7 +220,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       }
 
       val env = SparkEnv.createExecutorEnv(
-        driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
+        driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
 
       env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
         env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 314a806edf..c1344ad99a 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -209,7 +209,7 @@ provide such guarantees on the offer stream.
 
 In this mode spark executors will honor port allocation if such is
 provided from the user. Specifically if the user defines
-`spark.executor.port` or `spark.blockManager.port` in Spark configuration,
+`spark.blockManager.port` in Spark configuration,
 the mesos scheduler will check the available offers for a valid port
 range containing the port numbers. If no such range is available it will
 not launch any task. If no restriction is imposed on port numbers by the
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index e9ddaa76a7..2d56123028 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -239,13 +239,6 @@ To use a custom metrics.properties for the application master and executors, upd
     Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
   </td>
 </tr>
-<tr>
-  <td><code>spark.yarn.am.port</code></td>
-  <td>(random)</td>
-  <td>
-    Port for the YARN Application Master to listen on. In YARN client mode, this is used to communicate between the Spark driver running on a gateway and the YARN Application Master running on YARN. In YARN cluster mode, this is used for the dynamic executor feature, where it handles the kill from the scheduler backend.
-  </td>
-</tr>
 <tr>
   <td><code>spark.yarn.queue</code></td>
   <td><code>default</code></td>
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index a086ec7ea2..61bfa27a84 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -74,9 +74,8 @@ private[spark] class MesosExecutorBackend
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
       Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
     val conf = new SparkConf(loadDefaults = true).setAll(properties)
-    val port = conf.getInt("spark.executor.port", 0)
     val env = SparkEnv.createExecutorEnv(
-      conf, executorId, slaveInfo.getHostname, port, cpusPerTask, None, isLocal = false)
+      conf, executorId, slaveInfo.getHostname, cpusPerTask, None, isLocal = false)
 
     executor = new Executor(
       executorId,
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 9d81025a30..062ed1f93f 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -438,7 +438,7 @@ trait MesosSchedulerUtils extends Logging {
     }
   }
 
-  val managedPortNames = List("spark.executor.port", BLOCK_MANAGER_PORT.key)
+  val managedPortNames = List(BLOCK_MANAGER_PORT.key)
 
   /**
    * The values of the non-zero ports to be used by the executor process.
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index ec47ab1531..5d4bf6d082 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -179,40 +179,25 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
 
   test("Port reservation is done correctly with user specified ports only") {
     val conf = new SparkConf()
-    conf.set("spark.executor.port", "3000" )
     conf.set(BLOCK_MANAGER_PORT, 4000)
     val portResource = createTestPortResource((3000, 5000), Some("my_role"))
 
     val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(3000, 4000), List(portResource))
-    resourcesToBeUsed.length shouldBe 2
+      .partitionPortResources(List(4000), List(portResource))
+    resourcesToBeUsed.length shouldBe 1
 
     val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray
 
-    portsToUse.length shouldBe 2
-    arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
+    portsToUse.length shouldBe 1
+    arePortsEqual(portsToUse, Array(4000L)) shouldBe true
 
     val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
 
-    val expectedUSed = Array((3000L, 3000L), (4000L, 4000L))
+    val expectedUSed = Array((4000L, 4000L))
 
     arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true
   }
 
-  test("Port reservation is done correctly with some user specified ports (spark.executor.port)") {
-    val conf = new SparkConf()
-    conf.set("spark.executor.port", "3100" )
-    val portResource = createTestPortResource((3000, 5000), Some("my_role"))
-
-    val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(3100), List(portResource))
-
-    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
-    portsToUse.length shouldBe 1
-    portsToUse.contains(3100) shouldBe true
-  }
-
   test("Port reservation is done correctly with all random ports") {
     val conf = new SparkConf()
     val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
@@ -226,21 +211,20 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
 
   test("Port reservation is done correctly with user specified ports only - multiple ranges") {
     val conf = new SparkConf()
-    conf.set("spark.executor.port", "2100" )
     conf.set("spark.blockManager.port", "4000")
     val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
       createTestPortResource((2000, 2500), Some("other_role")))
     val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(2100, 4000), portResourceList)
+      .partitionPortResources(List(4000), portResourceList)
     val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
 
-    portsToUse.length shouldBe 2
+    portsToUse.length shouldBe 1
     val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
     val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
 
-    val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
+    val expectedUsed = Array((4000L, 4000L))
 
-    arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
+    arePortsEqual(portsToUse.toArray, Array(4000L)) shouldBe true
     arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
   }
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 864c834d11..6da2c0b5f3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -429,8 +429,7 @@ private[spark] class ApplicationMaster(
   }
 
   private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
-    val port = sparkConf.get(AM_PORT)
-    rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr,
+    rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, -1, sparkConf, securityMgr,
       clientMode = true)
     val driverRef = waitForSparkDriver()
     addAmIpFilter()
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index d8c96c35ca..d4108caab2 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -40,11 +40,6 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
-  private[spark] val AM_PORT =
-    ConfigBuilder("spark.yarn.am.port")
-      .intConf
-      .createWithDefault(0)
-
   private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
     ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
       .doc("Interval after which Executor failures will be considered independent and not " +
-- 
GitLab