diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 55e563ee968bedbe8d2ee438439642b528ecf88b..2a56bf28d70270693375e9e360a8005a9326b86f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -794,7 +794,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
 
   /**
    * We try to reuse a single Socket to transfer accumulator updates, as they are all added
-   * by the DAGScheduler's single-threaded actor anyway.
+   * by the DAGScheduler's single-threaded RpcEndpoint anyway.
    */
   @transient var socket: Socket = _
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 53356addf6edbcfaa48995c20aca024ca11c890f..83ccaadfe7447fe9e40cccaa4724ee494d688c4a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -73,12 +73,8 @@ class LocalSparkCluster(
   def stop() {
     logInfo("Shutting down local Spark cluster.")
     // Stop the workers before the master so they don't get upset that it disconnected
-    // TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
-    // This is unfortunate, but for now we just comment it out.
     workerRpcEnvs.foreach(_.shutdown())
-    // workerActorSystems.foreach(_.awaitTermination())
     masterRpcEnvs.foreach(_.shutdown())
-    // masterActorSystems.foreach(_.awaitTermination())
     masterRpcEnvs.clear()
     workerRpcEnvs.clear()
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 7576a2985ee7be23e9a38a5584be4e4791b260e1..25ea6925434ab41f6b8d3361d34df35370c859f9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -257,7 +257,7 @@ private[spark] class AppClient(
   }
 
   def start() {
-    // Just launch an actor; it will call back into the listener.
+    // Just launch an rpcEndpoint; it will call back into the listener.
     endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
   }
 
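The AppClient change above captures the general pattern of this patch: code no longer spawns an actor, it registers an endpoint with the RpcEnv, and that endpoint then calls back into a listener on its own. Below is a minimal, self-contained sketch of that callback shape; `AppLifecycleListener`, `ToyClientEndpoint` and the demo object are invented names for illustration only and are not part of Spark's private rpc API.

```scala
// Illustrative sketch of the "launch an endpoint; it calls back into the listener" pattern
// referenced in AppClient.start(). All names here are hypothetical stand-ins, not Spark API.
trait AppLifecycleListener {
  def connected(appId: String): Unit
}

class ToyClientEndpoint(listener: AppLifecycleListener) {
  // Mirrors the spirit of RpcEndpoint.onStart: once registered, the endpoint itself
  // invokes the listener rather than the code that created it.
  def onStart(): Unit = listener.connected("app-00000000")
}

object ToyClientDemo extends App {
  val endpoint = new ToyClientEndpoint(new AppLifecycleListener {
    def connected(appId: String): Unit = println(s"connected as $appId")
  })
  endpoint.onStart() // prints: connected as app-00000000
}
```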
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
index cf77c86d760cf35d52ff889355572baeabe04d3b..70f21fbe0de853c8e9a930136c47cb39811208d9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -26,7 +26,7 @@ import org.apache.spark.annotation.DeveloperApi
  */
 @DeveloperApi
 trait LeaderElectionAgent {
-  val masterActor: LeaderElectable
+  val masterInstance: LeaderElectable
   def stop() {} // to avoid noops in implementations.
 }
 
@@ -37,7 +37,7 @@ trait LeaderElectable {
 }
 
 /** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
-private[spark] class MonarchyLeaderAgent(val masterActor: LeaderElectable)
+private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable)
   extends LeaderElectionAgent {
-  masterActor.electedLeader()
+  masterInstance.electedLeader()
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
index 68c937188b333476da87fa4e2eddb780b0395657..a952cee36eb44c2044fa0adb485b860ef224d044 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -38,5 +38,5 @@ private[master] object MasterMessages {
 
   case object BoundPortsRequest
 
-  case class BoundPortsResponse(actorPort: Int, webUIPort: Int, restPort: Option[Int])
+  case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 6fdff86f66e01f53eb51802405cf81c97b6c9f66..d317206a614fb85da8125a6e7d36c7013bf0d104 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}
 import org.apache.spark.deploy.SparkCuratorUtil
 
-private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
+private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
     conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {
 
   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
@@ -73,10 +73,10 @@ private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElecta
   private def updateLeadershipStatus(isLeader: Boolean) {
     if (isLeader && status == LeadershipStatus.NOT_LEADER) {
       status = LeadershipStatus.LEADER
-      masterActor.electedLeader()
+      masterInstance.electedLeader()
     } else if (!isLeader && status == LeadershipStatus.LEADER) {
       status = LeadershipStatus.NOT_LEADER
-      masterActor.revokedLeadership()
+      masterInstance.revokedLeadership()
     }
   }
 
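The rename from `masterActor` to `masterInstance` runs through every implementation of the election contract above. Here is that contract in one compact sketch: the two traits are re-declared locally so the snippet compiles on its own (the real definitions live in LeaderElectionAgent.scala), their members mirror the ones visible in the diff, and the anonymous `LeaderElectable` in the demo is hypothetical.

```scala
// Self-contained sketch of the LeaderElectionAgent / LeaderElectable contract touched by
// this rename. The traits are re-declared here so the example stands alone.
trait LeaderElectable {
  def electedLeader(): Unit
  def revokedLeadership(): Unit
}

trait LeaderElectionAgent {
  val masterInstance: LeaderElectable
  def stop(): Unit = {} // to avoid noops in implementations
}

// Mirrors MonarchyLeaderAgent: single-node, so the master is elected immediately.
class MonarchyLeaderAgent(val masterInstance: LeaderElectable) extends LeaderElectionAgent {
  masterInstance.electedLeader()
}

object LeaderElectionDemo extends App {
  val master = new LeaderElectable {
    def electedLeader(): Unit = println("elected leader")
    def revokedLeadership(): Unit = println("leadership revoked")
  }
  new MonarchyLeaderAgent(master) // prints: elected leader
}
```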
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c82a7ccab54dca5625f4f1306bbd305d3286baf7..6792d3310b06c228dedd36243b47e76f95d56f21 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -228,7 +228,7 @@ private[deploy] class Worker(
   /**
    * Re-register with the master because a network failure or a master failure has occurred.
    * If the re-registration attempt threshold is exceeded, the worker exits with error.
-   * Note that for thread-safety this should only be called from the actor.
+   * Note that for thread-safety this should only be called from the rpcEndpoint.
    */
   private def reregisterWithMaster(): Unit = {
     Utils.tryOrExit {
@@ -365,7 +365,8 @@ private[deploy] class Worker(
       if (connected) { sendToMaster(Heartbeat(workerId, self)) }
 
     case WorkDirCleanup =>
-      // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
+      // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker
+      // rpcEndpoint.
       // Copy ids so that it can be used in the cleanup thread.
      val appIds = executors.values.map(_.appId).toSet
       val cleanupFuture = concurrent.future {
@@ -684,7 +685,7 @@ private[deploy] object Worker extends Logging {
       workerNumber: Option[Int] = None,
       conf: SparkConf = new SparkConf): RpcEnv = {
 
-    // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
+    // The LocalSparkCluster runs multiple local sparkWorkerX RPC environments
     val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
     val securityMgr = new SecurityManager(conf)
     val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index fae5640b9a213d92b89e443751710375eab7d1b6..735c4f0927150d89b40831fb0ebf9805ce659e39 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -43,7 +43,7 @@ private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: Strin
   private[deploy] def setTesting(testing: Boolean) = isTesting = testing
   private var isTesting = false
 
-  // Lets us filter events only from the worker's actor system
+  // Lets us filter events only from the worker's rpc system
   private val expectedAddress = RpcAddress.fromURIString(workerUrl)
 
   private def isWorker(address: RpcAddress) = expectedAddress == address
@@ -62,7 +62,7 @@ private[spark] class WorkerWatcher(override val rpcEnv: RpcEnv, workerUrl: Strin
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
     if (isWorker(remoteAddress)) {
       // This log message will never be seen
-      logError(s"Lost connection to worker actor $workerUrl. Exiting.")
+      logError(s"Lost connection to worker rpc endpoint $workerUrl. Exiting.")
       exitNonZero()
     }
   }
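WorkerWatcher's behavior is unchanged by the wording fixes above: it compares every disconnect against the single worker address it was built to watch and exits only on a match. A small model of that check follows, under the assumption that an address can be represented as a host/port pair; `RpcAddress` is re-declared here as a plain case class and the demo names are hypothetical, while the real type is `org.apache.spark.rpc.RpcAddress`.

```scala
// Minimal model of the filtering WorkerWatcher performs in onDisconnected: only a
// disconnect from the watched worker's address matters.
case class RpcAddress(host: String, port: Int)

class ToyWorkerWatcher(expectedAddress: RpcAddress) {
  private def isWorker(address: RpcAddress) = expectedAddress == address

  def onDisconnected(remoteAddress: RpcAddress): Unit = {
    if (isWorker(remoteAddress)) {
      println(s"Lost connection to worker rpc endpoint $expectedAddress. Exiting.")
      // the real watcher calls exitNonZero() here, which exits the JVM outside of tests
    }
  }
}

object ToyWorkerWatcherDemo extends App {
  val watcher = new ToyWorkerWatcher(RpcAddress("1.2.3.4", 1234))
  watcher.onDisconnected(RpcAddress("4.3.2.1", 1234)) // ignored: some other endpoint
  watcher.onDisconnected(RpcAddress("1.2.3.4", 1234)) // reported: the watched worker
}
```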
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index 6ae47894598be862c14cc552370a743d4759d355..7409ac88599916588b88085b511bdf8a196a2b7e 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -100,7 +100,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
         val future = ask[T](message, timeout)
         val result = timeout.awaitResult(future)
         if (result == null) {
-          throw new SparkException("Actor returned null")
+          throw new SparkException("RpcEndpoint returned null")
         }
         return result
       } catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 8321037cdc026f0f90eae8dc1474dfad15eb928e..5d926377ce86bdbeb7eda0ca3151465fd799cee4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -162,7 +162,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
 
 private[spark] object OutputCommitCoordinator {
 
-  // This actor is used only for RPC
+  // This endpoint is used only for RPC
   private[spark] class OutputCommitCoordinatorEndpoint(
       override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator)
     extends RpcEndpoint with Logging {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index 26e72c0bff38d93473c6b0d305ce893db8ee0515..626a2b7d69abe9d02e9ff0c5dff9fa47550a09f5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -22,7 +22,7 @@ import org.apache.spark.rpc.{RpcEndpointRef, RpcAddress}
 /**
  * Grouping of data for an executor used by CoarseGrainedSchedulerBackend.
  *
- * @param executorEndpoint The ActorRef representing this executor
+ * @param executorEndpoint The RpcEndpointRef representing this executor
  * @param executorAddress The network address of this executor
  * @param executorHost The hostname that this executor is running on
  * @param freeCores  The current number of cores available for work on the executor
diff --git a/core/src/main/scala/org/apache/spark/util/IdGenerator.scala b/core/src/main/scala/org/apache/spark/util/IdGenerator.scala
index 17e55f7996bf77574acf620ee2a9b6ef96b61ffc..53934ad4ce4774ba4df0cace571b9f3ff02a1eca 100644
--- a/core/src/main/scala/org/apache/spark/util/IdGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/util/IdGenerator.scala
@@ -22,10 +22,10 @@ import java.util.concurrent.atomic.AtomicInteger
 /**
  * A util used to get a unique generation ID. This is a wrapper around Java's
  * AtomicInteger. An example usage is in BlockManager, where each BlockManager
- * instance would start an Akka actor and we use this utility to assign the Akka
- * actors unique names.
+ * instance would start an RpcEndpoint and we use this utility to assign the RpcEndpoints
+ * unique names.
  */
 private[spark] class IdGenerator {
-  private var id = new AtomicInteger
+  private val id = new AtomicInteger
   def next: Int = id.incrementAndGet
 }
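The `var` to `val` fix in IdGenerator is worth calling out: the `AtomicInteger` reference never changes, only the counter inside it, so an immutable reference is the correct declaration. A usage sketch follows, with the class body copied from the diff so it stands alone; the demo object is hypothetical.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Usage sketch for IdGenerator: hands out unique, monotonically increasing IDs.
class IdGenerator {
  private val id = new AtomicInteger // val: the AtomicInteger reference itself never changes
  def next: Int = id.incrementAndGet
}

object IdGeneratorDemo extends App {
  val gen = new IdGenerator
  println(gen.next) // 1
  println(gen.next) // 2 -- safe to call from multiple threads
}
```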
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala b/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala
index 8c96b0e71dfdd5b171a327179de67a735003bf85..4b86da536768cd1a9ae3e215578f2fe8245e0c31 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/CustomRecoveryModeFactory.scala
@@ -99,7 +99,7 @@ object CustomPersistenceEngine {
   @volatile var lastInstance: Option[CustomPersistenceEngine] = None
 }
 
-class CustomLeaderElectionAgent(val masterActor: LeaderElectable) extends LeaderElectionAgent {
-  masterActor.electedLeader()
+class CustomLeaderElectionAgent(val masterInstance: LeaderElectable) extends LeaderElectionAgent {
+  masterInstance.electedLeader()
 }
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
index cd24d7942331674848f428d30d46657194d2dbcd..e9034e39a715c8da45539b5fa3c44c9feee3c293 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
@@ -38,12 +38,11 @@ class WorkerWatcherSuite extends SparkFunSuite {
     val conf = new SparkConf()
     val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
     val targetWorkerUrl = rpcEnv.uriOf("test", RpcAddress("1.2.3.4", 1234), "Worker")
-    val otherAddress = "akka://test@4.3.2.1:1234/user/OtherActor"
-    val otherAkkaAddress = RpcAddress("4.3.2.1", 1234)
+    val otherRpcAddress = RpcAddress("4.3.2.1", 1234)
     val workerWatcher = new WorkerWatcher(rpcEnv, targetWorkerUrl)
     workerWatcher.setTesting(testing = true)
     rpcEnv.setupEndpoint("worker-watcher", workerWatcher)
-    workerWatcher.onDisconnected(otherAkkaAddress)
+    workerWatcher.onDisconnected(otherRpcAddress)
     assert(!workerWatcher.isShutDown)
     rpcEnv.shutdown()
   }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 280aac931915d642814ba68ee0346483508488ca..b60ae784c37989085b0cad1e6036b52fdda24abf 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -182,7 +182,7 @@ object MimaExcludes {
             ProblemFilters.exclude[IncompatibleResultTypeProblem](
               "org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast"),
             ProblemFilters.exclude[MissingClassProblem](
-              "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor")
+              "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint")
           ) ++ Seq(
             // SPARK-4655 - Making Stage an Abstract class broke binary compatility even though
             // the stage class is defined as private[spark]
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 8130868fe148793816f7b22f9d4cb0be076ae535..304b1e8cdbed5f1647577927af48092886e0bf19 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -981,7 +981,7 @@ class SparkILoop(
     // which spins off a separate thread, then print the prompt and try
     // our best to look ready. The interlocking lazy vals tend to
     // inter-deadlock, so we break the cycle with a single asynchronous
-    // message to an actor.
+    // message to an rpcEndpoint.
     if (isAsync) {
       intp initialize initializedCallback()
       createAsyncListener() // listens for signal to run postInitialization