diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 55602a90829d519159db1298f65fffe9396532c1..45658323344206e5f3fec8f722a27f8286364210 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -209,16 +209,10 @@ class SparkContext(config: SparkConf) extends Logging {
   // An asynchronous listener bus for Spark events
   private[spark] val listenerBus = new LiveListenerBus
 
-  // Create the Spark execution environment (cache, map output tracker, etc)
   conf.set("spark.executor.id", "driver")
-  private[spark] val env = SparkEnv.create(
-    conf,
-    "<driver>",
-    conf.get("spark.driver.host"),
-    conf.get("spark.driver.port").toInt,
-    isDriver = true,
-    isLocal = isLocal,
-    listenerBus = listenerBus)
+
+  // Create the Spark execution environment (cache, map output tracker, etc)
+  private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
   SparkEnv.set(env)
 
   // Used to store a URL for each static file/jar together with the file's local timestamp
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 906a00b0bd17c7d8ebc1f0213b3823f78e02ba97..5c076e5f1c11dce3b549a3eb72be779a61808d91 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -144,14 +144,46 @@ object SparkEnv extends Logging {
     env
   }
 
-  private[spark] def create(
+  /**
+   * Create a SparkEnv for the driver.
+   */
+  private[spark] def createDriverEnv(
+      conf: SparkConf,
+      isLocal: Boolean,
+      listenerBus: LiveListenerBus): SparkEnv = {
+    assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
+    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
+    val hostname = conf.get("spark.driver.host")
+    val port = conf.get("spark.driver.port").toInt
+    create(conf, "<driver>", hostname, port, true, isLocal, listenerBus)
+  }
+
+  /**
+   * Create a SparkEnv for an executor.
+   * In coarse-grained mode, the executor provides an actor system that is already instantiated.
+   */
+  private[spark] def createExecutorEnv(
+      conf: SparkConf,
+      executorId: String,
+      hostname: String,
+      port: Int,
+      isLocal: Boolean,
+      actorSystem: ActorSystem = null): SparkEnv = {
+    create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem)
+  }
+
+  /**
+   * Helper method to create a SparkEnv for a driver or an executor.
+   */
+  private def create(
       conf: SparkConf,
       executorId: String,
       hostname: String,
       port: Int,
       isDriver: Boolean,
       isLocal: Boolean,
-      listenerBus: LiveListenerBus = null): SparkEnv = {
+      listenerBus: LiveListenerBus = null,
+      defaultActorSystem: ActorSystem = null): SparkEnv = {
 
     // Listener bus is only used on the driver
     if (isDriver) {
@@ -159,9 +191,16 @@ object SparkEnv extends Logging {
     }
 
     val securityManager = new SecurityManager(conf)
-    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-      actorSystemName, hostname, port, conf, securityManager)
+
+    // If an existing actor system is already provided, use it.
+    // This is the case when an executor is launched in coarse-grained mode.
+    val (actorSystem, boundPort) =
+      Option(defaultActorSystem) match {
+        case Some(as) => (as, port)
+        case None =>
+          val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
+          AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
+      }
 
     // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
     // This is so that we tell the executors the correct port to connect to.
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index c40a3e16675ad5cfe81450898c8c660eb326a8c2..697154d762d41a130e94055145a4f3e16bef7bc8 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import scala.concurrent.Await
 
-import akka.actor.{Actor, ActorSelection, Props}
+import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
 import akka.pattern.Patterns
 import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
 
@@ -38,7 +38,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     executorId: String,
     hostPort: String,
     cores: Int,
-    sparkProperties: Seq[(String, String)])
+    sparkProperties: Seq[(String, String)],
+    actorSystem: ActorSystem)
   extends Actor with ActorLogReceive with ExecutorBackend with Logging {
 
   Utils.checkHostPort(hostPort, "Expected hostport")
@@ -57,8 +58,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       // Make this host instead of hostPort ?
-      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
-        false)
+      val (hostname, _) = Utils.parseHostPort(hostPort)
+      executor = new Executor(executorId, hostname, sparkProperties, isLocal = false, actorSystem)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -135,7 +136,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       val sparkHostPort = hostname + ":" + boundPort
       actorSystem.actorOf(
         Props(classOf[CoarseGrainedExecutorBackend],
-          driverUrl, executorId, sparkHostPort, cores, props),
+          driverUrl, executorId, sparkHostPort, cores, props, actorSystem),
         name = "Executor")
       workerUrl.foreach { url =>
         actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0b75b9b21fb821c50e7e8d2da9af29f473df3cf4..70a46c75f42c4035de133954a6f1a91328e843cf 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.util.control.NonFatal
 
+import akka.actor.ActorSystem
+
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
@@ -35,12 +37,14 @@ import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
+ * In coarse-grained mode, an existing actor system is provided.
  */
 private[spark] class Executor(
     executorId: String,
     slaveHostname: String,
     properties: Seq[(String, String)],
-    isLocal: Boolean = false)
+    isLocal: Boolean = false,
+    actorSystem: ActorSystem = null)
   extends Logging {
 
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
@@ -77,8 +81,9 @@ private[spark] class Executor(
   conf.set("spark.executor.id", "executor." + executorId)
   private val env = {
     if (!isLocal) {
-      val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
-        isDriver = false, isLocal = false)
+      val port = conf.getInt("spark.executor.port", 0)
+      val _env = SparkEnv.createExecutorEnv(
+        conf, executorId, slaveHostname, port, isLocal, actorSystem)
       SparkEnv.set(_env)
       _env.metricsSystem.registerSource(executorSource)
       _env
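
Review note: the patch above makes SparkEnv.create a private helper behind two entry points, createDriverEnv and createExecutorEnv, and lets the executor path reuse an actor system the caller has already instantiated; Option(defaultActorSystem) lifts the nullable default parameter into an Option so the reuse and create cases are handled explicitly. The following minimal, self-contained Scala sketch illustrates that reuse-or-create idiom; Sys, Env, ReuseOrCreateSketch, and the port numbers are made-up stand-ins for illustration, not Spark APIs.

// Minimal, self-contained sketch of the reuse-or-create pattern introduced in
// SparkEnv.create above. Sys, Env, and the method names below are hypothetical
// stand-ins, not Spark APIs.
object ReuseOrCreateSketch {

  // Stand-in for an expensive shared resource such as an ActorSystem.
  final class Sys(val name: String, val port: Int)

  final case class Env(sys: Sys, boundPort: Int)

  // Mirrors the private create(...): a caller may pass a pre-instantiated
  // system (default null, as in the patch); Option(...) converts the nullable
  // reference into an Option so both cases are handled explicitly.
  private def create(name: String, port: Int, defaultSys: Sys = null): Env = {
    val (sys, boundPort) = Option(defaultSys) match {
      case Some(s) => (s, port)                   // reuse the caller's instance
      case None    => (new Sys(name, port), port) // otherwise build a fresh one
    }
    Env(sys, boundPort)
  }

  // Public entry points, analogous to createDriverEnv / createExecutorEnv.
  def createDriverEnv(port: Int): Env = create("driver", port)

  def createExecutorEnv(port: Int, sys: Sys = null): Env =
    create("executor", port, defaultSys = sys)

  def main(args: Array[String]): Unit = {
    val shared = new Sys("executor", 7077)
    // The executor path reuses the instance it was handed ...
    assert(createExecutorEnv(7077, shared).sys eq shared)
    // ... while the driver path always constructs its own.
    assert(createDriverEnv(4040).sys ne shared)
    println("both assertions passed")
  }
}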
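
Design note: in coarse-grained mode the backing ActorSystem already exists (the CoarseGrainedExecutorBackend companion creates it and passes it into the actor's Props), so threading the same instance through createExecutorEnv means AkkaUtils.createActorSystem is not invoked a second time in the executor JVM; the (actorSystem, boundPort) pair simply echoes the caller's system and port.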