Skip to content
Snippets Groups Projects
Commit cf074f9c authored by Denny's avatar Denny
Browse files

Renamed class.

parent 3749f941
No related branches found
No related tags found
No related merge requests found
......@@ -34,6 +34,8 @@ import org.apache.mesos.{Scheduler, MesosNativeLibrary}
import spark.broadcast._
import spark.deploy.LocalSparkCluster
import spark.partial.ApproximateEvaluator
import spark.partial.PartialResult
......@@ -104,8 +106,8 @@ class SparkContext(
case SPARK_LOCALCLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerlave) =>
val scheduler = new ClusterScheduler(this)
val sparkUrl = spark.deploy.DeployUtils.startLocalSparkCluster(numSlaves.toInt,
coresPerSlave.toInt, memoryPerlave.toInt)
val localCluster = new LocalSparkCluster(numSlaves.toInt, coresPerSlave.toInt, memoryPerlave.toInt)
val sparkUrl = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend)
scheduler
......
package spark.deploy
import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
import spark.deploy.worker.Worker
import spark.deploy.master.Master
import spark.util.AkkaUtils
import spark.{Logging, Utils}
import scala.collection.mutable.ArrayBuffer
class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
memoryPerSlave : Int) extends Logging {
val threadPool = Utils.newDaemonFixedThreadPool(numSlaves + 1)
val localIpAddress = Utils.localIpAddress
var masterActor : ActorRef = _
var masterPort : Int = _
var masterUrl : String = _
val slaveActors = ArrayBuffer[ActorRef]()
def start() : String = {
logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
/* Start the Master */
val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
masterUrl = "spark://" + localIpAddress + ":" + masterPort
threadPool.execute(new Runnable {
def run() {
val actor = actorSystem.actorOf(
Props(new Master(localIpAddress, masterPort, 8080)), name = "Master")
masterActor = actor
actorSystem.awaitTermination()
}
})
/* Start the Slaves */
(1 to numSlaves + 1).foreach { slaveNum =>
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
threadPool.execute(new Runnable {
def run() {
val actor = actorSystem.actorOf(
Props(new Worker(localIpAddress, boundPort, 8080 + slaveNum, coresPerSlave, memoryPerSlave, masterUrl)),
name = "Worker")
slaveActors += actor
actorSystem.awaitTermination()
}
})
}
return masterUrl
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment