Commit b563987e authored by Andrew Or, committed by Andrew Or

[SPARK-4013] Do not create multiple actor systems on each executor

In the existing code, each coarse-grained executor has two concurrently running actor systems. This causes many more error messages to be logged than necessary when the executor is lost or killed, because we receive a disassociation event for each of these actor systems.
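For illustration, a minimal sketch of the idea behind the fix (plain Akka API; the `ActorSystem` names here are made up, not Spark's actual ones): instead of letting the executor's SparkEnv start a second actor system, reuse the one the backend already started, and only create a new one when none is provided.

    import akka.actor.ActorSystem

    // Before: two independent actor systems per executor JVM, so a lost or
    // killed executor produces a disassociation event (and error log) for each.
    // After: the environment falls back to creating a system only when none is given.
    def envActorSystem(existing: ActorSystem): ActorSystem =
      Option(existing).getOrElse(ActorSystem("sparkExecutorEnv"))

    val backendSystem = ActorSystem("sparkExecutorBackend")
    val envSystem = envActorSystem(backendSystem)
    assert(envSystem eq backendSystem)  // same instance, no second system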

This is blocking #2840.

Author: Andrew Or <andrewor14@gmail.com>

Closes #2863 from andrewor14/executor-actor-system and squashes the following commits:

44ce2e0 [Andrew Or] Avoid starting two actor systems on each executor
parent 098f83c7
core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -209,16 +209,10 @@ class SparkContext(config: SparkConf) extends Logging {
   // An asynchronous listener bus for Spark events
   private[spark] val listenerBus = new LiveListenerBus
 
-  // Create the Spark execution environment (cache, map output tracker, etc)
   conf.set("spark.executor.id", "driver")
-  private[spark] val env = SparkEnv.create(
-    conf,
-    "<driver>",
-    conf.get("spark.driver.host"),
-    conf.get("spark.driver.port").toInt,
-    isDriver = true,
-    isLocal = isLocal,
-    listenerBus = listenerBus)
+
+  // Create the Spark execution environment (cache, map output tracker, etc)
+  private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
   SparkEnv.set(env)
 
   // Used to store a URL for each static file/jar together with the file's local timestamp
core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -144,14 +144,46 @@ object SparkEnv extends Logging {
     env
   }
 
-  private[spark] def create(
+  /**
+   * Create a SparkEnv for the driver.
+   */
+  private[spark] def createDriverEnv(
+      conf: SparkConf,
+      isLocal: Boolean,
+      listenerBus: LiveListenerBus): SparkEnv = {
+    assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
+    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
+    val hostname = conf.get("spark.driver.host")
+    val port = conf.get("spark.driver.port").toInt
+    create(conf, "<driver>", hostname, port, true, isLocal, listenerBus)
+  }
+
+  /**
+   * Create a SparkEnv for an executor.
+   * In coarse-grained mode, the executor provides an actor system that is already instantiated.
+   */
+  private[spark] def createExecutorEnv(
+      conf: SparkConf,
+      executorId: String,
+      hostname: String,
+      port: Int,
+      isLocal: Boolean,
+      actorSystem: ActorSystem = null): SparkEnv = {
+    create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem)
+  }
+
+  /**
+   * Helper method to create a SparkEnv for a driver or an executor.
+   */
+  private def create(
       conf: SparkConf,
       executorId: String,
       hostname: String,
       port: Int,
       isDriver: Boolean,
       isLocal: Boolean,
-      listenerBus: LiveListenerBus = null): SparkEnv = {
+      listenerBus: LiveListenerBus = null,
+      defaultActorSystem: ActorSystem = null): SparkEnv = {
 
     // Listener bus is only used on the driver
     if (isDriver) {
@@ -159,9 +191,16 @@ object SparkEnv extends Logging {
     }
 
     val securityManager = new SecurityManager(conf)
-    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-      actorSystemName, hostname, port, conf, securityManager)
+
+    // If an existing actor system is already provided, use it.
+    // This is the case when an executor is launched in coarse-grained mode.
+    val (actorSystem, boundPort) =
+      Option(defaultActorSystem) match {
+        case Some(as) => (as, port)
+        case None =>
+          val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
+          AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
+      }
 
     // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
     // This is so that we tell the executors the correct port to connect to.
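The two factory methods make the driver and executor call sites explicit. A sketch of their intended use, with illustrative values (`listenerBus`, `boundPort`, and `backendActorSystem` are assumed to be in scope):

    // Driver: host and port must already be set in the conf, as the asserts enforce.
    val conf = new SparkConf()
      .set("spark.driver.host", "localhost")  // illustrative value
      .set("spark.driver.port", "7077")       // illustrative value
    val driverEnv = SparkEnv.createDriverEnv(conf, isLocal = false, listenerBus)

    // Coarse-grained executor: hand over the backend's actor system so that
    // create() takes the Some(as) branch and skips AkkaUtils.createActorSystem.
    val executorEnv = SparkEnv.createExecutorEnv(
      conf, executorId = "1", hostname = "worker-1", port = boundPort,
      isLocal = false, actorSystem = backendActorSystem)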
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import scala.concurrent.Await
 
-import akka.actor.{Actor, ActorSelection, Props}
+import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
 import akka.pattern.Patterns
 import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
@@ -38,7 +38,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     executorId: String,
     hostPort: String,
     cores: Int,
-    sparkProperties: Seq[(String, String)])
+    sparkProperties: Seq[(String, String)],
+    actorSystem: ActorSystem)
   extends Actor with ActorLogReceive with ExecutorBackend with Logging {
 
   Utils.checkHostPort(hostPort, "Expected hostport")
@@ -57,8 +58,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
       // Make this host instead of hostPort ?
-      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
-        false)
+      val (hostname, _) = Utils.parseHostPort(hostPort)
+      executor = new Executor(executorId, hostname, sparkProperties, isLocal = false, actorSystem)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -135,7 +136,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       val sparkHostPort = hostname + ":" + boundPort
       actorSystem.actorOf(
         Props(classOf[CoarseGrainedExecutorBackend],
-          driverUrl, executorId, sparkHostPort, cores, props),
+          driverUrl, executorId, sparkHostPort, cores, props, actorSystem),
         name = "Executor")
       workerUrl.foreach { url =>
         actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
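One subtlety here: `Props(classOf[...], args*)` matches the actor's constructor by the runtime types of the supplied arguments, so the new `actorSystem` parameter must be appended at every creation site in the same position as in the constructor. A simplified stand-alone sketch of the pattern (hypothetical `BackendLike` actor, assumed to be a top-level class):

    import akka.actor.{Actor, ActorSystem, Props}

    // Stand-in for the backend: an actor handed a reference to the very
    // system it runs in, via a plain constructor argument.
    class BackendLike(id: String, system: ActorSystem) extends Actor {
      def receive = { case msg => println(s"[$id] got $msg in ${system.name}") }
    }

    val system = ActorSystem("sparkExecutor")
    // Constructor args are positional, mirroring the patched actorOf call.
    val ref = system.actorOf(Props(classOf[BackendLike], "1", system), name = "Executor")
    ref ! "ping"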
core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.util.control.NonFatal
 
+import akka.actor.ActorSystem
+
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
@@ -35,12 +37,14 @@ import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
+ * In coarse-grained mode, an existing actor system is provided.
  */
 private[spark] class Executor(
     executorId: String,
     slaveHostname: String,
     properties: Seq[(String, String)],
-    isLocal: Boolean = false)
+    isLocal: Boolean = false,
+    actorSystem: ActorSystem = null)
   extends Logging
 {
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
@@ -77,8 +81,9 @@ private[spark] class Executor(
   conf.set("spark.executor.id", "executor." + executorId)
   private val env = {
     if (!isLocal) {
-      val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
-        isDriver = false, isLocal = false)
+      val port = conf.getInt("spark.executor.port", 0)
+      val _env = SparkEnv.createExecutorEnv(
+        conf, executorId, slaveHostname, port, isLocal, actorSystem)
       SparkEnv.set(_env)
       _env.metricsSystem.registerSource(executorSource)
       _env
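The previously hard-coded port 0 also becomes configurable here: `spark.executor.port` is consulted first, with 0 remaining the fallback, which tells Akka to bind an ephemeral port. A small illustration of the lookup (values illustrative):

    val conf = new SparkConf()                        // spark.executor.port unset
    val port = conf.getInt("spark.executor.port", 0)  // => 0: Akka picks a free port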