diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template index 6c36f3cca45e0aaa849784c1ed184b32c673a2ae..ae10f615d1312be7f8dea3cca7862fcb98cc297d 100644 --- a/conf/metrics.properties.template +++ b/conf/metrics.properties.template @@ -31,7 +31,7 @@ # 1. To add a new sink, set the "class" option to a fully qualified class # name (see examples below). # 2. Some sinks involve a polling period. The minimum allowed polling period -# is 1 second. +# is 1 second. # 3. Wild card properties can be overridden by more specific properties. # For example, master.sink.console.period takes precedence over # *.sink.console.period. @@ -47,11 +47,45 @@ # instance master and applications. MetricsServlet may not be configured by self. # +## List of available sinks and their properties. + +# org.apache.spark.metrics.sink.ConsoleSink +# Name: Default: Description: +# period 10 Poll period +# unit seconds Units of poll period + +# org.apache.spark.metrics.sink.CsvSink +# Name: Default: Description: +# period 10 Poll period +# unit seconds Units of poll period +# directory /tmp Where to store CSV files + +# org.apache.spark.metrics.sink.GangliaSink +# Name: Default: Description: +# host NONE Hostname or multicast group of Ganglia server +# port NONE Port of Ganglia server(s) +# period 10 Poll period +# unit seconds Units of poll period +# ttl 1 TTL of messages sent by Ganglia +# mode multicast Ganglia network mode ('unicast' or 'multicast') + +# org.apache.spark.metrics.sink.JmxSink + +# org.apache.spark.metrics.sink.MetricsServlet +# Name: Default: Description: +# path VARIES* Path prefix from the web server root +# sample false Whether to show the entire set of samples for histograms ('false' or 'true') +# +# * Default path is /metrics/json for all instances except the master.
The master has two paths: +# /metrics/applications/json # App information +# /metrics/master/json # Master information + +## Examples # Enable JmxSink for all instances by class name -#*.sink.jmx.class=spark.metrics.sink.JmxSink +#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink # Enable ConsoleSink for all instances by class name -#*.sink.console.class=spark.metrics.sink.ConsoleSink +#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink # Polling period for ConsoleSink #*.sink.console.period=10 @@ -64,7 +98,7 @@ #master.sink.console.unit=seconds # Enable CsvSink for all instances -#*.sink.csv.class=spark.metrics.sink.CsvSink +#*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink # Polling period for CsvSink #*.sink.csv.period=1 @@ -80,11 +114,11 @@ #worker.sink.csv.unit=minutes # Enable jvm source for instance master, worker, driver and executor -#master.source.jvm.class=spark.metrics.source.JvmSource +#master.source.jvm.class=org.apache.spark.metrics.source.JvmSource -#worker.source.jvm.class=spark.metrics.source.JvmSource +#worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource -#driver.source.jvm.class=spark.metrics.source.JvmSource +#driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource -#executor.source.jvm.class=spark.metrics.source.JvmSource +#executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index e299a106ee6599cd610320c5dfe55864009590c6..68b99ca1253d5f3b64395f55bedc322c9dbb81b2 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -66,10 +66,12 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } try { // If we got here, we have to load the split - val elements = new ArrayBuffer[Any] logInfo("Computing partition " + split) - elements ++= rdd.computeOrReadCheckpoint(split, context) - // Try to put this block in the blockManager + val computedValues = rdd.computeOrReadCheckpoint(split, context) + // Persist the result, so long as the task is not running locally + if (context.runningLocally) { return computedValues } + val elements = new ArrayBuffer[Any] + elements ++= computedValues blockManager.put(key, elements, storageLevel, true) return elements.iterator.asInstanceOf[Iterator[T]] } finally { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 89318712a5777eee2046f7ec1246b471148c666e..4f711a5ea652f059dfb25d939fb5ae201d90f7fc 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -282,8 +282,8 @@ class SparkContext( // Post init taskScheduler.postStartHook() - val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler) - val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager) + val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this) + val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this) def initDriverMetrics() { SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource) diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index b2dd668330a3acdc68a0f84160b09a1bcbbcd073..c2c358c7ad6062826d44f7003c928d6168b7f793 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++
b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -24,6 +24,7 @@ class TaskContext( val stageId: Int, val splitId: Int, val attemptId: Long, + val runningLocally: Boolean = false, val taskMetrics: TaskMetrics = TaskMetrics.empty() ) extends Serializable { diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d3658049945c2d252914aecbae314bbc7d840f66..ceae3b8289b5efa31ba4a6bf36caf6d672b328fd 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -98,7 +98,7 @@ private[spark] class Executor( } ) - val executorSource = new ExecutorSource(this) + val executorSource = new ExecutorSource(this, executorId) // Initialize Spark environment (using system properties read above) val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index bf8fb4fd21f6b5133c10bdde1032d7e9c27561a6..18c9dc1c0a9bbb5e974c49fa14c6c1cbae18f097 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -27,7 +27,7 @@ import scala.collection.JavaConversions._ import org.apache.spark.metrics.source.Source -class ExecutorSource(val executor: Executor) extends Source { +class ExecutorSource(val executor: Executor, executorId: String) extends Source { private def fileStats(scheme: String) : Option[FileSystem.Statistics] = FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption @@ -39,7 +39,8 @@ class ExecutorSource(val executor: Executor) extends Source { } val metricRegistry = new MetricRegistry() - val sourceName = "executor" + // TODO: It would be nice to pass the application name here + val sourceName = "executor.%s".format(executorId) // Gauge for executor thread pool's actively executing task counts metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] { diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index 0f9c4e00b1fb9ca4d77bad33e4ec317cd6e2fd6f..caab748d602ee452703091c9f0663ed46a7328f5 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -37,10 +37,9 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi private def setDefaultProperties(prop: Properties) { prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet") - prop.setProperty("*.sink.servlet.uri", "/metrics/json") - prop.setProperty("*.sink.servlet.sample", "false") - prop.setProperty("master.sink.servlet.uri", "/metrics/master/json") - prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json") + prop.setProperty("*.sink.servlet.path", "/metrics/json") + prop.setProperty("master.sink.servlet.path", "/metrics/master/json") + prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json") } def initialize() { diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala new file mode 100644 index 
0000000000000000000000000000000000000000..b924907070eb9204c8f74f8f23ea3ef9ac6a9436 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import java.util.Properties +import java.util.concurrent.TimeUnit + +import com.codahale.metrics.ganglia.GangliaReporter +import com.codahale.metrics.MetricRegistry +import info.ganglia.gmetric4j.gmetric.GMetric + +import org.apache.spark.metrics.MetricsSystem + +class GangliaSink(val property: Properties, val registry: MetricRegistry) extends Sink { + val GANGLIA_KEY_PERIOD = "period" + val GANGLIA_DEFAULT_PERIOD = 10 + + val GANGLIA_KEY_UNIT = "unit" + val GANGLIA_DEFAULT_UNIT = TimeUnit.SECONDS + + val GANGLIA_KEY_MODE = "mode" + val GANGLIA_DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST + + // TTL for multicast messages. If listeners are X hops away in network, must be at least X. + val GANGLIA_KEY_TTL = "ttl" + val GANGLIA_DEFAULT_TTL = 1 + + val GANGLIA_KEY_HOST = "host" + val GANGLIA_KEY_PORT = "port" + + def propertyToOption(prop: String) = Option(property.getProperty(prop)) + + if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) { + throw new Exception("Ganglia sink requires 'host' property.") + } + + if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) { + throw new Exception("Ganglia sink requires 'port' property.") + } + + val host = propertyToOption(GANGLIA_KEY_HOST).get + val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt + val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) + val mode = propertyToOption(GANGLIA_KEY_MODE) + .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) + val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) + .getOrElse(GANGLIA_DEFAULT_PERIOD) + val pollUnit = propertyToOption(GANGLIA_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase)) + .getOrElse(GANGLIA_DEFAULT_UNIT) + + MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + + val ganglia = new GMetric(host, port, mode, ttl) + val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .build(ganglia) + + override def start() { + reporter.start(pollPeriod, pollUnit) + } + + override def stop() { + reporter.stop() + } +} + diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala index 4e90dd4323951a2d151b4c4ccd84983a940f8cb0..99357fede6d0640b3732ba1e8eb6ec720e5ff645 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala +++ 
b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala @@ -31,18 +31,21 @@ import org.eclipse.jetty.server.Handler import org.apache.spark.ui.JettyUtils class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink { - val SERVLET_KEY_URI = "uri" + val SERVLET_KEY_PATH = "path" val SERVLET_KEY_SAMPLE = "sample" - val servletURI = property.getProperty(SERVLET_KEY_URI) + val SERVLET_DEFAULT_SAMPLE = false - val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean + val servletPath = property.getProperty(SERVLET_KEY_PATH) + + val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)).map(_.toBoolean) + .getOrElse(SERVLET_DEFAULT_SAMPLE) val mapper = new ObjectMapper().registerModule( new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample)) def getHandlers = Array[(String, Handler)]( - (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json")) + (servletPath, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json")) ) def getMetricsSnapshot(request: HttpServletRequest): String = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 92add5b073ab0ccf99a8928d3774705d41307ed6..3e3f04f0876150154d99568f8676308699f14b26 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -478,7 +478,8 @@ class DAGScheduler( SparkEnv.set(env) val rdd = job.finalStage.rdd val split = rdd.partitions(job.partitions(0)) - val taskContext = new TaskContext(job.finalStage.id, job.partitions(0), 0) + val taskContext = + new TaskContext(job.finalStage.id, job.partitions(0), 0, runningLocally = true) try { val result = job.func(taskContext, rdd.iterator(split, taskContext)) job.listener.taskSucceeded(0, result) @@ -531,9 +532,16 @@ class DAGScheduler( tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id) } } + + val properties = if (idToActiveJob.contains(stage.jobId)) { + idToActiveJob(stage.jobId).properties + } else { + //this stage will be assigned to "default" pool + null + } + // must be run listener before possible NotSerializableException // should be "StageSubmitted" first and then "JobEnded" - val properties = idToActiveJob(stage.jobId).properties listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties)) if (tasks.size > 0) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 22e3723ac8e4916d04882b74dd4c5ae8f002aefb..446d490cc9dde851046d50d61279d5129dd9b818 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -20,10 +20,12 @@ package org.apache.spark.scheduler import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.metrics.source.Source +import org.apache.spark.SparkContext -private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source { +private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext) + extends Source { val metricRegistry = new MetricRegistry() - val sourceName = "DAGScheduler" + val sourceName = "%s.DAGScheduler".format(sc.appName) metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new 
Gauge[Int] { override def getValue: Int = dagScheduler.failed.size diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 2b007cbe824bc1eca40c3cdd3c9e054567d5f1ff..07e8317e3aed83a819f3ccf113e809847ede7185 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -77,7 +77,7 @@ private[spark] class ResultTask[T, U]( var func: (TaskContext, Iterator[T]) => U, var partition: Int, @transient locs: Seq[TaskLocation], - val outputId: Int) + var outputId: Int) extends Task[U](stageId) with Externalizable { def this() = this(0, null, null, 0, null, 0) @@ -93,7 +93,7 @@ private[spark] class ResultTask[T, U]( } override def run(attemptId: Long): U = { - val context = new TaskContext(stageId, partition, attemptId) + val context = new TaskContext(stageId, partition, attemptId, runningLocally = false) metrics = Some(context.taskMetrics) try { func(context, rdd.iterator(split, context)) @@ -130,7 +130,7 @@ private[spark] class ResultTask[T, U]( rdd = rdd_.asInstanceOf[RDD[T]] func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U] partition = in.readInt() - val outputId = in.readInt() + outputId = in.readInt() epoch = in.readLong() split = in.readObject().asInstanceOf[Partition] } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 764775fedea2ecadd5995a45d0604e68d289c0fd..d23df0dd2b0f198680f7ef7c1857845c10c405b5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -132,7 +132,7 @@ private[spark] class ShuffleMapTask( override def run(attemptId: Long): MapStatus = { val numOutputSplits = dep.partitioner.numPartitions - val taskContext = new TaskContext(stageId, partition, attemptId) + val taskContext = new TaskContext(stageId, partition, attemptId, runningLocally = false) metrics = Some(taskContext.taskMetrics) val blockManager = SparkEnv.get.blockManager diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index 3d709cfde49b36af35a6c1c51190566f635b71fe..acc3951088a8de54c9c28dbf0fd2b71287970342 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -20,11 +20,13 @@ package org.apache.spark.storage import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.metrics.source.Source +import org.apache.spark.SparkContext -private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source { +private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext) + extends Source { val metricRegistry = new MetricRegistry() - val sourceName = "BlockManager" + val sourceName = "%s.BlockManager".format(sc.appName) metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] { override def getValue: Long = { diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 755f1a760ee051dfd9c94ce6efb5d7765609060d..632ff047d10428dbcc1ab055e2d23bf1a5753b66 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ 
b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -23,9 +23,9 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, * whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory * in a serialized format, and whether to replicate the RDD partitions on multiple nodes. - * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants for - * commonly useful storage levels. To create your own storage level object, use the factor method - * of the singleton object (`StorageLevel(...)`). + * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants + * for commonly useful storage levels. To create your own storage level object, use the + * factory method of the singleton object (`StorageLevel(...)`). */ class StorageLevel private( private var useDisk_ : Boolean, diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..3a7171c48822155be972d57ae8c1c9d0b3e2e7eb --- /dev/null +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.mock.EasyMockSugar + +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.{BlockManager, StorageLevel} + +// TODO: Test the CacheManager's thread-safety aspects +class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar { + var sc : SparkContext = _ + var blockManager: BlockManager = _ + var cacheManager: CacheManager = _ + var split: Partition = _ + /** An RDD which returns the values [1, 2, 3, 4]. */ + var rdd: RDD[Int] = _ + + before { + sc = new SparkContext("local", "test") + blockManager = mock[BlockManager] + cacheManager = new CacheManager(blockManager) + split = new Partition { override def index: Int = 0 } + rdd = new RDD[Int](sc, Nil) { + override def getPartitions: Array[Partition] = Array(split) + override val getDependencies = List[Dependency[_]]() + override def compute(split: Partition, context: TaskContext) = Array(1, 2, 3, 4).iterator + } + } + + after { + sc.stop() + } + + test("get uncached rdd") { + expecting { + blockManager.get("rdd_0_0").andReturn(None) + blockManager.put("rdd_0_0", ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY, true). 
+ andReturn(0) + } + + whenExecuting(blockManager) { + val context = new TaskContext(0, 0, 0, runningLocally = false, null) + val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) + assert(value.toList === List(1, 2, 3, 4)) + } + } + + test("get cached rdd") { + expecting { + blockManager.get("rdd_0_0").andReturn(Some(ArrayBuffer(5, 6, 7).iterator)) + } + + whenExecuting(blockManager) { + val context = new TaskContext(0, 0, 0, runningLocally = false, null) + val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) + assert(value.toList === List(5, 6, 7)) + } + } + + test("get uncached local rdd") { + expecting { + // Local computation should not persist the resulting value, so don't expect a put(). + blockManager.get("rdd_0_0").andReturn(None) + } + + whenExecuting(blockManager) { + val context = new TaskContext(0, 0, 0, runningLocally = true, null) + val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY) + assert(value.toList === List(1, 2, 3, 4)) + } + } +} diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java index 8a869c9005fb61280adfec2550bc3f12253e12b4..591c1d498dc0555d66ccded041b7fcc05503403a 100644 --- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java @@ -495,7 +495,7 @@ public class JavaAPISuite implements Serializable { @Test public void iterator() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2); - TaskContext context = new TaskContext(0, 0, 0, null); + TaskContext context = new TaskContext(0, 0, 0, false, null); Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue()); } diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..05f8545c7b82837695769383c5bcd1ceeb49b4e0 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy + +import java.io.File +import java.util.Date + +import net.liftweb.json.{JsonAST, JsonParser} +import net.liftweb.json.JsonAST.JValue +import org.scalatest.FunSuite + +import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} +import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo} +import org.apache.spark.deploy.worker.ExecutorRunner + +class JsonProtocolSuite extends FunSuite { + test("writeApplicationInfo") { + val output = JsonProtocol.writeApplicationInfo(createAppInfo()) + assertValidJson(output) + } + + test("writeWorkerInfo") { + val output = JsonProtocol.writeWorkerInfo(createWorkerInfo()) + assertValidJson(output) + } + + test("writeApplicationDescription") { + val output = JsonProtocol.writeApplicationDescription(createAppDesc()) + assertValidJson(output) + } + + test("writeExecutorRunner") { + val output = JsonProtocol.writeExecutorRunner(createExecutorRunner()) + assertValidJson(output) + } + + test("writeMasterState") { + val workers = Array[WorkerInfo](createWorkerInfo(), createWorkerInfo()) + val activeApps = Array[ApplicationInfo](createAppInfo()) + val completedApps = Array[ApplicationInfo]() + val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps) + val output = JsonProtocol.writeMasterState(stateResponse) + assertValidJson(output) + } + + test("writeWorkerState") { + val executors = List[ExecutorRunner]() + val finishedExecutors = List[ExecutorRunner](createExecutorRunner(), createExecutorRunner()) + val stateResponse = new WorkerStateResponse("host", 8080, "workerId", executors, + finishedExecutors, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl") + val output = JsonProtocol.writeWorkerState(stateResponse) + assertValidJson(output) + } + + def createAppDesc() : ApplicationDescription = { + val cmd = new Command("mainClass", List("arg1", "arg2"), Map()) + new ApplicationDescription("name", 4, 1234, cmd, "sparkHome", "appUiUrl") + } + def createAppInfo() : ApplicationInfo = { + new ApplicationInfo(3, "id", createAppDesc(), new Date(123456789), null, "appUriStr") + } + def createWorkerInfo() : WorkerInfo = { + new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress") + } + def createExecutorRunner() : ExecutorRunner = { + new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", + new File("sparkHome"), new File("workDir")) + } + + def assertValidJson(json: JValue) { + try { + JsonParser.parse(JsonAST.compactRender(json)) + } catch { + case e: JsonParser.ParseException => fail("Invalid Json detected", e) + } + } +} diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala index 58c94a162d4a4122fe0dcef9a0defedc773c5cda..1a9ce8c607dcd630c1aed109c20192c22d4be947 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala @@ -30,14 +30,13 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { val conf = new MetricsConfig(Option("dummy-file")) conf.initialize() - assert(conf.properties.size() === 5) + assert(conf.properties.size() === 4) assert(conf.properties.getProperty("test-for-dummy") === null) val property = conf.getInstance("random") - assert(property.size() === 3) + assert(property.size() === 2) assert(property.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") - 
assert(property.getProperty("sink.servlet.uri") === "/metrics/json") - assert(property.getProperty("sink.servlet.sample") === "false") + assert(property.getProperty("sink.servlet.path") === "/metrics/json") } test("MetricsConfig with properties set") { @@ -45,22 +44,20 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { conf.initialize() val masterProp = conf.getInstance("master") - assert(masterProp.size() === 6) + assert(masterProp.size() === 5) assert(masterProp.getProperty("sink.console.period") === "20") assert(masterProp.getProperty("sink.console.unit") === "minutes") assert(masterProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource") assert(masterProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") - assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json") - assert(masterProp.getProperty("sink.servlet.sample") === "false") + assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json") val workerProp = conf.getInstance("worker") - assert(workerProp.size() === 6) + assert(workerProp.size() === 5) assert(workerProp.getProperty("sink.console.period") === "10") assert(workerProp.getProperty("sink.console.unit") === "seconds") assert(workerProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource") assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") - assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json") - assert(workerProp.getProperty("sink.servlet.sample") === "false") + assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json") } test("MetricsConfig with subProperties") { @@ -84,6 +81,6 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { assert(consoleProps.size() === 2) val servletProps = sinkProps("servlet") - assert(servletProps.size() === 3) + assert(servletProps.size() === 2) } } diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index 84749fda4e6051921747bdbbf3ea086b14945960..90928c802104ef7136769932e6277a8b32cf50d9 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -97,7 +97,9 @@ <a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a> <ul class="dropdown-menu"> <li><a href="configuration.html">Configuration</a></li> + <li><a href="monitoring.html">Monitoring</a></li> <li><a href="tuning.html">Tuning Guide</a></li> + <li><a href="hadoop-third-party-distributions.html">Running with CDH/HDP</a></li> <li><a href="hardware-provisioning.html">Hardware Provisioning</a></li> <li><a href="building-with-maven.html">Building Spark with Maven</a></li> <li><a href="contributing-to-spark.html">Contributing to Spark</a></li> diff --git a/docs/hadoop-third-party-distributions.md b/docs/hadoop-third-party-distributions.md new file mode 100644 index 0000000000000000000000000000000000000000..9f4f354525b7a876df18656bbb2318adad3a91e1 --- /dev/null +++ b/docs/hadoop-third-party-distributions.md @@ -0,0 +1,76 @@ +--- +layout: global +title: Running with Cloudera and HortonWorks Distributions +--- + +Spark can run against all versions of Cloudera's Distribution Including Hadoop (CDH) and +the Hortonworks Data Platform (HDP). 
There are a few things to keep in mind when using Spark with +these distributions: + +# Compile-time Hadoop Version +When compiling Spark, you'll need to +[set the HADOOP_VERSION flag](index.html#a-note-about-hadoop-versions): + + HADOOP_VERSION=1.0.4 sbt/sbt assembly + +The table below lists the corresponding HADOOP_VERSION for each CDH/HDP release. Note that +some Hadoop releases are binary compatible across client versions. This means the pre-built Spark +distribution may "just work" without you needing to compile. That said, we recommend compiling with +the _exact_ Hadoop version you are running to avoid any compatibility errors. + +<table> + <tr valign="top"> + <td> + <h3>CDH Releases</h3> + <table class="table" style="width:350px;"> + <tr><th>Version</th><th>HADOOP_VERSION</th></tr> + <tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-cdh4.X.X</td></tr> + <tr><td>CDH 4.X.X</td><td>2.0.0-mr1-cdh4.X.X</td></tr> + <tr><td>CDH 3u6</td><td>0.20.2-cdh3u6</td></tr> + <tr><td>CDH 3u5</td><td>0.20.2-cdh3u5</td></tr> + <tr><td>CDH 3u4</td><td>0.20.2-cdh3u4</td></tr> + </table> + </td> + <td> + <h3>HDP Releases</h3> + <table class="table" style="width:350px;"> + <tr><th>Version</th><th>HADOOP_VERSION</th></tr> + <tr><td>HDP 1.3</td><td>1.2.0</td></tr> + <tr><td>HDP 1.2</td><td>1.1.2</td></tr> + <tr><td>HDP 1.1</td><td>1.0.3</td></tr> + <tr><td>HDP 1.0</td><td>1.0.3</td></tr> + </table> + </td> + </tr> +</table> + +# Where to Run Spark +As described in the [Hardware Provisioning](hardware-provisioning.html#storage-systems) guide, +Spark can run in a variety of deployment modes: + +* Using a dedicated set of Spark nodes in your cluster. These nodes should be co-located with your + Hadoop installation. +* Running on the same nodes as an existing Hadoop installation, with a fixed amount of memory and + cores dedicated to Spark on each node. +* Running Spark alongside Hadoop using a cluster resource manager, such as YARN or Mesos. + +These options are identical for those using CDH and HDP. + +# Inheriting Cluster Configuration +If you plan to read from and write to HDFS using Spark, there are two Hadoop configuration files that +should be included on Spark's classpath: + +* `hdfs-site.xml`, which provides default behaviors for the HDFS client. +* `core-site.xml`, which sets the default filesystem name. + +The location of these configuration files varies across CDH and HDP versions, but +a common location is inside of `/etc/hadoop/conf`. Some tools, such as Cloudera Manager, create +configurations on-the-fly, but offer a mechanism to download copies of them. + +There are a few ways to make these files visible to Spark: + +* You can copy these files into `$SPARK_HOME/conf` and they will be included in Spark's +classpath automatically. +* If you are running Spark on the same nodes as Hadoop _and_ your distribution includes both +`hdfs-site.xml` and `core-site.xml` in the same directory, you can set `HADOOP_CONF_DIR` +in `$SPARK_HOME/spark-env.sh` to that directory. diff --git a/docs/index.md b/docs/index.md index 7d739299409f6c06a828fc77faa76046086a4f6b..d3aacc629f257b3edd9aa4b19968049f97cf70f0 100644 --- a/docs/index.md +++ b/docs/index.md @@ -46,6 +46,11 @@ Spark supports several options for deployment: * [Apache Mesos](running-on-mesos.html) * [Hadoop YARN](running-on-yarn.html) +There is a script, `./make-distribution.sh`, which will create a binary distribution of Spark for deployment +to any machine with only the Java runtime as a necessary dependency.
+Running the script creates a distribution directory in `dist/`; pass the `-tgz` option to create a .tgz file instead. +Check the script for additional options. + # A Note About Hadoop Versions Spark uses the Hadoop-client library to talk to HDFS and other Hadoop-supported diff --git a/docs/monitoring.md b/docs/monitoring.md new file mode 100644 index 0000000000000000000000000000000000000000..4c4f174503dfbea26e2ac72430531630a2a9b297 --- /dev/null +++ b/docs/monitoring.md @@ -0,0 +1,58 @@ +--- +layout: global +title: Monitoring and Instrumentation +--- + +There are several ways to monitor the progress of Spark jobs. + +# Web Interfaces +When a SparkContext is initialized, it launches a web server (by default at port 3030) which +displays useful information. This includes a list of active and completed scheduler stages, +a summary of RDD blocks and partitions, and environmental information. If multiple SparkContexts +are running on the same host, they will bind to successive ports beginning with 3030 (3031, 3032, +etc.). + +Spark's Standalone Mode scheduler also has its own +[web interface](spark-standalone.html#monitoring-and-logging). + +# Spark Metrics +Spark has a configurable metrics system based on the +[Coda Hale Metrics Library](http://metrics.codahale.com/). +This allows users to report Spark metrics to a variety of sinks including HTTP, JMX, and CSV +files. The metrics system is configured via a configuration file that Spark expects to be present +at `$SPARK_HOME/conf/metrics.properties`. A custom file location can be specified via the +`spark.metrics.conf` Java system property. Spark's metrics are decoupled into different +_instances_ corresponding to Spark components. Within each instance, you can configure a +set of sinks to which metrics are reported. The following instances are currently supported: + +* `master`: The Spark standalone master process. +* `applications`: A component within the master which reports on various applications. +* `worker`: A Spark standalone worker process. +* `executor`: A Spark executor. +* `driver`: The Spark driver process (the process in which your SparkContext is created). + +Each instance can report to zero or more _sinks_. Sinks are contained in the +`org.apache.spark.metrics.sink` package: + +* `ConsoleSink`: Logs metrics information to the console. +* `CsvSink`: Exports metrics data to CSV files at regular intervals. +* `GangliaSink`: Sends metrics to a Ganglia node or multicast group. +* `JmxSink`: Registers metrics for viewing in a JMX console. +* `MetricsServlet`: Adds a servlet within the existing Spark UI to serve metrics data as JSON data. + +The syntax of the metrics configuration file is defined in an example configuration file, +`$SPARK_HOME/conf/metrics.properties.template`. + +# Advanced Instrumentation +Several external tools can be used to help profile the performance of Spark jobs: + +* Cluster-wide monitoring tools, such as [Ganglia](http://ganglia.sourceforge.net/), can provide +insight into overall cluster utilization and resource bottlenecks. For instance, a Ganglia +dashboard can quickly reveal whether a particular workload is disk bound, network bound, or +CPU bound. +* OS profiling tools such as [dstat](http://dag.wieers.com/home-made/dstat/), +[iostat](http://linux.die.net/man/1/iostat), and [iotop](http://linux.die.net/man/1/iotop) +can provide fine-grained profiling on individual nodes.
+* JVM utilities such as `jstack` for providing stack traces, `jmap` for creating heap-dumps, +`jstat` for reporting time-series statistics and `jconsole` for visually exploring various JVM +properties are useful for those comfortable with JVM internals. diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 93421efcbc30fce2fd3415d0f34c61379e42af47..c611db0af4ca0cf3ace5a1659ea449eb581777a6 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -42,7 +42,7 @@ This would be used to connect to the cluster, write to the dfs and submit jobs t The command to launch the YARN Client is as follows: - SPARK_JAR=<SPARK_YARN_JAR_FILE> ./spark-class org.apache.spark.deploy.yarn.Client \ + SPARK_JAR=<SPARK_ASSEMBLY_JAR_FILE> ./spark-class org.apache.spark.deploy.yarn.Client \ --jar <YOUR_APP_JAR_FILE> \ --class <APP_MAIN_CLASS> \ --args <APP_MAIN_ARGUMENTS> \ @@ -54,14 +54,27 @@ The command to launch the YARN Client is as follows: For example: - SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./spark-class org.apache.spark.deploy.yarn.Client \ - --jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar \ - --class org.apache.spark.examples.SparkPi \ - --args yarn-standalone \ - --num-workers 3 \ - --master-memory 4g \ - --worker-memory 2g \ - --worker-cores 1 + # Build the Spark assembly JAR and the Spark examples JAR + $ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true ./sbt/sbt assembly + + # Configure logging + $ cp conf/log4j.properties.template conf/log4j.properties + + # Submit Spark's ApplicationMaster to YARN's ResourceManager, and instruct Spark to run the SparkPi example + $ SPARK_JAR=./assembly/target/scala-{{site.SCALA_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop2.0.5-alpha.jar \ + ./spark-class org.apache.spark.deploy.yarn.Client \ + --jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \ + --class org.apache.spark.examples.SparkPi \ + --args yarn-standalone \ + --num-workers 3 \ + --master-memory 4g \ + --worker-memory 2g \ + --worker-cores 1 + + # Examine the output (replace $YARN_APP_ID in the following with the "application identifier" output by the previous command) + # (Note: YARN_APP_LOGS_DIR is usually /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version.) + $ cat $YARN_APP_LOGS_DIR/$YARN_APP_ID/container*_000001/stdout + Pi is roughly 3.13794 The above starts a YARN Client programs which periodically polls the Application Master for status updates and displays them in the console. The client will exit once your application has finished running. diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 994a96f2c99310c3d68dd239a03c9b6eeb070b4b..69e12915807e42051f474a5ae5701c388dc69a5f 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -3,13 +3,21 @@ layout: global title: Spark Standalone Mode --- -In addition to running on the Mesos or YARN cluster managers, Spark also provides a simple standalone deploy mode. You can launch a standalone cluster either manually, by starting a master and workers by hand, or use our provided [deploy scripts](#cluster-launch-scripts). It is also possible to run these daemons on a single machine for testing. +In addition to running on the Mesos or YARN cluster managers, Spark also provides a simple standalone deploy mode. 
You can launch a standalone cluster either manually, by starting a master and workers by hand, or use our provided [launch scripts](#cluster-launch-scripts). It is also possible to run these daemons on a single machine for testing. + +# Installing Spark Standalone to a Cluster + +The easiest way to deploy Spark is by running the `./make-distribution.sh` script to create a binary distribution. +This distribution can be deployed to any machine with the Java runtime installed; there is no need to install Scala. + +The recommended procedure is to deploy and start the master on one node first, get the master spark URL, +then modify `conf/spark-env.sh` in the `dist/` directory before deploying to all the other nodes. # Starting a Cluster Manually You can start a standalone master server by executing: - ./spark-class org.apache.spark.deploy.master.Master + ./bin/start-master.sh Once started, the master will print out a `spark://HOST:PORT` URL for itself, which you can use to connect workers to it, or pass as the "master" argument to `SparkContext`. You can also find this URL on @@ -22,7 +30,7 @@ Similarly, you can start one or more workers and connect them to the master via: Once you have started a worker, look at the master's web UI ([http://localhost:8080](http://localhost:8080) by default). You should see the new node listed there, along with its number of CPUs and memory (minus one gigabyte left for the OS). -Finally, the following configuration options can be passed to the master and worker: +Finally, the following configuration options can be passed to the master and worker: <table class="table"> <tr><th style="width:21%">Argument</th><th>Meaning</th></tr> @@ -55,7 +63,7 @@ Finally, the following configuration options can be passed to the master and wor # Cluster Launch Scripts -To launch a Spark standalone cluster with the deploy scripts, you need to create a file called `conf/slaves` in your Spark directory, which should contain the hostnames of all the machines where you would like to start Spark workers, one per line. The master machine must be able to access each of the slave machines via password-less `ssh` (using a private key). For testing, you can just put `localhost` in this file. +To launch a Spark standalone cluster with the launch scripts, you need to create a file called `conf/slaves` in your Spark directory, which should contain the hostnames of all the machines where you would like to start Spark workers, one per line. The master machine must be able to access each of the slave machines via password-less `ssh` (using a private key). For testing, you can just put `localhost` in this file. Once you've set up this file, you can launch or stop your cluster with the following shell scripts, based on Hadoop's deploy scripts, and available in `SPARK_HOME/bin`: @@ -134,6 +142,10 @@ To run an interactive Spark shell against the cluster, run the following command MASTER=spark://IP:PORT ./spark-shell +Note that if you are running spark-shell from one of the spark cluster machines, the `spark-shell` script will +automatically set MASTER from the `SPARK_MASTER_IP` and `SPARK_MASTER_PORT` variables in `conf/spark-env.sh`. + +You can also pass an option `-c <numCores>` to control the number of cores that spark-shell uses on the cluster. 
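For reference, here is a minimal Scala sketch of connecting an application to a standalone cluster by passing the master URL to `SparkContext`; the master URL, Spark home, and jar path below are placeholders to adapt to your own deployment:

    import org.apache.spark.SparkContext

    object StandaloneApp {
      def main(args: Array[String]) {
        // Use the spark://HOST:PORT URL printed by your master, and point the
        // sparkHome and jars arguments at your installation and application jar.
        val sc = new SparkContext("spark://master-host:7077", "StandaloneApp",
          "/path/to/spark", Seq("target/standalone-app.jar"))
        val evens = sc.parallelize(1 to 1000).filter(_ % 2 == 0).count()
        println("Counted " + evens + " even numbers")
        sc.stop()
      }
    }

This is the same `spark://IP:PORT` URL that the `MASTER` variable points spark-shell at above.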
# Job Scheduling diff --git a/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh b/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh index 675429c57ed853063cb6f4f30992a437ed5c6598..42e8faa26ed090a7c23a2bec72b84c903def1e34 100644 --- a/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh +++ b/ec2/deploy.generic/root/spark-ec2/ec2-variables.sh @@ -1,5 +1,22 @@ #!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + # These variables are automatically filled in by the spark-ec2 script. export MASTERS="{{master_list}}" export SLAVES="{{slave_list}}" diff --git a/examples/pom.xml b/examples/pom.xml index 224cf6c96c9cc91b80a03963f777c246fae403f7..e48f5b50abcc6f7f88bf6bdb40e42be822767539 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -127,20 +127,6 @@ </dependency> </dependencies> - <profiles> - <profile> - <id>hadoop2-yarn</id> - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - </dependencies> - </profile> - </profiles> - <build> <outputDirectory>target/scala-${scala.version}/classes</outputDirectory> <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory> diff --git a/pom.xml b/pom.xml index c561b099ab76a8dd304a2f65bbb4382d5399d2c9..5c1f9f03246bc714e5e394c6bcc13a21eb9636e3 100644 --- a/pom.xml +++ b/pom.xml @@ -368,6 +368,99 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-jaxrs</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-xc</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> 
+ <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-jaxrs</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-xc</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + <version>${hadoop.version}</version> + <exclusions> + <exclusion> + <groupId>asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-jaxrs</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-xc</artifactId> + </exclusion> + </exclusions> + </dependency> <!-- Specify Avro version because Kafka also has it as a dependency --> <dependency> <groupId>org.apache.avro</groupId> @@ -620,131 +713,6 @@ <dependencyManagement> <dependencies> - <!-- TODO: check versions, bringover from yarn branch ! --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-core-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-mapper-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-jaxrs</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-xc</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-core-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-mapper-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-jaxrs</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-xc</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-core-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - 
<artifactId>jackson-mapper-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-jaxrs</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-xc</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-client</artifactId> - <version>${hadoop.version}</version> - <exclusions> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-core-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-mapper-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-jaxrs</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-xc</artifactId> - </exclusion> - </exclusions> - </dependency> </dependencies> </dependencyManagement> </profile> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index d038a4f479682a10e316bc5493f77a278ca4afbb..a60b553b5a9d933a3fe74f4ab8182c35907bdc8b 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -33,15 +33,19 @@ object SparkBuild extends Build { // HBase version; set as appropriate. val HBASE_VERSION = "0.94.6" + // Target JVM version + val SCALAC_JVM_VERSION = "jvm-1.5" + val JAVAC_JVM_VERSION = "1.5" + lazy val root = Project("root", file("."), settings = rootSettings) aggregate(allProjects: _*) lazy val core = Project("core", file("core"), settings = coreSettings) lazy val repl = Project("repl", file("repl"), settings = replSettings) - .dependsOn(core, bagel, mllib) dependsOn(maybeYarn: _*) + .dependsOn(core, bagel, mllib) lazy val examples = Project("examples", file("examples"), settings = examplesSettings) - .dependsOn(core, mllib, bagel, streaming) dependsOn(maybeYarn: _*) + .dependsOn(core, mllib, bagel, streaming) lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming) @@ -77,7 +81,9 @@ object SparkBuild extends Build { organization := "org.apache.spark", version := "0.8.0-SNAPSHOT", scalaVersion := "2.9.3", - scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"), + scalacOptions := Seq("-unchecked", "-optimize", "-deprecation", + "-target:" + SCALAC_JVM_VERSION), + javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION), unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, retrieveManaged := true, retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", @@ -207,6 +213,7 @@ object SparkBuild extends Build { "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", + "com.codahale.metrics" % "metrics-ganglia" % "3.0.0", "com.twitter" % "chill_2.9.3" % "0.3.1", "com.twitter" % "chill-java" % "0.3.1" ) diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala index 999611982aed7f3c578122dfc5d58bd82d00712a..6a66bd1d06b23ee3ef9e20394388c6b649bd5db1 100644 --- a/project/project/SparkPluginBuild.scala +++ b/project/project/SparkPluginBuild.scala @@ -1,7 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + import sbt._ object SparkPluginDef extends Build { lazy val root = Project("plugins", file(".")) dependsOn(junitXmlListener) /* This is not published in a Maven repository, so we get it from GitHub directly */ lazy val junitXmlListener = uri("git://github.com/ijuma/junit_xml_listener.git#fe434773255b451a38e8d889536ebc260f4225ce") -} \ No newline at end of file +} diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index fd5972d381a80c7a118ede5201c06eda513318b5..1f35f6f939d8e70c16ec3ebaff5d0b70df59c7bc 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -30,6 +30,8 @@ Public classes: An "add-only" shared variable that tasks can only add values to. - L{SparkFiles<pyspark.files.SparkFiles>} Access files shipped with jobs. + - L{StorageLevel<pyspark.storagelevel.StorageLevel>} + Finer-grained cache persistence levels. """ import sys import os @@ -39,6 +41,7 @@ sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.eg from pyspark.context import SparkContext from pyspark.rdd import RDD from pyspark.files import SparkFiles +from pyspark.storagelevel import StorageLevel -__all__ = ["SparkContext", "RDD", "SparkFiles"] +__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel"] diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 8fbf2965097d925fb62d835e89945fa17bfeaa4f..597110321a86370c29063052eb892f9213a4bfb3 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -27,6 +27,7 @@ from pyspark.broadcast import Broadcast from pyspark.files import SparkFiles from pyspark.java_gateway import launch_gateway from pyspark.serializers import dump_pickle, write_with_length, batched +from pyspark.storagelevel import StorageLevel from pyspark.rdd import RDD from py4j.java_collections import ListConverter @@ -279,6 +280,16 @@ class SparkContext(object): """ self._jsc.sc().setCheckpointDir(dirName, useExisting) + def _getJavaStorageLevel(self, storageLevel): + """ + Returns a Java StorageLevel based on a pyspark.StorageLevel. 
+ """ + if not isinstance(storageLevel, StorageLevel): + raise Exception("storageLevel must be of type pyspark.StorageLevel") + + newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel + return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory, + storageLevel.deserialized, storageLevel.replication) def _test(): import atexit diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 914118ccdd92b7560514e13e118b18dbd947659c..58e1849cadac8611c0a4e85681cc67ea6bb7ca25 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -70,6 +70,25 @@ class RDD(object): self._jrdd.cache() return self + def persist(self, storageLevel): + """ + Set this RDD's storage level to persist its values across operations after the first time + it is computed. This can only be used to assign a new storage level if the RDD does not + have a storage level set yet. + """ + self.is_cached = True + javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel) + self._jrdd.persist(javaStorageLevel) + return self + + def unpersist(self): + """ + Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. + """ + self.is_cached = False + self._jrdd.unpersist() + return self + def checkpoint(self): """ Mark this RDD for checkpointing. It will be saved to a file inside the diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index 54823f80378787ff5c73c78f6b8ab8d336d52975..dc205b306f0a93d398d4b18c95da6f888a93ec6e 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -24,6 +24,7 @@ import os import platform import pyspark from pyspark.context import SparkContext +from pyspark.storagelevel import StorageLevel # this is the equivalent of ADD_JARS add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") != None else None diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py new file mode 100644 index 0000000000000000000000000000000000000000..b31f4762e69bc42cad009183004c0349fcf5e798 --- /dev/null +++ b/python/pyspark/storagelevel.py @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +__all__ = ["StorageLevel"] + +class StorageLevel: + """ + Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, + whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory + in a serialized format, and whether to replicate the RDD partitions on multiple nodes. + Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY. 
+ """ + + def __init__(self, useDisk, useMemory, deserialized, replication = 1): + self.useDisk = useDisk + self.useMemory = useMemory + self.deserialized = deserialized + self.replication = replication + +StorageLevel.DISK_ONLY = StorageLevel(True, False, False) +StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2) +StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True) +StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, True, 2) +StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False) +StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, 2) +StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, True) +StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, True, 2) +StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False) +StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, 2) diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index d61b36a61a6559a210a54270d1dee466bb77a5a4..3685561501d67c8c47f0109a1a66588ec8e3acf5 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -105,16 +105,6 @@ </build> <profiles> - <profile> - <id>hadoop2-yarn</id> - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - </profile> <profile> <id>deb</id> <build> diff --git a/repl/pom.xml b/repl/pom.xml index a1c87d7618249cc720a2ecb65aa75dc8ae9248e8..3123b37780dc509c8e302b7fbe41f79610127ad6 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -131,16 +131,4 @@ </plugin> </plugins> </build> - <profiles> - <profile> - <id>hadoop2-yarn</id> - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - </profile> - </profiles> </project> diff --git a/yarn/pom.xml b/yarn/pom.xml index 654b5bcd2dc99f5f60f9d76e389c36237fc08d45..27b2002095fa8c2fa394c6469ad330e5a4bd6a07 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -30,6 +30,34 @@ <name>Spark Project YARN Support</name> <url>http://spark.incubator.apache.org/</url> + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro-ipc</artifactId> + </dependency> + </dependencies> + <build> <outputDirectory>target/scala-${scala.version}/classes</outputDirectory> <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory> @@ -75,37 +103,4 @@ </plugin> </plugins> </build> - - <profiles> - <profile> - <id>hadoop2-yarn</id> - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - </dependency> - <dependency> - 
<groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-client</artifactId> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-ipc</artifactId> - </dependency> - </dependencies> - </profile> - </profiles> </project>
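
Usage note (not part of the patch): the pyspark changes above add StorageLevel plus RDD.persist()/unpersist(). Below is a minimal sketch of how the new API might be used, assuming a local master; the input path "data.txt" and the app name are hypothetical, and persist() simply forwards the level to the JVM via SparkContext._getJavaStorageLevel as shown in the diff.

    # Sketch only: exercises the new PySpark persistence API added by this patch.
    # Assumes a working local Spark installation; "data.txt" is a hypothetical input file.
    from pyspark import SparkContext, StorageLevel

    sc = SparkContext("local", "storage-level-demo")

    # Keep the filtered RDD in memory, spilling partitions to disk if they do not fit.
    lines = sc.textFile("data.txt")
    errors = lines.filter(lambda l: "ERROR" in l).persist(StorageLevel.MEMORY_AND_DISK)

    print(errors.count())   # first action computes and persists the partitions
    print(errors.count())   # later actions reuse the persisted blocks

    # Drop the blocks from memory and disk once they are no longer needed.
    errors.unpersist()
    sc.stop()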