diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 5df8213d749098c6bb58ac0230065d003481a7ba..407e9ffe90c5c76b8bb6855a36523444e9ca0ba6 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.util -import akka.actor.{ActorSystem, ExtendedActorSystem} +import akka.actor.{SparkActorSystem, ActorSystem, ExtendedActorSystem} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import scala.concurrent.Await @@ -70,7 +70,7 @@ private[spark] object AkkaUtils { |akka.remote.log-remote-lifecycle-events = $lifecycleEvents """.stripMargin) - val actorSystem = ActorSystem(name, akkaConf) + val actorSystem = SparkActorSystem(name, akkaConf) val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider val boundPort = provider.getDefaultAddress.port.get diff --git a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala b/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala new file mode 100644 index 0000000000000000000000000000000000000000..461e7ab08f47b2aabf960cb6ee061cdbdba4a286 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala @@ -0,0 +1,112 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> + */ + +package akka.actor + +import com.typesafe.config.Config +import akka.util._ +import scala.util.control.{NonFatal, ControlThrowable} + +/** + * An actorSystem specific to spark. It has an additional feature of letting spark tolerate + * fatal exceptions. 
+ */
+object SparkActorSystem {
+
+  /** Creates and starts an actor system using the class loader from `findClassLoader()`. */
+  def apply(name: String, config: Config): ActorSystem = apply(name, config, findClassLoader())
+
+  /** Creates a `SparkActorSystemImpl` with the given class loader and returns it started. */
+  def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
+    new SparkActorSystemImpl(name, config, classLoader).start()
+
+  /**
+   * INTERNAL API
+   *
+   * Chooses the class loader for a new system: the thread context class loader when set,
+   * else the loader of the first calling class outside the skipped prefixes, else the
+   * loader of this object.
+   */
+  private[akka] def findClassLoader(): ClassLoader = {
+    // Class-name prefixes of frames lying between the real caller and the stack probe.
+    // "akka.actor.SparkActorSystem" has to be listed, otherwise the walk stops on this
+    // object's own frames instead of reaching the code that asked for the system.
+    val skippedPrefixes = Seq(
+      "akka.actor.SparkActorSystem",
+      "akka.actor.ActorSystem",
+      "scala.Option",
+      "scala.collection.Iterator",
+      "akka.util.Reflect")
+
+    def findCaller(get: Int ⇒ Class[_]): ClassLoader = {
+      // Start the walk at stack depth 2 and skip the infrastructure frames listed above.
+      val caller = Iterator.from(2).map(get)
+        .dropWhile(c ⇒ c != null && skippedPrefixes.exists(p ⇒ c.getName.startsWith(p)))
+        .next()
+      if (caller == null) getClass.getClassLoader else caller.getClassLoader
+    }
+
+    Option(Thread.currentThread.getContextClassLoader)
+      .orElse(Reflect.getCallerClass.map(findCaller))
+      .getOrElse(getClass.getClassLoader)
+  }
+}
+
+/**
+ * Actor system implementation whose uncaught-exception handler logs fatal errors and
+ * keeps running, unless `akka.jvm-exit-on-fatal-error` is enabled.
+ *
+ * Only the handler is overridden; `start()`, `stop`, `/` and `toString` are inherited.
+ * NOTE(review): assumes `ActorSystemImpl` defines all of those - confirm for the Akka
+ * version in use.
+ */
+private[akka] class SparkActorSystemImpl(override val name: String,
+                                         applicationConfig: Config,
+                                         classLoader: ClassLoader)
+  extends ActorSystemImpl(name, applicationConfig, classLoader) {
+
+  /** False for the throwables this system merely logs; true for everything else. */
+  private def isFatalError(cause: Throwable): Boolean = cause match {
+    case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable ⇒
+      false
+    case _ ⇒ true
+  }
+
+  /**
+   * Overrides the handler for uncaught throwables. Non-fatal ones are only logged; fatal
+   * ones exit the JVM when `akka.jvm-exit-on-fatal-error` is enabled and are otherwise
+   * logged and tolerated.
+   */
+  protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler =
+    new Thread.UncaughtExceptionHandler() {
+      def uncaughtException(thread: Thread, cause: Throwable): Unit = {
+        if (!isFatalError(cause)) {
+          log.error(cause, "Uncaught error from thread [{}]", thread.getName)
+        } else if (settings.JvmExitOnFatalError) {
+          // Report via the logger and directly on stderr; the finally block always exits.
+          try {
+            log.error(cause, "Uncaught error from thread [{}] shutting down JVM since " +
+              "'akka.jvm-exit-on-fatal-error' is enabled", thread.getName)
+            System.err.print("Uncaught error from thread [")
+            System.err.print(thread.getName)
+            System.err.print("] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is " +
+              "enabled for ActorSystem[")
+            System.err.print(name)
+            System.err.println("]")
+            cause.printStackTrace(System.err)
+            System.err.flush()
+          } finally {
+            System.exit(-1)
+          }
+        } else {
+          // Tolerate the fatal error: log it and leave the actor system running.
+          log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
+            "ActorSystem tolerating and continuing.... [{}]", thread.getName, name)
+          // shutdown() // TODO make it configurable
+          if (thread.isAlive) log.error("Thread is still alive")
+          else log.error("Thread is dead")
+        }
+      }
+    }
+}