diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 72540c712a26b7accce5eb27c12e943c0996e164..6bab1f31d0c8c56daceeb67f407f48bc4ba31aff 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -256,7 +256,9 @@ class SparkContext(
   private[spark] var checkpointDir: Option[String] = None
 
   // Thread Local variable that can be used by users to pass information down the stack
-  private val localProperties = new ThreadLocal[Properties]
+  private val localProperties = new InheritableThreadLocal[Properties] {
+    override protected def childValue(parent: Properties): Properties = new Properties(parent)
+  }
 
   def initLocalProperties() {
     localProperties.set(new Properties())
@@ -273,6 +275,9 @@ class SparkContext(
     }
   }
 
+  def getLocalProperty(key: String): String =
+    Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
+
   /** Set a human readable description of the current job. */
   def setJobDescription(value: String) {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
index f80823317bde7adac3af0b450ce7ba5c9af41080..114617c51a3c0ace5b00cd1a91ffa7d45cd865cb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -17,14 +17,12 @@
 
 package org.apache.spark.scheduler.cluster
 
-import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
-import java.util.Properties
-
-import scala.xml.XML
+import java.io.{FileInputStream, InputStream}
+import java.util.{NoSuchElementException, Properties}
 
 import org.apache.spark.Logging
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
+import scala.xml.XML
 
 /**
  * An interface to build Schedulable tree
@@ -51,7 +49,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
 private[spark] class FairSchedulableBuilder(val rootPool: Pool)
   extends SchedulableBuilder with Logging {
 
-  val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file")
+  val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
+  val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
   val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
   val DEFAULT_POOL_NAME = "default"
   val MINIMUM_SHARES_PROPERTY = "minShare"
@@ -64,48 +63,26 @@
   val DEFAULT_WEIGHT = 1
 
   override def buildPools() {
-    if (schedulerAllocFile != null) {
-      val file = new File(schedulerAllocFile)
-      if (file.exists()) {
-        val xml = XML.loadFile(file)
-        for (poolNode <- (xml \\ POOLS_PROPERTY)) {
-
-          val poolName = (poolNode \ POOL_NAME_PROPERTY).text
-          var schedulingMode = DEFAULT_SCHEDULING_MODE
-          var minShare = DEFAULT_MINIMUM_SHARE
-          var weight = DEFAULT_WEIGHT
-
-          val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
-          if (xmlSchedulingMode != "") {
-            try {
-              schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
-            } catch {
-              case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
-            }
-          }
-
-          val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
-          if (xmlMinShare != "") {
-            minShare = xmlMinShare.toInt
-          }
-
-          val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
-          if (xmlWeight != "") {
-            weight = xmlWeight.toInt
-          }
-
-          val pool = new Pool(poolName, schedulingMode, minShare, weight)
-          rootPool.addSchedulable(pool)
-          logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
-            poolName, schedulingMode, minShare, weight))
+    var is: Option[InputStream] = None
+    try {
+      is = Option {
+        schedulerAllocFile.map { f =>
+          new FileInputStream(f)
+        }.getOrElse {
+          getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
         }
-      } else {
-        throw new java.io.FileNotFoundException(
-          "Fair scheduler allocation file not found: " + schedulerAllocFile)
       }
+
+      is.foreach { i => buildFairSchedulerPool(i) }
+    } finally {
+      is.foreach(_.close())
     }
 
     // finally create "default" pool
+    buildDefaultPool()
+  }
+
+  private def buildDefaultPool() {
     if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
       val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
         DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
@@ -115,6 +92,42 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
     }
   }
 
+  private def buildFairSchedulerPool(is: InputStream) {
+    val xml = XML.load(is)
+    for (poolNode <- (xml \\ POOLS_PROPERTY)) {
+
+      val poolName = (poolNode \ POOL_NAME_PROPERTY).text
+      var schedulingMode = DEFAULT_SCHEDULING_MODE
+      var minShare = DEFAULT_MINIMUM_SHARE
+      var weight = DEFAULT_WEIGHT
+
+      val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
+      if (xmlSchedulingMode != "") {
+        try {
+          schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+        } catch {
+          case e: NoSuchElementException =>
+            logWarning("Error xml schedulingMode, using default schedulingMode")
+        }
+      }
+
+      val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
+      if (xmlMinShare != "") {
+        minShare = xmlMinShare.toInt
+      }
+
+      val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
+      if (xmlWeight != "") {
+        weight = xmlWeight.toInt
+      }
+
+      val pool = new Pool(poolName, schedulingMode, minShare, weight)
+      rootPool.addSchedulable(pool)
+      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+        poolName, schedulingMode, minShare, weight))
+    }
+  }
+
   override def addTaskSetManager(manager: Schedulable, properties: Properties) {
     var poolName = DEFAULT_POOL_NAME
     var parentPool = rootPool.getSchedulableByName(poolName)
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index 6ec124da9c7b17a933717e0f195119fc558b9f5f..459e257d79a36a4a12d3e5c1f0de6490e09106ca 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -40,17 +40,17 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
   }
 
   def resetSparkContext() = {
-    if (sc != null) {
-      LocalSparkContext.stop(sc)
-      sc = null
-    }
+    LocalSparkContext.stop(sc)
+    sc = null
   }
 
 }
 
 object LocalSparkContext {
   def stop(sc: SparkContext) {
-    sc.stop()
+    if (sc != null) {
+      sc.stop()
+    }
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
     System.clearProperty("spark.hostPort")
diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
index 97cbca09bfa2687939894b8cf5281b4cbbfa6c95..288aa14eeb03b2bcc7be42bd751716260bb90a11 100644
--- a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
@@ -33,10 +33,8 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
   }
 
   override def afterAll() {
-    if (_sc != null) {
-      LocalSparkContext.stop(_sc)
-      _sc = null
-    }
+    LocalSparkContext.stop(_sc)
+    _sc = null
     super.afterAll()
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index 69383ddfb8bc7471653ad5d9234fa45f9bbb9737..75d6493e338fe71683316538a32ca679357a447d 100644
--- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -40,7 +40,7 @@ object ThreadingSuiteState {
 }
 
 class ThreadingSuite extends FunSuite with LocalSparkContext {
-  
+
   test("accessing SparkContext form a different thread") {
     sc = new SparkContext("local", "test")
     val nums = sc.parallelize(1 to 10, 2)
@@ -149,4 +149,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
       fail("One or more threads didn't see runningThreads = 4")
     }
   }
+
+  test("set local properties in different thread") {
+    sc = new SparkContext("local", "test")
+    val sem = new Semaphore(0)
+
+    val threads = (1 to 5).map { i =>
+      new Thread() {
+        override def run() {
+          sc.setLocalProperty("test", i.toString)
+          assert(sc.getLocalProperty("test") === i.toString)
+          sem.release()
+        }
+      }
+    }
+
+    threads.foreach(_.start())
+
+    sem.acquire(5)
+    assert(sc.getLocalProperty("test") === null)
+  }
+
+  test("set and get local properties in parent-children thread") {
+    sc = new SparkContext("local", "test")
+    sc.setLocalProperty("test", "parent")
+    val sem = new Semaphore(0)
+
+    val threads = (1 to 5).map { i =>
+      new Thread() {
+        override def run() {
+          assert(sc.getLocalProperty("test") === "parent")
+          sc.setLocalProperty("test", i.toString)
+          assert(sc.getLocalProperty("test") === i.toString)
+          sem.release()
+        }
+      }
+    }
+
+    threads.foreach(_.start())
+
+    sem.acquire(5)
+    assert(sc.getLocalProperty("test") === "parent")
+    assert(sc.getLocalProperty("Foo") === null)
+  }
 }
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 419d0fe13f944621dd06df9620f16c3f28bbf605..6b7d202a88174e8b9046463e3e75324ba43b99d8 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -364,12 +364,12 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
   slave_nodes = []
   for res in reservations:
     active = [i for i in res.instances if is_active(i)]
-    if len(active) > 0:
-      group_names = [g.name for g in res.groups]
+    for inst in active:
+      group_names = [g.name for g in inst.groups]
       if group_names == [cluster_name + "-master"]:
-        master_nodes += res.instances
+        master_nodes.append(inst)
       elif group_names == [cluster_name + "-slaves"]:
-        slave_nodes += res.instances
+        slave_nodes.append(inst)
   if any((master_nodes, slave_nodes)):
     print ("Found %d master(s), %d slaves" %
            (len(master_nodes), len(slave_nodes)))
diff --git a/spark-class b/spark-class
index 037abda3b710ab260d385103b7579befc1be1164..e111ef6da7e7c40cbac450d62d4ab3cfcf6b960d 100755
--- a/spark-class
+++ b/spark-class
@@ -37,7 +37,7 @@ fi
 
 # If this is a standalone cluster daemon, reset SPARK_JAVA_OPTS and SPARK_MEM to reasonable
 # values for that; it doesn't need a lot
-if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then
+if [ "$1" = "org.apache.spark.deploy.master.Master" -o "$1" = "org.apache.spark.deploy.worker.Worker" ]; then
  SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
  SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
  # Do not overwrite SPARK_JAVA_OPTS environment variable in this script
@@ -49,19 +49,19 @@ fi
 
 # Add java opts for master, worker, executor. The opts maybe null
 case "$1" in
-  'spark.deploy.master.Master')
+  'org.apache.spark.deploy.master.Master')
     OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS"
     ;;
-  'spark.deploy.worker.Worker')
+  'org.apache.spark.deploy.worker.Worker')
    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS"
    ;;
-  'spark.executor.StandaloneExecutorBackend')
+  'org.apache.spark.executor.StandaloneExecutorBackend')
    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
    ;;
-  'spark.executor.MesosExecutorBackend')
+  'org.apache.spark.executor.MesosExecutorBackend')
    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
    ;;
-  'spark.repl.Main')
+  'org.apache.spark.repl.Main')
    OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS"
    ;;
 esac
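
For reference (not part of the patch above), a minimal sketch of the behaviour the SparkContext change enables: local properties set on the driver thread become visible in threads spawned afterwards, because the InheritableThreadLocal copies the parent's Properties into each child via childValue, and getLocalProperty returns null for keys that were never set. Only setLocalProperty, getLocalProperty, and the local-mode SparkContext constructor come from the code in the diff; the demo object name and the "production" pool value are illustrative, with "spark.scheduler.pool" being the key the fair scheduler reads (FAIR_SCHEDULER_PROPERTIES).

    import org.apache.spark.SparkContext

    object LocalPropertyInheritanceDemo {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "test")

        // Set in the parent (driver) thread before the child thread is created.
        sc.setLocalProperty("spark.scheduler.pool", "production")

        val child = new Thread() {
          override def run() {
            // Inherited from the parent through InheritableThreadLocal.childValue(parent)
            assert(sc.getLocalProperty("spark.scheduler.pool") == "production")
            // Keys that were never set resolve to null rather than throwing
            assert(sc.getLocalProperty("not.set") == null)
          }
        }
        child.start()
        child.join()
        sc.stop()
      }
    }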