Skip to content
Snippets Groups Projects
Commit 3749f941 authored by Denny's avatar Denny
Browse files

Start a standalone cluster locally.

parent c2da6440
No related branches found
No related tags found
No related merge requests found
......@@ -81,9 +81,11 @@ class SparkContext(
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
val SPARK_LOCALCLUSTER_REGEX = """spark-cluster\[([0-9]+)\,([0-9]+),([0-9]+)]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """(spark://.*)""".r
master match {
case "local" =>
new LocalScheduler(1, 0)
......@@ -99,6 +101,14 @@ class SparkContext(
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend)
scheduler
case SPARK_LOCALCLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerlave) =>
val scheduler = new ClusterScheduler(this)
val sparkUrl = spark.deploy.DeployUtils.startLocalSparkCluster(numSlaves.toInt,
coresPerSlave.toInt, memoryPerlave.toInt)
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend)
scheduler
case _ =>
MesosNativeLibrary.load()
......
package spark.deploy
import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
import spark.deploy.worker.Worker
import spark.deploy.master.Master
import spark.util.AkkaUtils
import spark.{Logging, Utils}
import scala.collection.mutable.ArrayBuffer
object DeployUtils extends Logging {
/* Starts a local standalone Spark cluster with a specified number of slaves */
def startLocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
memoryPerSlave : Int) : String = {
logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
val threadPool = Utils.newDaemonFixedThreadPool(numSlaves + 1)
val localIpAddress = Utils.localIpAddress
val workers = ArrayBuffer[ActorRef]()
/* Start the Master */
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
val masterUrl = "spark://" + localIpAddress + ":" + boundPort
threadPool.execute(new Runnable {
def run() {
val actor = actorSystem.actorOf(
Props(new Master(localIpAddress, boundPort, 8080)), name = "Master")
actorSystem.awaitTermination()
}
})
/* Start the Slaves */
(1 to numSlaves + 1).foreach { slaveNum =>
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
threadPool.execute(new Runnable {
def run() {
val actor = actorSystem.actorOf(
Props(new Worker(localIpAddress, boundPort, 8080 + slaveNum, coresPerSlave, memoryPerSlave, masterUrl)),
name = "Worker")
workers += actor
actorSystem.awaitTermination()
}
})
}
return masterUrl
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment