From cf074f9c9660c7ddaac96f060f59eed1f19ecc06 Mon Sep 17 00:00:00 2001 From: Denny <dennybritz@gmail.com> Date: Tue, 4 Sep 2012 21:13:25 -0700 Subject: [PATCH] Renamed class. --- core/src/main/scala/spark/SparkContext.scala | 6 +- .../spark/deploy/LocalSparkCluster.scala | 59 +++++++++++++++++++ 2 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 core/src/main/scala/spark/deploy/LocalSparkCluster.scala diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 92c0dcc876..e24f4be5a4 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -34,6 +34,8 @@ import org.apache.mesos.{Scheduler, MesosNativeLibrary} import spark.broadcast._ +import spark.deploy.LocalSparkCluster + import spark.partial.ApproximateEvaluator import spark.partial.PartialResult @@ -104,8 +106,8 @@ class SparkContext( case SPARK_LOCALCLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerlave) => val scheduler = new ClusterScheduler(this) - val sparkUrl = spark.deploy.DeployUtils.startLocalSparkCluster(numSlaves.toInt, - coresPerSlave.toInt, memoryPerlave.toInt) + val localCluster = new LocalSparkCluster(numSlaves.toInt, coresPerSlave.toInt, memoryPerlave.toInt) + val sparkUrl = localCluster.start() val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName) scheduler.initialize(backend) scheduler diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala new file mode 100644 index 0000000000..85a7b65c38 --- /dev/null +++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala @@ -0,0 +1,59 @@ +package spark.deploy + +import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated} + +import spark.deploy.worker.Worker +import spark.deploy.master.Master +import spark.util.AkkaUtils +import spark.{Logging, Utils} + +import scala.collection.mutable.ArrayBuffer + +class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int, + memoryPerSlave : Int) extends Logging { + + val threadPool = Utils.newDaemonFixedThreadPool(numSlaves + 1) + val localIpAddress = Utils.localIpAddress + + var masterActor : ActorRef = _ + var masterPort : Int = _ + var masterUrl : String = _ + + val slaveActors = ArrayBuffer[ActorRef]() + + def start() : String = { + + logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.") + + /* Start the Master */ + val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0) + masterUrl = "spark://" + localIpAddress + ":" + masterPort + threadPool.execute(new Runnable { + def run() { + val actor = actorSystem.actorOf( + Props(new Master(localIpAddress, masterPort, 8080)), name = "Master") + masterActor = actor + actorSystem.awaitTermination() + } + }) + + /* Start the Slaves */ + (1 to numSlaves + 1).foreach { slaveNum => + val (actorSystem, boundPort) = + AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0) + threadPool.execute(new Runnable { + def run() { + val actor = actorSystem.actorOf( + Props(new Worker(localIpAddress, boundPort, 8080 + slaveNum, coresPerSlave, memoryPerSlave, masterUrl)), + name = "Worker") + slaveActors += actor + actorSystem.awaitTermination() + } + }) + } + + return masterUrl + } + + +} \ No newline at end of file -- GitLab