Skip to content
Snippets Groups Projects
Commit 09e8be9a authored by Prashant Sharma's avatar Prashant Sharma
Browse files

Made running SparkActorSystem specific to executors only.

parent 0f24576c
No related branches found
No related tags found
No related merge requests found
......@@ -97,7 +97,8 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
useSparkAS = true)
// set it
val sparkHostPort = hostname + ":" + boundPort
System.setProperty("spark.hostPort", sparkHostPort)
......
......@@ -35,7 +35,9 @@ private[spark] object AkkaUtils {
* Note: the `name` parameter is important, as even if a client sends a message to right
* host + port, if the system name is incorrect, Akka will drop the message.
*/
def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
def createActorSystem(name: String, host: String, port: Int,
useSparkAS: Boolean = false): (ActorSystem, Int) = {
val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
......@@ -70,7 +72,12 @@ private[spark] object AkkaUtils {
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
""".stripMargin)
val actorSystem = SparkActorSystem(name, akkaConf)
val actorSystem = if (useSparkAS) {
SparkActorSystem(name, akkaConf)
}
else {
ActorSystem(name, akkaConf)
}
val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
val boundPort = provider.getDefaultAddress.port.get
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment