Commit f5f12dc2 authored by Patrick Wendell

Merge pull request #336 from liancheng/akka-remote-lookup

Get rid of `Either[ActorRef, ActorSelection]`

In this pull request, instead of returning an `Either[ActorRef, ActorSelection]`, `registerOrLookup` resolves the remote actor with a blocking call and returns an `ActorRef`, or throws an exception if the remote actor doesn't exist or the lookup times out (the timeout is configured by `spark.akka.lookupTimeout`). This function is only called when a `SparkEnv` is constructed (that is, when the driver or an executor is instantiated), so the blocking call is considered acceptable. The executor-side `ActorSelection`s/`ActorRef`s pointing to the driver-side `MapOutputTrackerMasterActor` and `BlockManagerMasterActor` are the ones affected by this pull request.
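
As an illustration of the pattern this paragraph describes, here is a minimal, self-contained sketch assuming Akka 2.2-style APIs; the system name, the `EchoActor` class, the actor name "tracker", and the hard-coded timeout are illustrative stand-ins, not Spark code:

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor._

// Echoes every message back to its sender.
class EchoActor extends Actor {
  def receive = { case msg => sender ! msg }
}

object LookupSketch extends App {
  val system = ActorSystem("sketch")

  // "Driver" side: register the actor locally and keep the resulting ActorRef.
  val registered: ActorRef = system.actorOf(Props(new EchoActor), name = "tracker")

  // "Executor" side: resolve the same path into a concrete ActorRef up front.
  // resolveOne() returns a Future[ActorRef] that fails if no actor lives at the
  // path or if the lookup times out, so a bad address surfaces as an exception
  // at construction time instead of as silently dropped messages later.
  val timeout = 30.seconds // stand-in for the spark.akka.lookupTimeout setting
  val resolved: ActorRef =
    Await.result(system.actorSelection("/user/tracker").resolveOne(timeout), timeout)

  system.shutdown()
}

In the patch itself the timeout comes from the new `AkkaUtils.lookupTimeout(conf)` helper, which reads `spark.akka.lookupTimeout` and defaults to 30 seconds, as shown in the diff below.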

`ActorSelection` is dangerous and should be used with care. It is only absolutely safe to send messages via an `ActorSelection` when the remote actor is stateless, so that which incarnation of the actor handles them is irrelevant. But as pointed out by @ScrapCodes in the comments below, an executor exits immediately once its connection to the driver is lost, so `ActorSelection`s are not harmful in that scenario. This pull request is therefore mostly a code style patch.
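
To make the incarnation point concrete, here is a minimal sketch (again with illustrative names such as `CounterActor`, and crude sleeps in place of proper lifecycle monitoring): once the original actor is stopped and a new one is created under the same name, an old `ActorRef` stays pinned to the dead incarnation and its messages go to dead letters, while an `ActorSelection` silently reaches the fresh incarnation whose state has been reset:

import akka.actor._

// Keeps a running count; the count dies with the actor incarnation.
class CounterActor extends Actor {
  var count = 0
  def receive = { case _ => count += 1; println(s"count = $count") }
}

object IncarnationSketch extends App {
  val system = ActorSystem("sketch")

  val ref: ActorRef = system.actorOf(Props[CounterActor], name = "counter")
  val sel: ActorSelection = system.actorSelection("/user/counter")

  system.stop(ref)  // the first incarnation dies, taking its state with it
  Thread.sleep(500) // crude wait so the name is free before it is reused
  system.actorOf(Props[CounterActor], name = "counter") // a second incarnation

  ref ! "tick" // dead letters: the ActorRef is pinned to the first incarnation
  sel ! "tick" // delivered: the selection re-resolves to whatever lives at the path

  Thread.sleep(500)
  system.shutdown()
}

This is why the blocking `resolveOne` at `SparkEnv` construction, combined with the executor exiting when the driver connection is lost, is enough to make a plain `ActorRef` safe here.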
parents 11891e68 eb246847
@@ -55,7 +55,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
   private val timeout = AkkaUtils.askTimeout(conf)

   // Set to the MapOutputTrackerActor living on the driver
-  var trackerActor: Either[ActorRef, ActorSelection] = _
+  var trackerActor: ActorRef = _

   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
@@ -71,17 +71,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
   // throw a SparkException if this fails.
   private def askTracker(message: Any): Any = {
     try {
-      /*
-        The difference between ActorRef and ActorSelection is well explained here:
-        http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#Use_actorSelection_instead_of_actorFor
-        In spark a map output tracker can be either started on Driver where it is created which
-        is an ActorRef or it can be on executor from where it is looked up which is an
-        actorSelection.
-      */
-      val future = trackerActor match {
-        case Left(a: ActorRef) => a.ask(message)(timeout)
-        case Right(b: ActorSelection) => b.ask(message)(timeout)
-      }
+      val future = trackerActor.ask(message)(timeout)
       Await.result(future, timeout)
     } catch {
       case e: Exception =>
@@ -17,11 +17,10 @@
 package org.apache.spark

-import collection.mutable
-import serializer.Serializer
+import scala.collection.mutable
+import scala.concurrent.Await

 import akka.actor._
 import akka.remote.RemoteActorRefProvider

 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
@@ -157,17 +156,18 @@ object SparkEnv extends Logging {
       conf.get("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),
       conf)

-    def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
+    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
       if (isDriver) {
         logInfo("Registering " + name)
-        Left(actorSystem.actorOf(Props(newActor), name = name))
+        actorSystem.actorOf(Props(newActor), name = name)
       } else {
         val driverHost: String = conf.get("spark.driver.host", "localhost")
         val driverPort: Int = conf.get("spark.driver.port", "7077").toInt
         Utils.checkHost(driverHost, "Expected hostname")
-        val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
-        logInfo("Connecting to " + name + ": " + url)
-        Right(actorSystem.actorSelection(url))
+        val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
+        val timeout = AkkaUtils.lookupTimeout(conf)
+        logInfo(s"Connecting to $name: $url")
+        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
       }
     }
@@ -28,8 +28,7 @@ import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.AkkaUtils

 private[spark]
-class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
-    conf: SparkConf) extends Logging {
+class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Logging {

   val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt
   val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt
@@ -159,10 +158,7 @@ class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
     while (attempts < AKKA_RETRY_ATTEMPTS) {
       attempts += 1
       try {
-        val future = driverActor match {
-          case Left(a: ActorRef) => a.ask(message)(timeout)
-          case Right(b: ActorSelection) => b.ask(message)(timeout)
-        }
+        val future = driverActor.ask(message)(timeout)
         val result = Await.result(future, timeout)
         if (result == null) {
           throw new SparkException("BlockManagerMaster returned null")
@@ -95,7 +95,7 @@ private[spark] object ThreadingTest {
     val conf = new SparkConf()
     val serializer = new KryoSerializer(conf)
     val blockManagerMaster = new BlockManagerMaster(
-      Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
     val blockManager = new BlockManager(
       "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf)
     val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
@@ -105,4 +105,9 @@ private[spark] object AkkaUtils {
   def askTimeout(conf: SparkConf): FiniteDuration = {
     Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds")
   }
+
+  /** Returns the default Spark timeout to use for Akka remote actor lookup. */
+  def lookupTimeout(conf: SparkConf): FiniteDuration = {
+    Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds")
+  }
 }
@@ -23,6 +23,7 @@ import akka.actor._
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AkkaUtils
+import scala.concurrent.Await

 class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   private val conf = new SparkConf
@@ -49,14 +50,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   test("master start and stop") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
+    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
     tracker.stop()
   }

   test("master register and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
+    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -75,7 +76,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   test("master register and unregister and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
+    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -101,13 +102,15 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     System.setProperty("spark.hostPort", hostname + ":" + boundPort)

     val masterTracker = new MapOutputTrackerMaster(conf)
-    masterTracker.trackerActor = Left(actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker"))
+    masterTracker.trackerActor = actorSystem.actorOf(
+      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")

     val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf)
     val slaveTracker = new MapOutputTracker(conf)
-    slaveTracker.trackerActor = Right(slaveSystem.actorSelection(
-      "akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker"))
+    val selection = slaveSystem.actorSelection(
+      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)

     masterTracker.registerShuffle(10, 1)
     masterTracker.incrementEpoch()
@@ -56,7 +56,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     conf.set("spark.hostPort", "localhost:" + boundPort)
     master = new BlockManagerMaster(
-      Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)

     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     oldArch = System.setProperty("os.arch", "amd64")
@@ -23,9 +23,7 @@ import scala.collection.mutable
 import com.google.common.io.Files
 import org.apache.spark.SparkConf
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-import scala.util.Try
-import akka.actor.{Props, ActorSelection, ActorSystem}
+import org.scalatest.{BeforeAndAfterEach, FunSuite}

 class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
   private val testConf = new SparkConf(false)