Skip to content
Snippets Groups Projects
Commit 5a864e3f authored by Aaron Davidson
Browse files

Rename SparkActorSystem to IndestructibleActorSystem

parents f6c8c1c7 c9cd2af7
No related branches found
No related tags found
No related merge requests found
......@@ -97,7 +97,8 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
indestructible = true)
// Publish the bound host:port through the spark.hostPort system property
val sparkHostPort = hostname + ":" + boundPort
System.setProperty("spark.hostPort", sparkHostPort)
......
......@@ -36,7 +36,7 @@ private[spark] class SimrSchedulerBackend(
override def start() {
super.start()
val driverUrl = "akka://spark@%s:%s/user/%s".format(
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
......
......@@ -44,7 +44,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
* Flush the partial writes and commit them as a single atomic block. Return the
* number of bytes written for this commit.
*/
def commit(): Long
def commit(): Long
/**
* Reverts writes that haven't been flushed yet. Callers should invoke this function
......
......@@ -17,7 +17,7 @@
package org.apache.spark.util
import akka.actor.{SparkActorSystem, ActorSystem, ExtendedActorSystem}
import akka.actor.{IndestructibleActorSystem, ActorSystem, ExtendedActorSystem}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.concurrent.Await
......@@ -34,8 +34,13 @@ private[spark] object AkkaUtils {
*
* Note: the `name` parameter is important, as even if a client sends a message to the right
* host + port, if the system name is incorrect, Akka will drop the message.
*
* If indestructible is set to true, the Actor System will continue running in the event
* of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
*/
def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false)
: (ActorSystem, Int) = {
val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
......@@ -70,7 +75,11 @@ private[spark] object AkkaUtils {
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
""".stripMargin)
val actorSystem = SparkActorSystem(name, akkaConf)
val actorSystem = if (indestructible) {
IndestructibleActorSystem(name, akkaConf)
} else {
ActorSystem(name, akkaConf)
}
val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
val boundPort = provider.getDefaultAddress.port.get
......
......@@ -10,20 +10,19 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.typesafe.config.Config
/**
* An ActorSystem specific to Spark. Based off of [[akka.actor.ActorSystem]].
* The only change from the default system is that we do not shut down the ActorSystem
* in the event of a fatal exception. This is necessary as Spark is allowed to recover
* from fatal exceptions (see [[org.apache.spark.executor.Executor]]).
* An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception.
* This is necessary as Spark Executors are allowed to recover from fatal exceptions
* (see [[org.apache.spark.executor.Executor]]).
*/
object SparkActorSystem {
object IndestructibleActorSystem {
def apply(name: String, config: Config): ActorSystem =
apply(name, config, ActorSystem.findClassLoader())
def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
new SparkActorSystemImpl(name, config, classLoader).start()
new IndestructibleActorSystemImpl(name, config, classLoader).start()
}
private[akka] class SparkActorSystemImpl(
private[akka] class IndestructibleActorSystemImpl(
override val name: String,
applicationConfig: Config,
classLoader: ClassLoader)
......@@ -36,7 +35,7 @@ private[akka] class SparkActorSystemImpl(
def uncaughtException(thread: Thread, cause: Throwable): Unit = {
if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
"ActorSystem tolerating and continuing.... [{}]", thread.getName, name)
"ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
//shutdown() //TODO make it configurable
} else {
fallbackHandler.uncaughtException(thread, cause)
......
......@@ -168,7 +168,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
System.setProperty("spark.driver.host", driverHost)
System.setProperty("spark.driver.port", driverPort.toString)
val driverUrl = "akka://spark@%s:%s/user/%s".format(
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment