diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 2a2ce0504dbbf529eac99b2bb4022b02ec925d58..956724b14bba3520e61dfe652e693920d837fa4d 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -579,7 +579,9 @@ private[spark] object SparkConf extends Logging {
         "are no longer accepted. To specify the equivalent now, one may use '64k'."),
       DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
       DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
-        "Please use the new blacklisting options, spark.blacklist.*")
+        "Please use the new blacklisting options, spark.blacklist.*"),
+      DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
+      DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index f4a59f069a5f99e07d219cbf1d02a6c9c971fdb9..3196c1ece15eba7ca6c1cb271095f98371515bd8 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -177,7 +177,7 @@ object SparkEnv extends Logging {
       SparkContext.DRIVER_IDENTIFIER,
       bindAddress,
       advertiseAddress,
-      port,
+      Option(port),
       isLocal,
       numCores,
       ioEncryptionKey,
@@ -194,7 +194,6 @@ object SparkEnv extends Logging {
       conf: SparkConf,
       executorId: String,
       hostname: String,
-      port: Int,
       numCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
       isLocal: Boolean): SparkEnv = {
@@ -203,7 +202,7 @@ object SparkEnv extends Logging {
       executorId,
       hostname,
       hostname,
-      port,
+      None,
       isLocal,
       numCores,
       ioEncryptionKey
@@ -220,7 +219,7 @@ object SparkEnv extends Logging {
       executorId: String,
       bindAddress: String,
       advertiseAddress: String,
-      port: Int,
+      port: Option[Int],
       isLocal: Boolean,
       numUsableCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
@@ -243,17 +242,12 @@ object SparkEnv extends Logging {
     }
 
     val systemName = if (isDriver) driverSystemName else executorSystemName
-    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
+    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
       securityManager, clientMode = !isDriver)
 
     // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
-    // In the non-driver case, the RPC env's address may be null since it may not be listening
-    // for incoming connections.
     if (isDriver) {
       conf.set("spark.driver.port", rpcEnv.address.port.toString)
-    } else if (rpcEnv.address != null) {
-      conf.set("spark.executor.port", rpcEnv.address.port.toString)
-      logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
     }
 
     // Create an instance of the class with the given name, possibly initializing it with our conf
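With this change, SparkEnv.create takes the RPC port as an Option[Int]: the driver wraps its configured port in Option(port), while executors pass None and let the RPC layer pick. Below is a minimal, self-contained sketch (our own harness, not Spark code; all names are assumptions) of that convention. resolvePort mirrors the port.getOrElse(-1) call above, where -1 is the sentinel passed to RpcEnv.create when no fixed listening port is requested.

    // Sketch only: demonstrates the Option[Int] port convention outside of Spark.
    object PortConvention {
      // Mirrors `port.getOrElse(-1)` in SparkEnv.create: -1 means "do not ask
      // for a specific listening port".
      def resolvePort(port: Option[Int]): Int = port.getOrElse(-1)

      def main(args: Array[String]): Unit = {
        println(resolvePort(Some(7077))) // driver: bind the configured port
        println(resolvePort(None))       // executor: no fixed port requested
      }
    }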
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b2b26ee107c0033e47f8482598b34d8258799385..a2f1aa22b006353941844a7da0b9b5c2eafcf179 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -191,11 +191,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
       // Bootstrap to fetch the driver's Spark properties.
       val executorConf = new SparkConf
-      val port = executorConf.getInt("spark.executor.port", 0)
       val fetcher = RpcEnv.create(
         "driverPropsFetcher",
         hostname,
-        port,
+        -1,
         executorConf,
         new SecurityManager(executorConf),
         clientMode = true)
@@ -221,7 +220,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       }
 
       val env = SparkEnv.createExecutorEnv(
-        driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
+        driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
 
       env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
         env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 314a806edf39e2e67aa4325979379ed83ba4e680..c1344ad99a7d29605d75f7399efe4bb4ebe4e8b2 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -209,7 +209,7 @@ provide such guarantees on the offer stream.
 
 In this mode spark executors will honor port allocation if such is
 provided from the user. Specifically if the user defines
-`spark.executor.port` or `spark.blockManager.port` in Spark configuration,
+`spark.blockManager.port` in Spark configuration,
 the mesos scheduler will check the available offers for a valid port
 range containing the port numbers. If no such range is available it will
 not launch any task. If no restriction is imposed on port numbers by the
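The retained docs paragraph describes the check that now applies to spark.blockManager.port alone: a task launches only if every user-specified port lies inside some offered port range. A self-contained sketch of that containment test follows (names such as PortRangeCheck are ours; the real logic lives in MesosSchedulerUtils.partitionPortResources):

    // Sketch of the offer check described in the docs: every user-specified
    // port must fall inside some offered (begin, end) port range.
    object PortRangeCheck {
      def covered(port: Long, offeredRanges: Seq[(Long, Long)]): Boolean =
        offeredRanges.exists { case (begin, end) => port >= begin && port <= end }

      def main(args: Array[String]): Unit = {
        val offers = Seq((3000L, 5000L), (2000L, 2500L))
        println(covered(4000L, offers)) // true: spark.blockManager.port=4000 fits
        println(covered(6000L, offers)) // false: no offered range contains 6000
      }
    }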
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index e9ddaa76a797f0aa003fc7dd03faa45b2a41168d..2d56123028f2bce77d7cea78d87c3e0fb926dda5 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -239,13 +239,6 @@ To use a custom metrics.properties for the application master and executors, upd
     Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
   </td>
 </tr>
-<tr>
-  <td><code>spark.yarn.am.port</code></td>
-  <td>(random)</td>
-  <td>
-    Port for the YARN Application Master to listen on. In YARN client mode, this is used to communicate between the Spark driver running on a gateway and the YARN Application Master running on YARN. In YARN cluster mode, this is used for the dynamic executor feature, where it handles the kill from the scheduler backend.
-  </td>
-</tr>
 <tr>
   <td><code>spark.yarn.queue</code></td>
   <td><code>default</code></td>
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index a086ec7ea2da66999925b0dc16c75a3a4e848840..61bfa27a84fd8adabc263596e6128a202f0e1138 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -74,9 +74,8 @@ private[spark] class MesosExecutorBackend
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
       Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
     val conf = new SparkConf(loadDefaults = true).setAll(properties)
-    val port = conf.getInt("spark.executor.port", 0)
     val env = SparkEnv.createExecutorEnv(
-      conf, executorId, slaveInfo.getHostname, port, cpusPerTask, None, isLocal = false)
+      conf, executorId, slaveInfo.getHostname, cpusPerTask, None, isLocal = false)
 
     executor = new Executor(
       executorId,
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 9d81025a3016be3a5557679ecf1639330375c360..062ed1f93fa52aacea305393389faf95779a8d2e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -438,7 +438,7 @@ trait MesosSchedulerUtils extends Logging {
     }
   }
 
-  val managedPortNames = List("spark.executor.port", BLOCK_MANAGER_PORT.key)
+  val managedPortNames = List(BLOCK_MANAGER_PORT.key)
 
   /**
    * The values of the non-zero ports to be used by the executor process.
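After this hunk, managedPortNames contains only spark.blockManager.port, so at most one port has to be satisfied from offers. A hedged sketch of gathering the non-zero managed ports, with a plain Map standing in for SparkConf (zero conventionally means "any port" and is skipped, matching the scaladoc above):

    object ManagedPorts {
      // After this patch the only managed key is spark.blockManager.port.
      val managedPortNames = List("spark.blockManager.port")

      // Zero means "any port", so it is filtered out.
      def nonZeroPorts(conf: Map[String, String]): List[Long] =
        managedPortNames.flatMap(conf.get).map(_.toLong).filter(_ != 0)

      def main(args: Array[String]): Unit = {
        println(nonZeroPorts(Map("spark.blockManager.port" -> "4000"))) // List(4000)
        println(nonZeroPorts(Map("spark.blockManager.port" -> "0")))    // List()
      }
    }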
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index ec47ab153177e6d174f280b5693227a12540f1a9..5d4bf6d082c4c72bcb38a42af8d8e4e0446a086c 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -179,40 +179,25 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
   test("Port reservation is done correctly with user specified ports only") {
     val conf = new SparkConf()
-    conf.set("spark.executor.port", "3000" )
     conf.set(BLOCK_MANAGER_PORT, 4000)
     val portResource = createTestPortResource((3000, 5000), Some("my_role"))
 
     val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(3000, 4000), List(portResource))
-    resourcesToBeUsed.length shouldBe 2
+      .partitionPortResources(List(4000), List(portResource))
+    resourcesToBeUsed.length shouldBe 1
 
     val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray
 
-    portsToUse.length shouldBe 2
-    arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
+    portsToUse.length shouldBe 1
+    arePortsEqual(portsToUse, Array(4000L)) shouldBe true
 
     val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
 
-    val expectedUSed = Array((3000L, 3000L), (4000L, 4000L))
+    val expectedUSed = Array((4000L, 4000L))
 
     arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true
   }
 
-  test("Port reservation is done correctly with some user specified ports (spark.executor.port)") {
-    val conf = new SparkConf()
-    conf.set("spark.executor.port", "3100" )
-    val portResource = createTestPortResource((3000, 5000), Some("my_role"))
-
-    val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(3100), List(portResource))
-
-    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
-    portsToUse.length shouldBe 1
-    portsToUse.contains(3100) shouldBe true
-  }
-
   test("Port reservation is done correctly with all random ports") {
     val conf = new SparkConf()
     val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
@@ -226,21 +211,20 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
   test("Port reservation is done correctly with user specified ports only - multiple ranges") {
     val conf = new SparkConf()
-    conf.set("spark.executor.port", "2100" )
     conf.set("spark.blockManager.port", "4000")
     val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
       createTestPortResource((2000, 2500), Some("other_role")))
 
     val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(2100, 4000), portResourceList)
+      .partitionPortResources(List(4000), portResourceList)
     val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
 
-    portsToUse.length shouldBe 2
+    portsToUse.length shouldBe 1
     val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
     val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
 
-    val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
+    val expectedUsed = Array((4000L, 4000L))
 
-    arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
+    arePortsEqual(portsToUse.toArray, Array(4000L)) shouldBe true
     arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
   }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 864c834d110fdf3c3fcdbb62dacf968e9b3f5600..6da2c0b5f330aaa71d4cfb74e9442b46f8de1cd7 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -429,8 +429,7 @@ private[spark] class ApplicationMaster(
   }
 
   private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
-    val port = sparkConf.get(AM_PORT)
-    rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr,
+    rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, -1, sparkConf, securityMgr,
       clientMode = true)
     val driverRef = waitForSparkDriver()
     addAmIpFilter()
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index d8c96c35ca71c00415456a328bf23b3d29d84db3..d4108caab28c18976330afe7a9694211899ed53b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -40,11 +40,6 @@ package object config {
     .timeConf(TimeUnit.MILLISECONDS)
     .createOptional
 
-  private[spark] val AM_PORT =
-    ConfigBuilder("spark.yarn.am.port")
-      .intConf
-      .createWithDefault(0)
-
   private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
     ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
       .doc("Interval after which Executor failures will be considered independent and not " +
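Finally, a standalone sketch of how the DeprecatedConfig entries added in SparkConf.scala surface to users: setting a removed key is still accepted but produces a warning. This is our own simplification, not Spark's implementation; println stands in for Spark's logWarning, and the table mirrors the two entries this patch adds.

    // Sketch of the deprecation table consulted when a user sets a removed key.
    case class DeprecatedConfig(key: String, version: String, deprecationMessage: String)

    object DeprecationSketch {
      val deprecatedConfigs = Map(
        "spark.yarn.am.port" ->
          DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
        "spark.executor.port" ->
          DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more"))

      def warnIfDeprecated(key: String): Unit =
        deprecatedConfigs.get(key).foreach { cfg =>
          println(s"Warning: ${cfg.key} is deprecated as of ${cfg.version}: " +
            cfg.deprecationMessage)
        }

      def main(args: Array[String]): Unit = warnIfDeprecated("spark.executor.port")
    }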