diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala index 094a95d70ea8af6590a439a1d1a0e25cf3ac4d23..86e2061b9f0cb47c6bce071f87ece0806a8af62f 100644 --- a/core/src/main/scala/spark/Accumulators.scala +++ b/core/src/main/scala/spark/Accumulators.scala @@ -5,10 +5,13 @@ import java.io._ import scala.collection.mutable.Map class Accumulator[T] ( - @transient initialValue: T, param: AccumulatorParam[T]) extends Serializable -{ + @transient initialValue: T, + param: AccumulatorParam[T]) + extends Serializable { + val id = Accumulators.newId - @transient var value_ = initialValue // Current value on master + @transient + var value_ = initialValue // Current value on master val zero = param.zero(initialValue) // Zero value to be passed to workers var deserialized = false @@ -39,14 +42,16 @@ trait AccumulatorParam[T] extends Serializable { // TODO: The multi-thread support in accumulators is kind of lame; check // if there's a more intuitive way of doing it right -private object Accumulators -{ +private object Accumulators { // TODO: Use soft references? => need to make readObject work properly then val originals = Map[Long, Accumulator[_]]() val localAccums = Map[Thread, Map[Long, Accumulator[_]]]() var lastId: Long = 0 - def newId: Long = synchronized { lastId += 1; return lastId } + def newId: Long = synchronized { + lastId += 1 + return lastId + } def register(a: Accumulator[_], original: Boolean): Unit = synchronized { if (original) { @@ -65,8 +70,9 @@ private object Accumulators // Get the values of the local accumulators for the current thread (by ID) def values: Map[Long, Any] = synchronized { val ret = Map[Long, Any]() - for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) + for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) { ret(id) = accum.value + } return ret } diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala index 36e70f74032d56bf8bf23318fde4173344def56b..a721a136b253cb333463ea5eb281526dc6220d96 100644 --- a/core/src/main/scala/spark/Aggregator.scala +++ b/core/src/main/scala/spark/Aggregator.scala @@ -1,7 +1,7 @@ package spark class Aggregator[K, V, C] ( - val createCombiner: V => C, - val mergeValue: (C, V) => C, - val mergeCombiners: (C, C) => C -) extends Serializable \ No newline at end of file + val createCombiner: V => C, + val mergeValue: (C, V) => C, + val mergeCombiners: (C, C) => C + ) extends Serializable \ No newline at end of file diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala index 6f2f92f6051f7701f621cee3419e1d853c44677c..06c17ce99c22e3411f3241cef347e00f107b0172 100644 --- a/core/src/main/scala/spark/BoundedMemoryCache.scala +++ b/core/src/main/scala/spark/BoundedMemoryCache.scala @@ -3,13 +3,11 @@ package spark import java.util.LinkedHashMap /** - * An implementation of Cache that estimates the sizes of its entries and - * attempts to limit its total memory usage to a fraction of the JVM heap. - * Objects' sizes are estimated using SizeEstimator, which has limitations; - * most notably, we will overestimate total memory used if some cache - * entries have pointers to a shared object. Nonetheless, this Cache should - * work well when most of the space is used by arrays of primitives or of - * simple classes. + * An implementation of Cache that estimates the sizes of its entries and attempts to limit its + * total memory usage to a fraction of the JVM heap. 
Objects' sizes are estimated using + * SizeEstimator, which has limitations; most notably, we will overestimate total memory used if + * some cache entries have pointers to a shared object. Nonetheless, this Cache should work well + * when most of the space is used by arrays of primitives or of simple classes. */ class BoundedMemoryCache extends Cache with Logging { private val maxBytes: Long = getMaxBytes() @@ -24,7 +22,11 @@ class BoundedMemoryCache extends Cache with Logging { override def get(key: Any): Any = { synchronized { val entry = map.get(key) - if (entry != null) entry.value else null + if (entry != null) { + entry.value + } else { + null + } } } @@ -51,8 +53,8 @@ class BoundedMemoryCache extends Cache with Logging { } /** - * Remove least recently used entries from the map until at least space - * bytes are free. Assumes that a lock is held on the BoundedMemoryCache. + * Remove least recently used entries from the map until at least space bytes are free. Assumes + * that a lock is held on the BoundedMemoryCache. */ private def ensureFreeSpace(space: Long) { logInfo("ensureFreeSpace(%d) called with curBytes=%d, maxBytes=%d".format( @@ -67,7 +69,6 @@ class BoundedMemoryCache extends Cache with Logging { } protected def dropEntry(key: Any, entry: Entry) { - logInfo("Dropping key %s of size %d to make space".format( - key, entry.size)) + logInfo("Dropping key %s of size %d to make space".format(key, entry.size)) } } diff --git a/core/src/main/scala/spark/Cache.scala b/core/src/main/scala/spark/Cache.scala index 89befae1a4df6e8d582b73ebd518b3a379e23778..696fff4e5e712def8c425377eadf260e5a285853 100644 --- a/core/src/main/scala/spark/Cache.scala +++ b/core/src/main/scala/spark/Cache.scala @@ -2,22 +2,19 @@ package spark import java.util.concurrent.atomic.AtomicLong - /** - * An interface for caches in Spark, to allow for multiple implementations. - * Caches are used to store both partitions of cached RDDs and broadcast - * variables on Spark executors. + * An interface for caches in Spark, to allow for multiple implementations. Caches are used to store + * both partitions of cached RDDs and broadcast variables on Spark executors. * - * A single Cache instance gets created on each machine and is shared by all - * caches (i.e. both the RDD split cache and the broadcast variable cache), - * to enable global replacement policies. However, because these several - * independent modules all perform caching, it is important to give them - * separate key namespaces, so that an RDD and a broadcast variable (for - * example) do not use the same key. For this purpose, Cache has the - * notion of KeySpaces. Each client module must first ask for a KeySpace, - * and then call get() and put() on that space using its own keys. - * This abstract class handles the creation of key spaces, so that subclasses - * need only deal with keys that are unique across modules. + * A single Cache instance gets created on each machine and is shared by all caches (i.e. both the + * RDD split cache and the broadcast variable cache), to enable global replacement policies. + * However, because these several independent modules all perform caching, it is important to give + * them separate key namespaces, so that an RDD and a broadcast variable (for example) do not use + * the same key. For this purpose, Cache has the notion of KeySpaces. Each client module must first + * ask for a KeySpace, and then call get() and put() on that space using its own keys. 
+ * + * This abstract class handles the creation of key spaces, so that subclasses need only deal with + * keys that are unique across modules. */ abstract class Cache { private val nextKeySpaceId = new AtomicLong(0) @@ -29,7 +26,6 @@ abstract class Cache { def put(key: Any, value: Any): Unit } - /** * A key namespace in a Cache. */ diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index 223c5dc5f7d555efc9966190ea24b8dce88627e8..5e9a70cc7e03813cd31fd4d24043c77add1aceb7 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -96,15 +96,16 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging { // Get a snapshot of the currently known locations def getLocationsSnapshot(): HashMap[Int, Array[List[String]]] = { (trackerActor !? GetCacheLocations) match { - case h: HashMap[_, _] => h.asInstanceOf[HashMap[Int, Array[List[String]]]] - case _ => throw new SparkException( - "Internal error: CacheTrackerActor did not reply with a HashMap") + case h: HashMap[_, _] => + h.asInstanceOf[HashMap[Int, Array[List[String]]]] + + case _ => + throw new SparkException("Internal error: CacheTrackerActor did not reply with a HashMap") } } // Gets or computes an RDD split - def getOrCompute[T](rdd: RDD[T], split: Split)(implicit m: ClassManifest[T]) - : Iterator[T] = { + def getOrCompute[T](rdd: RDD[T], split: Split)(implicit m: ClassManifest[T]): Iterator[T] = { val key = (rdd.id, split.index) logInfo("CachedRDD partition key is " + key) val cachedVal = cache.get(key) diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/CartesianRDD.scala index df822b3552a44a13600b5e8b33954948f47895b4..38afa59b29d1d379648774658f699e99c074a149 100644 --- a/core/src/main/scala/spark/CartesianRDD.scala +++ b/core/src/main/scala/spark/CartesianRDD.scala @@ -1,16 +1,20 @@ package spark -class CartesianSplit(idx: Int, val s1: Split, val s2: Split) -extends Split with Serializable { +class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable { override val index = idx } class CartesianRDD[T: ClassManifest, U:ClassManifest]( - sc: SparkContext, rdd1: RDD[T], rdd2: RDD[U]) -extends RDD[Pair[T, U]](sc) with Serializable { + sc: SparkContext, + rdd1: RDD[T], + rdd2: RDD[U]) + extends RDD[Pair[T, U]](sc) + with Serializable { + val numSplitsInRdd2 = rdd2.splits.size - @transient val splits_ = { + @transient + val splits_ = { // create the cross product split val array = new Array[Split](rdd1.splits.size * rdd2.splits.size) for (s1 <- rdd1.splits; s2 <- rdd2.splits) { diff --git a/core/src/main/scala/spark/ClosureCleaner.scala b/core/src/main/scala/spark/ClosureCleaner.scala index 40bcbb5a9dee8d50a406e4be1e1b7b88edb2901d..699fdc29820ef21f61860c10544cd6841e7139e0 100644 --- a/core/src/main/scala/spark/ClosureCleaner.scala +++ b/core/src/main/scala/spark/ClosureCleaner.scala @@ -9,7 +9,6 @@ import org.objectweb.asm.{ClassReader, MethodVisitor, Type} import org.objectweb.asm.commons.EmptyVisitor import org.objectweb.asm.Opcodes._ - object ClosureCleaner extends Logging { // Get an ASM class reader for a given class from the JAR that loaded it private def getClassReader(cls: Class[_]): ClassReader = { @@ -70,10 +69,11 @@ object ClosureCleaner extends Logging { } private def createNullValue(cls: Class[_]): AnyRef = { - if (cls.isPrimitive) + if (cls.isPrimitive) { new java.lang.Byte(0: Byte) // Should be convertible to any primitive type - else + } 
else { null + } } def clean(func: AnyRef): Unit = { @@ -154,31 +154,32 @@ object ClosureCleaner extends Logging { } } - class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends EmptyVisitor { override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { return new EmptyVisitor { - override def visitFieldInsn(op: Int, owner: String, name: String, - desc: String) { - if (op == GETFIELD) - for (cl <- output.keys if cl.getName == owner.replace('/', '.')) + override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) { + if (op == GETFIELD) { + for (cl <- output.keys if cl.getName == owner.replace('/', '.')) { output(cl) += name + } + } } override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) { // Check for calls a getter method for a variable in an interpreter wrapper object. // This means that the corresponding field will be accessed, so we should save it. - if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) - for (cl <- output.keys if cl.getName == owner.replace('/', '.')) + if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) { + for (cl <- output.keys if cl.getName == owner.replace('/', '.')) { output(cl) += name + } + } } } } } - class InnerClosureFinder(output: Set[Class[_]]) extends EmptyVisitor { var myName: String = null @@ -196,8 +197,10 @@ class InnerClosureFinder(output: Set[Class[_]]) extends EmptyVisitor { if (op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0 && argTypes(0).toString.startsWith("L") // is it an object? && argTypes(0).getInternalName == myName) - output += Class.forName(owner.replace('/', '.'), false, - Thread.currentThread.getContextClassLoader) + output += Class.forName( + owner.replace('/', '.'), + false, + Thread.currentThread.getContextClassLoader) } } } diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala index 4a8fa6d3fc6e23dd30b62e24bc0325c7ee23c529..ed51f5ae47046ea7672f5c47eacf090d3adf84f3 100644 --- a/core/src/main/scala/spark/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/CoGroupedRDD.scala @@ -10,20 +10,20 @@ sealed trait CoGroupSplitDep extends Serializable case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep -class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) -extends Split with Serializable { +class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable { override val index = idx override def hashCode(): Int = idx } class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] ( - { x => ArrayBuffer(x) }, - { (b, x) => b += x }, - { (b1, b2) => b1 ++ b2 } -) with Serializable + { x => ArrayBuffer(x) }, + { (b, x) => b += x }, + { (b1, b2) => b1 ++ b2 } + ) with Serializable class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner) -extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging { + extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging { + val aggr = new CoGroupAggregator override val dependencies = { @@ -41,7 +41,8 @@ extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging { deps.toList } - @transient val splits_ : Array[Split] = { + @transient + val splits_ : Array[Split] = { val firstRdd = rdds.head val array = new Array[Split](part.numPartitions) for (i <- 0 until array.size) { diff --git 
a/core/src/main/scala/spark/DAGScheduler.scala b/core/src/main/scala/spark/DAGScheduler.scala index 8ed5d7ba7193a7a667fb6b5b9b698f4eda467c52..fc9e6581bf537b822364a7e155266d49ceca26f8 100644 --- a/core/src/main/scala/spark/DAGScheduler.scala +++ b/core/src/main/scala/spark/DAGScheduler.scala @@ -4,18 +4,28 @@ import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} -// A task created by the DAG scheduler. Knows its stage ID and map ouput tracker generation. +/** + * A task created by the DAG scheduler. Knows its stage ID and map output tracker generation. + */ abstract class DAGTask[T](val stageId: Int) extends Task[T] { val gen = SparkEnv.get.mapOutputTracker.getGeneration override def generation: Option[Long] = Some(gen) } -// A completion event passed by the underlying task scheduler to the DAG scheduler -case class CompletionEvent(task: DAGTask[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any]) +/** + * A completion event passed by the underlying task scheduler to the DAG scheduler + */ +case class CompletionEvent( + task: DAGTask[_], + reason: TaskEndReason, + result: Any, + accumUpdates: Map[Long, Any]) -// Various possible reasons why a DAG task ended. The underlying scheduler is supposed -// to retry tasks several times for "ephemeral" failures, and only report back failures -// that require some old stages to be resubmitted, such as shuffle map fetch failures. +/** + * Various possible reasons why a DAG task ended. The underlying scheduler is supposed to retry + * tasks several times for "ephemeral" failures, and only report back failures that require some + * old stages to be resubmitted, such as shuffle map fetch failures. + */ sealed trait TaskEndReason case object Success extends TaskEndReason case class FetchFailed(serverUri: String, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason @@ -23,11 +33,10 @@ case class ExceptionFailure(exception: Throwable) extends TaskEndReason case class OtherFailure(message: String) extends TaskEndReason /** - * A Scheduler subclass that implements stage-oriented scheduling. It computes - * a DAG of stages for each job, keeps track of which RDDs and stage outputs - * are materialized, and computes a minimal schedule to run the job. Subclasses - * only need to implement the code to send a task to the cluster and to report - * fetch failures (the submitTasks method, and code to add CompletionEvents). + * A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for + * each job, keeps track of which RDDs and stage outputs are materialized, and computes a minimal + * schedule to run the job. Subclasses only need to implement the code to send a task to the cluster + * and to report fetch failures (the submitTasks method, and code to add CompletionEvents).
*/ private trait DAGScheduler extends Scheduler with Logging { // Must be implemented by subclasses to start running a set of tasks @@ -39,16 +48,15 @@ private trait DAGScheduler extends Scheduler with Logging { completionEvents.put(CompletionEvent(dagTask, reason, result, accumUpdates)) } - // The time, in millis, to wait for fetch failure events to stop coming in after - // one is detected; this is a simplistic way to avoid resubmitting tasks in the - // non-fetchable map stage one by one as more failure events come in + // The time, in millis, to wait for fetch failure events to stop coming in after one is detected; + // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one + // as more failure events come in val RESUBMIT_TIMEOUT = 2000L - // The time, in millis, to wake up between polls of the completion queue - // in order to potentially resubmit failed stages + // The time, in millis, to wake up between polls of the completion queue in order to potentially + // resubmit failed stages val POLL_TIMEOUT = 500L - private val completionEvents = new LinkedBlockingQueue[CompletionEvent] var nextStageId = 0 @@ -111,10 +119,8 @@ private trait DAGScheduler extends Scheduler with Logging { cacheTracker.registerRDD(r.id, r.splits.size) for (dep <- r.dependencies) { dep match { - case shufDep: ShuffleDependency[_,_,_] => - parents += getShuffleMapStage(shufDep) - case _ => - visit(dep.rdd) + case shufDep: ShuffleDependency[_,_,_] => parents += getShuffleMapStage(shufDep) + case _ => visit(dep.rdd) } } } @@ -136,8 +142,9 @@ private trait DAGScheduler extends Scheduler with Logging { dep match { case shufDep: ShuffleDependency[_,_,_] => val stage = getShuffleMapStage(shufDep) - if (!stage.isAvailable) + if (!stage.isAvailable) { missing += stage + } case narrowDep: NarrowDependency[_] => visit(narrowDep.rdd) } @@ -150,10 +157,13 @@ private trait DAGScheduler extends Scheduler with Logging { missing.toList } - override def runJob[T, U](finalRdd: RDD[T], func: (TaskContext, Iterator[T]) => U, - partitions: Seq[Int], allowLocal: Boolean) - (implicit m: ClassManifest[U]) - : Array[U] = synchronized { + override def runJob[T, U]( + finalRdd: RDD[T], + func: (TaskContext, Iterator[T]) => U, + partitions: Seq[Int], + allowLocal: Boolean) + (implicit m: ClassManifest[U]): Array[U] = + synchronized { val outputParts = partitions.toArray val numOutputParts: Int = partitions.size val finalStage = newStage(finalRdd, None) @@ -192,8 +202,9 @@ private trait DAGScheduler extends Scheduler with Logging { submitMissingTasks(stage) running += stage } else { - for (parent <- missing) + for (parent <- missing) { submitStage(parent) + } waiting += stage } } diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index bd20634fb9ac4ff583a4820d51b82d31e45af3f5..d93c84924a5038fb202157b907092591b1343ac8 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -2,28 +2,29 @@ package spark abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) extends Serializable -abstract class NarrowDependency[T](rdd: RDD[T]) -extends Dependency(rdd, false) { +abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd, false) { def getParents(outputPartition: Int): Seq[Int] } class ShuffleDependency[K, V, C]( - val shuffleId: Int, - rdd: RDD[(K, V)], - val aggregator: Aggregator[K, V, C], - val partitioner: Partitioner -) extends Dependency(rdd, true) + val shuffleId: Int, + 
rdd: RDD[(K, V)], + val aggregator: Aggregator[K, V, C], + val partitioner: Partitioner) + extends Dependency(rdd, true) class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int) = List(partitionId) } class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) -extends NarrowDependency[T](rdd) { + extends NarrowDependency[T](rdd) { + override def getParents(partitionId: Int) = { - if (partitionId >= outStart && partitionId < outStart + length) + if (partitionId >= outStart && partitionId < outStart + length) { List(partitionId - outStart + inStart) - else + } else { Nil + } } } diff --git a/core/src/main/scala/spark/DiskSpillingCache.scala b/core/src/main/scala/spark/DiskSpillingCache.scala index 80e13a25196b79d2695d06688b7a3da1c345d080..157e071c7f91f146aeaeeb043fdf314105c7c970 100644 --- a/core/src/main/scala/spark/DiskSpillingCache.scala +++ b/core/src/main/scala/spark/DiskSpillingCache.scala @@ -8,7 +8,6 @@ import java.util.UUID // TODO: cache into a separate directory using Utils.createTempDir // TODO: clean up disk cache afterwards - class DiskSpillingCache extends BoundedMemoryCache { private val diskMap = new LinkedHashMap[Any, File](32, 0.75f, true) diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala index 15693fc95f0abe4ef2af2b50a50ac7fd7d7f60f1..71a2ded7e7ffcdff37b55c18ff07a47e053596cd 100644 --- a/core/src/main/scala/spark/Executor.scala +++ b/core/src/main/scala/spark/Executor.scala @@ -27,8 +27,9 @@ class Executor extends org.apache.mesos.Executor with Logging { override def init(d: ExecutorDriver, args: ExecutorArgs) { // Read spark.* system properties from executor arg val props = Utils.deserialize[Array[(String, String)]](args.getData.toByteArray) - for ((key, value) <- props) + for ((key, value) <- props) { System.setProperty(key, value) + } // Make sure an appropriate class loader is set for remote actors RemoteActor.classLoader = getClass.getClassLoader @@ -45,7 +46,7 @@ class Executor extends org.apache.mesos.Executor with Logging { // Start worker thread pool threadPool = new ThreadPoolExecutor( - 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) + 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) } override def launchTask(d: ExecutorDriver, task: TaskDescription) { @@ -58,41 +59,42 @@ class Executor extends org.apache.mesos.Executor with Logging { val tid = desc.getTaskId.getValue logInfo("Running task ID " + tid) d.sendStatusUpdate(TaskStatus.newBuilder() - .setTaskId(desc.getTaskId) - .setState(TaskState.TASK_RUNNING) - .build()) + .setTaskId(desc.getTaskId) + .setState(TaskState.TASK_RUNNING) + .build()) try { SparkEnv.set(env) Thread.currentThread.setContextClassLoader(classLoader) Accumulators.clear val task = Utils.deserialize[Task[Any]](desc.getData.toByteArray, classLoader) - for (gen <- task.generation) // Update generation if any is set + for (gen <- task.generation) {// Update generation if any is set env.mapOutputTracker.updateGeneration(gen) + } val value = task.run(tid.toInt) val accumUpdates = Accumulators.values val result = new TaskResult(value, accumUpdates) d.sendStatusUpdate(TaskStatus.newBuilder() - .setTaskId(desc.getTaskId) - .setState(TaskState.TASK_FINISHED) - .setData(ByteString.copyFrom(Utils.serialize(result))) - .build()) + .setTaskId(desc.getTaskId) + .setState(TaskState.TASK_FINISHED) + .setData(ByteString.copyFrom(Utils.serialize(result))) + .build()) logInfo("Finished task ID " + tid) } 
catch { case ffe: FetchFailedException => { val reason = ffe.toTaskEndReason d.sendStatusUpdate(TaskStatus.newBuilder() - .setTaskId(desc.getTaskId) - .setState(TaskState.TASK_FAILED) - .setData(ByteString.copyFrom(Utils.serialize(reason))) - .build()) + .setTaskId(desc.getTaskId) + .setState(TaskState.TASK_FAILED) + .setData(ByteString.copyFrom(Utils.serialize(reason))) + .build()) } case t: Throwable => { val reason = ExceptionFailure(t) d.sendStatusUpdate(TaskStatus.newBuilder() - .setTaskId(desc.getTaskId) - .setState(TaskState.TASK_FAILED) - .setData(ByteString.copyFrom(Utils.serialize(reason))) - .build()) + .setTaskId(desc.getTaskId) + .setState(TaskState.TASK_FAILED) + .setData(ByteString.copyFrom(Utils.serialize(reason))) + .build()) // TODO: Handle errors in tasks less dramatically logError("Exception in task ID " + tid, t) @@ -102,8 +104,10 @@ class Executor extends org.apache.mesos.Executor with Logging { } } - // Create a ClassLoader for use in tasks, adding any JARs specified by the - // user or any classes created by the interpreter to the search path + /** + * Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes + * created by the interpreter to the search path + */ private def createClassLoader(): ClassLoader = { var loader = this.getClass.getClassLoader diff --git a/core/src/main/scala/spark/FetchFailedException.scala b/core/src/main/scala/spark/FetchFailedException.scala index 6b6aea1ae42bda957123eba3dfe2b2261293ea04..db711e099c355d641417dc35eb1241096a021b76 100644 --- a/core/src/main/scala/spark/FetchFailedException.scala +++ b/core/src/main/scala/spark/FetchFailedException.scala @@ -1,13 +1,17 @@ package spark -class FetchFailedException(val serverUri: String, val shuffleId: Int, - val mapId: Int, val reduceId: Int, cause: Throwable) -extends Exception { +class FetchFailedException( + val serverUri: String, + val shuffleId: Int, + val mapId: Int, + val reduceId: Int, + cause: Throwable) + extends Exception { + override def getMessage(): String = "Fetch failed: %s %d %d %d".format(serverUri, shuffleId, mapId, reduceId) override def getCause(): Throwable = cause - def toTaskEndReason: TaskEndReason = - FetchFailed(serverUri, shuffleId, mapId, reduceId) + def toTaskEndReason: TaskEndReason = FetchFailed(serverUri, shuffleId, mapId, reduceId) } diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala index 47286e0a65d5c6c9a54ae816871d7f7dc33eb110..41608e5a4e98f4576fcae31f4384062fb267fa16 100644 --- a/core/src/main/scala/spark/HadoopRDD.scala +++ b/core/src/main/scala/spark/HadoopRDD.scala @@ -12,9 +12,15 @@ import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils -/** A Spark split class that wraps around a Hadoop InputSplit */ -class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit) -extends Split with Serializable { +/** + * A Spark split class that wraps around a Hadoop InputSplit. + */ +class HadoopSplit( + rddId: Int, + idx: Int, + @transient s: InputSplit) + extends Split with Serializable { + val inputSplit = new SerializableWritable[InputSplit](s) override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt @@ -22,42 +28,47 @@ extends Split with Serializable { override val index = idx } - /** - * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in - * HDFS, the local file system, or S3, tables in HBase, etc). 
+ * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file + * system, or S3, tables in HBase, etc). */ class HadoopRDD[K, V]( - sc: SparkContext, - @transient conf: JobConf, - inputFormatClass: Class[_ <: InputFormat[K, V]], - keyClass: Class[K], - valueClass: Class[V], - minSplits: Int) -extends RDD[(K, V)](sc) { + sc: SparkContext, + @transient conf: JobConf, + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int) + extends RDD[(K, V)](sc) { + val serializableConf = new SerializableWritable(conf) - @transient val splits_ : Array[Split] = { + @transient + val splits_ : Array[Split] = { val inputFormat = createInputFormat(conf) val inputSplits = inputFormat.getSplits(conf, minSplits) - val array = new Array[Split] (inputSplits.size) - for (i <- 0 until inputSplits.size) + val array = new Array[Split](inputSplits.size) + for (i <- 0 until inputSplits.size) { array(i) = new HadoopSplit(id, i, inputSplits(i)) + } array } def createInputFormat(conf: JobConf): InputFormat[K, V] = { ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) - .asInstanceOf[InputFormat[K, V]] + .asInstanceOf[InputFormat[K, V]] } - // Helper method for creating a Hadoop Writable, because the commonly used - // NullWritable class has no constructor + /** + * Helper method for creating a Hadoop Writable, because the commonly used NullWritable class has + * no constructor. + */ def createWritable[T](clazz: Class[T]): T = { - if (clazz == classOf[NullWritable]) + if (clazz == classOf[NullWritable]) { NullWritable.get().asInstanceOf[T] - else + } else { clazz.newInstance() + } } override def splits = splits_ @@ -80,8 +91,7 @@ extends RDD[(K, V)](sc) { try { finished = !reader.next(key, value) } catch { - case eofe: java.io.EOFException => - finished = true + case eofe: java.io.EOFException => finished = true } gotNext = true } diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala index 73c8876eb662d442b76844d134675708d8974bee..84b37218b52f5078100489c386d9b4ca1bf3e08e 100644 --- a/core/src/main/scala/spark/HadoopWriter.scala +++ b/core/src/main/scala/spark/HadoopWriter.scala @@ -16,11 +16,12 @@ import spark.SerializableWritable import spark.Logging /** - * Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The JobConf should - * also contain an output key class, an output value class, a filename to write to, etc - * exactly like in a Hadoop job. + * Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The JobConf should also + * contain an output key class, an output value class, a filename to write to, etc exactly like in + * a Hadoop job. 
*/ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable { + private val now = new Date() private val conf = new SerializableWritable(jobConf) @@ -58,22 +59,25 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl val outputName = "part-" + numfmt.format(splitID) val path = FileOutputFormat.getOutputPath(conf.value) val fs: FileSystem = { - if (path != null) + if (path != null) { path.getFileSystem(conf.value) - else + } else { FileSystem.get(conf.value) + } } getOutputCommitter().setupTask(getTaskContext()) - writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL) + writer = getOutputFormat().getRecordWriter( + fs, conf.value, outputName, Reporter.NULL) } def write(key: AnyRef, value: AnyRef) { if (writer!=null) { //println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")") writer.write(key, value) - } else + } else { throw new IOException("Writer is null, open() has not been called") + } } def close() { @@ -109,26 +113,31 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl // ********* Private Functions ********* private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = { - if (format == null) - format = conf.value.getOutputFormat().asInstanceOf[OutputFormat[AnyRef,AnyRef]] + if (format == null) { + format = conf.value.getOutputFormat() + .asInstanceOf[OutputFormat[AnyRef,AnyRef]] + } return format } private def getOutputCommitter(): OutputCommitter = { - if (committer == null) + if (committer == null) { committer = conf.value.getOutputCommitter().asInstanceOf[OutputCommitter] + } return committer } private def getJobContext(): JobContext = { - if (jobContext == null) - jobContext = new JobContext(conf.value, jID.value) + if (jobContext == null) { + jobContext = new JobContext(conf.value, jID.value) + } return jobContext } private def getTaskContext(): TaskAttemptContext = { - if (taskContext == null) + if (taskContext == null) { taskContext = new TaskAttemptContext(conf.value, taID.value) + } return taskContext } @@ -138,7 +147,8 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl attemptID = attemptid jID = new SerializableWritable[JobID](HadoopWriter.createJobID(now, jobid)) - taID = new SerializableWritable[TaskAttemptID] (new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID)) + taID = new SerializableWritable[TaskAttemptID]( + new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID)) } private def setConfParams() { @@ -158,12 +168,14 @@ object HadoopWriter { } def createPathFromString(path: String, conf: JobConf): Path = { - if (path == null) + if (path == null) { throw new IllegalArgumentException("Output path is null") + } var outputPath = new Path(path) val fs = outputPath.getFileSystem(conf) - if (outputPath == null || fs == null) + if (outputPath == null || fs == null) { throw new IllegalArgumentException("Incorrectly formatted output path") + } outputPath = outputPath.makeQualified(fs) return outputPath } diff --git a/core/src/main/scala/spark/HttpServer.scala b/core/src/main/scala/spark/HttpServer.scala index d2a663ac1f97b8fb557e2214596bf23f47bdf47e..855f2c752faa19b42f7f3c3f1c0b9a79ad04a52c 100644 --- a/core/src/main/scala/spark/HttpServer.scala +++ b/core/src/main/scala/spark/HttpServer.scala @@ -9,18 +9,15 @@ import org.eclipse.jetty.server.handler.HandlerList import org.eclipse.jetty.server.handler.ResourceHandler import 
org.eclipse.jetty.util.thread.QueuedThreadPool - /** - * Exception type thrown by HttpServer when it is in the wrong state - * for an operation. + * Exception type thrown by HttpServer when it is in the wrong state for an operation. */ class ServerStateException(message: String) extends Exception(message) - /** - * An HTTP server for static content used to allow worker nodes to access JARs - * added to SparkContext as well as classes created by the interpreter when - * the user types in code. This is just a wrapper around a Jetty server. + * An HTTP server for static content used to allow worker nodes to access JARs added to SparkContext + * as well as classes created by the interpreter when the user types in code. This is just a wrapper + * around a Jetty server. */ class HttpServer(resourceBase: File) extends Logging { private var server: Server = null diff --git a/core/src/main/scala/spark/Job.scala b/core/src/main/scala/spark/Job.scala index 2200fb0c5d46aa800f9bb76df85b1e99fd0196d9..9846e918738d1d3ee1b395f0f00f23bbd602b107 100644 --- a/core/src/main/scala/spark/Job.scala +++ b/core/src/main/scala/spark/Job.scala @@ -4,8 +4,8 @@ import org.apache.mesos._ import org.apache.mesos.Protos._ /** - * Class representing a parallel job in MesosScheduler. Schedules the - * job by implementing various callbacks. + * Class representing a parallel job in MesosScheduler. Schedules the job by implementing various + * callbacks. */ abstract class Job(jobId: Int) { def slaveOffer(s: Offer, availableCpus: Double): Option[TaskDescription] diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala index 34f06b747d539167987d7b0fe72ea6201a1ce3fb..c4919c6516be090ada544612ee1b026acbd1a5f8 100644 --- a/core/src/main/scala/spark/LocalScheduler.scala +++ b/core/src/main/scala/spark/LocalScheduler.scala @@ -4,9 +4,9 @@ import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicInteger /** - * A simple Scheduler implementation that runs tasks locally in a thread pool. - * Optionally the scheduler also allows each task to fail up to maxFailures times, - * which is useful for testing fault recovery. + * A simple Scheduler implementation that runs tasks locally in a thread pool. Optionally the + * scheduler also allows each task to fail up to maxFailures times, which is useful for testing + * fault recovery. */ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGScheduler with Logging { var attemptId = new AtomicInteger(0) @@ -33,14 +33,13 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule // Set the Spark execution environment for the worker thread SparkEnv.set(env) try { - // Serialize and deserialize the task so that accumulators are - // changed to thread-local ones; this adds a bit of unnecessary - // overhead but matches how the Mesos Executor works + // Serialize and deserialize the task so that accumulators are changed to thread-local ones; + // this adds a bit of unnecessary overhead but matches how the Mesos Executor works. 
Accumulators.clear val bytes = Utils.serialize(task) logInfo("Size of task " + idInJob + " is " + bytes.size + " bytes") val deserializedTask = Utils.deserialize[Task[_]]( - bytes, Thread.currentThread.getContextClassLoader) + bytes, Thread.currentThread.getContextClassLoader) val result: Any = deserializedTask.run(attemptId) val accumUpdates = Accumulators.values logInfo("Finished task " + idInJob) diff --git a/core/src/main/scala/spark/Logging.scala b/core/src/main/scala/spark/Logging.scala index c9408bbcb63694be883813c36a031185a09bfea5..0d11ab9cbd836a5495f5392b942cb39ffd60e385 100644 --- a/core/src/main/scala/spark/Logging.scala +++ b/core/src/main/scala/spark/Logging.scala @@ -4,22 +4,24 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory /** - * Utility trait for classes that want to log data. Creates a SLF4J logger - * for the class and allows logging messages at different levels using - * methods that only evaluate parameters lazily if the log level is enabled. + * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows + * logging messages at different levels using methods that only evaluate parameters lazily if the + * log level is enabled. */ trait Logging { // Make the log field transient so that objects with Logging can // be serialized and used on another machine - @transient private var log_ : Logger = null + @transient + private var log_ : Logger = null // Method to get or create the logger for this object def log: Logger = { if (log_ == null) { var className = this.getClass().getName() // Ignore trailing $'s in the class names for Scala objects - if (className.endsWith("$")) + if (className.endsWith("$")) { className = className.substring(0, className.length - 1) + } log_ = LoggerFactory.getLogger(className) } return log_ diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index a183bf80faf457d728bef95f689bce001412cdbe..a934c5a02fe30706ddb9d6ce7194743c91c40ca1 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -24,6 +24,7 @@ extends DaemonActor with Logging { case GetMapOutputLocations(shuffleId: Int) => logInfo("Asked to get map output locations for shuffle " + shuffleId) reply(serverUris.get(shuffleId)) + case StopMapOutputTracker => reply('OK) exit() @@ -74,8 +75,9 @@ class MapOutputTracker(isMaster: Boolean) extends Logging { var array = serverUris.get(shuffleId) if (array != null) { array.synchronized { - if (array(mapId) == serverUri) + if (array(mapId) == serverUri) { array(mapId) = null + } } incrementGeneration() } else { @@ -95,7 +97,11 @@ class MapOutputTracker(isMaster: Boolean) extends Logging { if (fetching.contains(shuffleId)) { // Someone else is fetching it; wait for them to be done while (fetching.contains(shuffleId)) { - try {fetching.wait()} catch {case _ =>} + try { + fetching.wait() + } catch { + case _ => + } } return serverUris.get(shuffleId) } else { diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala index c9c2f169f6f2f0a86e454195357b05d926aa35bc..618ee724f9beeaa5bd0fbab09750ebd766bee801 100644 --- a/core/src/main/scala/spark/MesosScheduler.scala +++ b/core/src/main/scala/spark/MesosScheduler.scala @@ -19,13 +19,17 @@ import org.apache.mesos._ import org.apache.mesos.Protos._ /** - * The main Scheduler implementation, which runs jobs on Mesos. 
Clients should - first call start(), then submit tasks through the runTasks method. + * The main Scheduler implementation, which runs jobs on Mesos. Clients should first call start(), + * then submit tasks through the runTasks method. */ private class MesosScheduler( - sc: SparkContext, master: String, frameworkName: String) -extends MScheduler with DAGScheduler with Logging -{ + sc: SparkContext, + master: String, + frameworkName: String) + extends MScheduler + with DAGScheduler + with Logging { + // Environment variables to pass to our executors val ENV_VARS_TO_SEND_TO_EXECUTORS = Array( "SPARK_MEM", @@ -36,14 +40,15 @@ extends MScheduler with DAGScheduler with Logging // Memory used by each executor (in megabytes) val EXECUTOR_MEMORY = { - if (System.getenv("SPARK_MEM") != null) + if (System.getenv("SPARK_MEM") != null) { memoryStringToMb(System.getenv("SPARK_MEM")) // TODO: Might need to add some extra memory for the non-heap parts of the JVM - else + } else { 512 + } } - // Lock used to wait for scheduler to be registered + // Lock used to wait for scheduler to be registered private var isRegistered = false private val registeredLock = new Object() @@ -97,8 +102,7 @@ extends MScheduler with DAGScheduler with Logging val ret = driver.run() logInfo("driver.run() returned with code " + ret) } catch { - case e: Exception => - logError("driver.run() failed", e) + case e: Exception => logError("driver.run() failed", e) } } }.start @@ -108,26 +112,24 @@ extends MScheduler with DAGScheduler with Logging val sparkHome = sc.getSparkHome match { case Some(path) => path case None => - throw new SparkException("Spark home is not set; set it through the " + - "spark.home system property, the SPARK_HOME environment variable " + - "or the SparkContext constructor") + throw new SparkException("Spark home is not set; set it through the spark.home system " + + "property, the SPARK_HOME environment variable or the SparkContext constructor") } val execScript = new File(sparkHome, "spark-executor").getCanonicalPath val params = Params.newBuilder() for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) { if (System.getenv(key) != null) { params.addParam(Param.newBuilder() - .setKey("env." + key) - .setValue(System.getenv(key)) - .build()) + .setKey("env." + key) + .setValue(System.getenv(key)) + .build()) } } val memory = Resource.newBuilder() - .setName("mem") - .setType(Resource.Type.SCALAR) - .setScalar(Resource.Scalar.newBuilder() - .setValue(EXECUTOR_MEMORY).build()) - .build() + .setName("mem") + .setType(Resource.Type.SCALAR) + .setScalar(Resource.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build()) + .build() ExecutorInfo.newBuilder() .setExecutorId(ExecutorID.newBuilder().setValue("default").build()) .setUri(execScript) @@ -172,15 +174,16 @@ extends MScheduler with DAGScheduler with Logging override def waitForRegister() { registeredLock.synchronized { - while (!isRegistered) + while (!isRegistered) { registeredLock.wait() + } } } /** - * Method called by Mesos to offer resources on slaves. We resond by asking - * our active jobs for tasks in FIFO order. We fill each node with tasks in - * a round-robin manner so that tasks are balanced across the cluster. + * Method called by Mesos to offer resources on slaves. We respond by asking our active jobs for + * tasks in FIFO order. We fill each node with tasks in a round-robin manner so that tasks are + * balanced across the cluster.
*/ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { synchronized { @@ -197,7 +200,7 @@ extends MScheduler with DAGScheduler with Logging launchedTask = false for (i <- 0 until offers.size if enoughMem(i)) { job.slaveOffer(offers(i), availableCpus(i)) match { - case Some(task) => + case Some(task) => tasks(i).add(task) val tid = task.getTaskId.getValue val sid = offers(i).getSlaveId.getValue @@ -207,6 +210,7 @@ extends MScheduler with DAGScheduler with Logging slavesWithExecutors += sid availableCpus(i) -= getResource(task.getResourcesList(), "cpus") launchedTask = true + case None => {} } } @@ -221,8 +225,10 @@ extends MScheduler with DAGScheduler with Logging // Helper function to pull out a resource from a Mesos Resources protobuf def getResource(res: JList[Resource], name: String): Double = { - for (r <- res if r.getName == name) + for (r <- res if r.getName == name) { return r.getScalar.getValue + } + throw new IllegalArgumentException("No resource called " + name + " in " + res) } @@ -238,7 +244,8 @@ extends MScheduler with DAGScheduler with Logging synchronized { try { val tid = status.getTaskId.getValue - if (status.getState == TaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) { + if (status.getState == TaskState.TASK_LOST + && taskIdToSlaveId.contains(tid)) { // We lost the executor on this slave, so remember that it's gone slavesWithExecutors -= taskIdToSlaveId(tid) } @@ -249,8 +256,9 @@ extends MScheduler with DAGScheduler with Logging } if (isFinished(status.getState)) { taskIdToJobId.remove(tid) - if (jobTasks.contains(jobId)) + if (jobTasks.contains(jobId)) { jobTasks(jobId) -= tid + } taskIdToSlaveId.remove(tid) } case None => @@ -346,7 +354,11 @@ extends MScheduler with DAGScheduler with Logging return Utils.serialize(props.toArray) } - override def frameworkMessage(d: SchedulerDriver, s: SlaveID, e: ExecutorID, b: Array[Byte]) {} + override def frameworkMessage( + d: SchedulerDriver, + s: SlaveID, + e: ExecutorID, + b: Array[Byte]) {} override def slaveLost(d: SchedulerDriver, s: SlaveID) { slavesWithExecutors.remove(s.getValue) @@ -355,21 +367,22 @@ extends MScheduler with DAGScheduler with Logging override def offerRescinded(d: SchedulerDriver, o: OfferID) {} /** - * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a - * number of megabytes. This is used to figure out how much memory to claim - * from Mesos based on the SPARK_MEM environment variable. + * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. + * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM + * environment variable. 
*/ def memoryStringToMb(str: String): Int = { val lower = str.toLowerCase - if (lower.endsWith("k")) + if (lower.endsWith("k")) { (lower.substring(0, lower.length-1).toLong / 1024).toInt - else if (lower.endsWith("m")) + } else if (lower.endsWith("m")) { lower.substring(0, lower.length-1).toInt - else if (lower.endsWith("g")) + } else if (lower.endsWith("g")) { lower.substring(0, lower.length-1).toInt * 1024 - else if (lower.endsWith("t")) + } else if (lower.endsWith("t")) { lower.substring(0, lower.length-1).toInt * 1024 * 1024 - else // no suffix, so it's just a number in bytes - (lower.toLong / 1024 / 1024).toInt + } else {// no suffix, so it's just a number in bytes + (lower.toLong / 1024 / 1024).toInt + } } } diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/NewHadoopRDD.scala index c40a39cbe0824c36290a0ee9b91dfeffba3f2742..cd42586aa6b63e69f5c9a7bcaf2ef1ffaa29075c 100644 --- a/core/src/main/scala/spark/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/NewHadoopRDD.scala @@ -14,18 +14,20 @@ import java.util.Date import java.text.SimpleDateFormat class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable) -extends Split { + extends Split { + val serializableHadoopSplit = new SerializableWritable(rawSplit) override def hashCode(): Int = (41 * (41 + rddId) + index).toInt } class NewHadoopRDD[K, V]( - sc: SparkContext, - inputFormatClass: Class[_ <: InputFormat[K, V]], - keyClass: Class[K], valueClass: Class[V], - @transient conf: Configuration) -extends RDD[(K, V)](sc) { + sc: SparkContext, + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], valueClass: Class[V], + @transient conf: Configuration) + extends RDD[(K, V)](sc) { + private val serializableConf = new SerializableWritable(conf) private val jobtrackerId: String = { @@ -33,15 +35,18 @@ extends RDD[(K, V)](sc) { formatter.format(new Date()) } - @transient private val jobId = new JobID(jobtrackerId, id) + @transient + private val jobId = new JobID(jobtrackerId, id) - @transient private val splits_ : Array[Split] = { + @transient + private val splits_ : Array[Split] = { val inputFormat = inputFormatClass.newInstance val jobContext = new JobContext(serializableConf.value, jobId) val rawSplits = inputFormat.getSplits(jobContext).toArray val result = new Array[Split](rawSplits.size) - for (i <- 0 until rawSplits.size) + for (i <- 0 until rawSplits.size) { result(i) = new NewHadoopSplit(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) + } result } diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 074d5abb38af7c77a2dd5534f7fb6282fecd867a..5b4de97e4b36c8b4c094a37d9434d59aae31da87 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -39,7 +39,11 @@ import SparkContext._ /** * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. 
*/ -class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging with Serializable { +class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( + self: RDD[(K, V)]) + extends Logging + with Serializable { + def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = { def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = { for ((k, v) <- m2) { @@ -54,23 +58,20 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex } def combineByKey[C](createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiners: (C, C) => C, - numSplits: Int, - partitioner: Partitioner) - : RDD[(K, C)] = - { + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C, + numSplits: Int, + partitioner: Partitioner): RDD[(K, C)] = { val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) new ShuffledRDD(self, aggregator, partitioner) } def combineByKey[C](createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiners: (C, C) => C, - numSplits: Int) - : RDD[(K, C)] = { + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C, + numSplits: Int): RDD[(K, C)] = { combineByKey(createCombiner, mergeValue, mergeCombiners, numSplits, - new HashPartitioner(numSplits)) + new HashPartitioner(numSplits)) } def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = { @@ -159,9 +160,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex } def combineByKey[C](createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiners: (C, C) => C) - : RDD[(K, C)] = { + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C) : RDD[(K, C)] = { combineByKey(createCombiner, mergeValue, mergeCombiners, defaultParallelism) } @@ -204,8 +204,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex case Some(p) => p case None => new HashPartitioner(defaultParallelism) } - val cg = new CoGroupedRDD[K](Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), part) - val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) + val cg = new CoGroupedRDD[K]( + Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), + part) + val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)( + classManifest[K], + Manifests.seqSeqManifest) prfs.mapValues { case Seq(vs, ws) => (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) @@ -219,7 +223,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex case None => new HashPartitioner(defaultParallelism) } new CoGroupedRDD[K]( - Seq(self.asInstanceOf[RDD[(_, _)]], + Seq(self.asInstanceOf[RDD[(_, _)]], other1.asInstanceOf[RDD[(_, _)]], other2.asInstanceOf[RDD[(_, _)]]), part).map { @@ -234,8 +238,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex val index = p.getPartition(key) def process(it: Iterator[(K, V)]): Seq[V] = { val buf = new ArrayBuffer[V] - for ((k, v) <- it if k == key) + for ((k, v) <- it if k == key) { buf += v + } buf } val res = self.context.runJob(self, process _, Array(index), false) @@ -253,10 +258,11 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) } - def saveAsNewAPIHadoopFile(path: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) { + def saveAsNewAPIHadoopFile( + path: String, + keyClass: Class[_], + valueClass: Class[_], 
+ outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) { val job = new NewAPIHadoopJob job.setOutputKeyClass(keyClass) job.setOutputValueClass(valueClass) @@ -295,11 +301,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex jobCommitter.cleanupJob(jobTaskContext) } - def saveAsHadoopFile(path: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[_ <: OutputFormat[_, _]], - conf: JobConf = new JobConf) { + def saveAsHadoopFile( + path: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: OutputFormat[_, _]], + conf: JobConf = new JobConf) { conf.setOutputKeyClass(keyClass) conf.setOutputValueClass(valueClass) // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug @@ -313,12 +320,15 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex val outputFormatClass = conf.getOutputFormat val keyClass = conf.getOutputKeyClass val valueClass = conf.getOutputValueClass - if (outputFormatClass == null) + if (outputFormatClass == null) { throw new SparkException("Output format class not set") - if (keyClass == null) + } + if (keyClass == null) { throw new SparkException("Output key class not set") - if (valueClass == null) + } + if (valueClass == null) { throw new SparkException("Output value class not set") + } logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")") @@ -349,19 +359,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex def getValueClass() = implicitly[ClassManifest[V]].erasure } -class MappedValuesRDD[K, V, U]( - prev: RDD[(K, V)], f: V => U) -extends RDD[(K, U)](prev.context) { +class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev.context) { override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) override val partitioner = prev.partitioner - override def compute(split: Split) = - prev.iterator(split).map{case (k, v) => (k, f(v))} + override def compute(split: Split) = prev.iterator(split).map{case (k, v) => (k, f(v))} } -class FlatMappedValuesRDD[K, V, U]( - prev: RDD[(K, V)], f: V => Traversable[U]) -extends RDD[(K, U)](prev.context) { +class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => Traversable[U]) + extends RDD[(K, U)](prev.context) { + override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) override val partitioner = prev.partitioner diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala index e96f73b3cfb9d2902fa42d2ee6d48d75d9a4f353..21f68f21c2c408ab2c22d994ca647fd5d3469988 100644 --- a/core/src/main/scala/spark/ParallelCollection.scala +++ b/core/src/main/scala/spark/ParallelCollection.scala @@ -4,15 +4,17 @@ import scala.collection.immutable.NumericRange import scala.collection.mutable.ArrayBuffer class ParallelCollectionSplit[T: ClassManifest]( - val rddId: Long, val slice: Int, values: Seq[T]) -extends Split with Serializable { + val rddId: Long, + val slice: Int, + values: Seq[T]) + extends Split with Serializable { + def iterator(): Iterator[T] = values.iterator override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt override def equals(other: Any): Boolean = other match { - case that: ParallelCollectionSplit[_] => - (this.rddId == that.rddId && this.slice == that.slice) + case that: ParallelCollectionSplit[_] => (this.rddId 
== that.rddId && this.slice == that.slice) case _ => false } @@ -20,13 +22,16 @@ extends Split with Serializable { } class ParallelCollection[T: ClassManifest]( - sc: SparkContext, @transient data: Seq[T], numSlices: Int) -extends RDD[T](sc) { - // TODO: Right now, each split sends along its full data, even if later down - // the RDD chain it gets cached. It might be worthwhile to write the data to - // a file in the DFS and read it in the split instead. + sc: SparkContext, + @transient data: Seq[T], + numSlices: Int) + extends RDD[T](sc) { + // TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets + // cached. It might be worthwhile to write the data to a file in the DFS and read it in the split + // instead. - @transient val splits_ = { + @transient + val splits_ = { val slices = ParallelCollection.slice(data, numSlices).toArray slices.indices.map(i => new ParallelCollectionSplit(id, i, slices(i))).toArray } @@ -41,17 +46,24 @@ extends RDD[T](sc) { } private object ParallelCollection { - // Slice a collection into numSlices sub-collections. One extra thing we do here is - // to treat Range collections specially, encoding the slices as other Ranges to - // minimize memory cost. This makes it efficient to run Spark over RDDs representing - // large sets of numbers. + /** + * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range + * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes + * it efficient to run Spark over RDDs representing large sets of numbers. + */ def slice[T: ClassManifest](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = { - if (numSlices < 1) + if (numSlices < 1) { throw new IllegalArgumentException("Positive number of slices required") + } seq match { case r: Range.Inclusive => { - val sign = if (r.step < 0) -1 else 1 - slice(new Range(r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices) + val sign = if (r.step < 0) { + -1 + } else { + 1 + } + slice(new Range( + r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices) } case r: Range => { (0 until numSlices).map(i => { diff --git a/core/src/main/scala/spark/ParallelShuffleFetcher.scala b/core/src/main/scala/spark/ParallelShuffleFetcher.scala index 98eb37934b17f5ef31bc0f007e6eb413953a8029..19eb288e8460e599b501091f75b178cea388501a 100644 --- a/core/src/main/scala/spark/ParallelShuffleFetcher.scala +++ b/core/src/main/scala/spark/ParallelShuffleFetcher.scala @@ -31,8 +31,9 @@ class ParallelShuffleFetcher extends ShuffleFetcher with Logging { // Randomize them and put them in a LinkedBlockingQueue val serverQueue = new LinkedBlockingQueue[(String, ArrayBuffer[Int])] - for (pair <- Utils.randomize(inputsByUri)) + for (pair <- Utils.randomize(inputsByUri)) { serverQueue.put(pair) + } // Create a queue to hold the fetched data val resultQueue = new LinkedBlockingQueue[Array[Byte]] @@ -59,17 +60,19 @@ class ParallelShuffleFetcher extends ShuffleFetcher with Logging { val conn = new URL(url).openConnection() conn.connect() val len = conn.getContentLength() - if (len == -1) + if (len == -1) { throw new SparkException("Content length was not specified by server") + } val buf = new Array[Byte](len) val in = new FastBufferedInputStream(conn.getInputStream()) var pos = 0 while (pos < len) { val n = in.read(buf, pos, len-pos) - if (n == -1) + if (n == -1) { throw new SparkException("EOF before reading the expected " + len + " bytes") - else + } else { pos += n + } } // Done reading everything 
resultQueue.put(buf) diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala index 4491de1734f3ed44a7218cce408012c9bd3a5eae..7b3c7b0b3729968dc60b28f20a3bb7f8934e9c9d 100644 --- a/core/src/main/scala/spark/Partitioner.scala +++ b/core/src/main/scala/spark/Partitioner.scala @@ -10,12 +10,17 @@ class HashPartitioner(partitions: Int) extends Partitioner { def getPartition(key: Any) = { val mod = key.hashCode % partitions - if (mod < 0) mod + partitions else mod // Guard against negative hash codes + if (mod < 0) { + mod + partitions + } else { + mod // Guard against negative hash codes + } } override def equals(other: Any): Boolean = other match { case h: HashPartitioner => h.numPartitions == numPartitions - case _ => false + case _ => + false } } \ No newline at end of file diff --git a/core/src/main/scala/spark/PipedRDD.scala b/core/src/main/scala/spark/PipedRDD.scala index 7c0049298655584faaa88185eca22db6e951e0dc..3f993d895a356ddceba5ff6e663256f118ed09cd 100644 --- a/core/src/main/scala/spark/PipedRDD.scala +++ b/core/src/main/scala/spark/PipedRDD.scala @@ -6,13 +6,12 @@ import java.util.StringTokenizer import scala.collection.mutable.ArrayBuffer import scala.io.Source - /** * An RDD that pipes the contents of each parent partition through an external command * (printing them one per line) and returns the output as a collection of strings. */ class PipedRDD[T: ClassManifest](parent: RDD[T], command: Seq[String]) -extends RDD[String](parent.context) { + extends RDD[String](parent.context) { // Similar to Runtime.exec(), if we are given a single string, split it into words // using a standard StringTokenizer (i.e. by spaces) def this(parent: RDD[T], command: String) = this(parent, PipedRDD.tokenize(command)) @@ -28,8 +27,9 @@ extends RDD[String](parent.context) { // Start a thread to print the process's stderr to ours new Thread("stderr reader for " + command) { override def run() { - for(line <- Source.fromInputStream(proc.getErrorStream).getLines) + for(line <- Source.fromInputStream(proc.getErrorStream).getLines) { System.err.println(line) + } } }.start() @@ -38,8 +38,9 @@ extends RDD[String](parent.context) { override def run() { SparkEnv.set(env) val out = new PrintWriter(proc.getOutputStream) - for(elem <- parent.iterator(split)) + for(elem <- parent.iterator(split)) { out.println(elem) + } out.close() } }.start() diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index e186ddae82e82156146a4056b65734fb1ac0686b..c85973fc0c021526c0221fc8b2c47c7c1d1e783e 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -27,24 +27,26 @@ import org.apache.hadoop.mapred.TextOutputFormat import SparkContext._ /** - * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents - * an immutable, partitioned collection of elements that can be operated on in parallel. + * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, + * partitioned collection of elements that can be operated on in parallel. * * Each RDD is characterized by five main properties: * - A list of splits (partitions) * - A function for computing each split * - A list of dependencies on other RDDs * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) - * - Optionally, a list of preferred locations to compute each split on (e.g. 
block locations for HDFS) + * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for + * HDFS) * - * All the scheduling and execution in Spark is done based on these methods, allowing each - * RDD to implement its own way of computing itself. + * All the scheduling and execution in Spark is done based on these methods, allowing each RDD to + * implement its own way of computing itself. * - * This class also contains transformation methods available on all RDDs (e.g. map and filter). - * In addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs, - * and SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles. + * This class also contains transformation methods available on all RDDs (e.g. map and filter). In + * addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs, and + * SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles. */ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serializable { + // Methods that must be implemented by subclasses def splits: Array[Split] def compute(split: Split): Iterator[T] @@ -100,20 +102,17 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial if (initialCount > Integer.MAX_VALUE) { maxSelected = Integer.MAX_VALUE - } - else { + } else { maxSelected = initialCount.toInt } if (num > initialCount) { total = maxSelected - fraction = Math.min(multiplier*(maxSelected+1)/initialCount, 1.0) - } - else if (num < 0) { + fraction = Math.min(multiplier * (maxSelected + 1) / initialCount, 1.0) + } else if (num < 0) { throw(new IllegalArgumentException("Negative number of elements requested")) - } - else { - fraction = Math.min(multiplier*(num+1)/initialCount, 1.0) + } else { + fraction = Math.min(multiplier * (num + 1) / initialCount, 1.0) total = num.toInt } @@ -134,22 +133,18 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial def glom(): RDD[Array[T]] = new GlommedRDD(this) - def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = - new CartesianRDD(sc, this, other) + def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other) def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = { val cleanF = sc.clean(f) this.map(t => (cleanF(t), t)).groupByKey(numSplits) } - def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = - groupBy[K](f, sc.defaultParallelism) + def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism) - def pipe(command: String): RDD[String] = - new PipedRDD(this, command) + def pipe(command: String): RDD[String] = new PipedRDD(this, command) - def pipe(command: Seq[String]): RDD[String] = - new PipedRDD(this, command) + def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command) def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U]): RDD[U] = new MapPartitionsRDD(this, sc.clean(f)) @@ -169,26 +164,29 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial def reduce(f: (T, T) => T): T = { val cleanF = sc.clean(f) val reducePartition: Iterator[T] => Option[T] = iter => { - if (iter.hasNext) + if (iter.hasNext) { Some(iter.reduceLeft(cleanF)) - else + }else { None + } } val options = sc.runJob(this, reducePartition) val results = new ArrayBuffer[T] - for (opt <- options; elem <- opt) + for (opt <- options; elem <- opt) { results += elem - 
if (results.size == 0) + } + if (results.size == 0) { throw new UnsupportedOperationException("empty collection") - else + } else { return results.reduceLeft(cleanF) + } } /** - * Aggregate the elements of each partition, and then the results for all the - * partitions, using a given associative function and a neutral "zero value". - * The function op(t1, t2) is allowed to modify t1 and return it as its result - * value to avoid object allocation; however, it should not modify t2. + * Aggregate the elements of each partition, and then the results for all the partitions, using a + * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to + * modify t1 and return it as its result value to avoid object allocation; however, it should not + * modify t2. */ def fold(zeroValue: T)(op: (T, T) => T): T = { val cleanOp = sc.clean(op) @@ -197,19 +195,18 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial } /** - * Aggregate the elements of each partition, and then the results for all the - * partitions, using given combine functions and a neutral "zero value". This - * function can return a different result type, U, than the type of this RDD, T. - * Thus, we need one operation for merging a T into an U and one operation for - * merging two U's, as in scala.TraversableOnce. Both of these functions are - * allowed to modify and return their first argument instead of creating a new U - * to avoid memory allocation. + * Aggregate the elements of each partition, and then the results for all the partitions, using + * given combine functions and a neutral "zero value". This function can return a different result + * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U + * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are + * allowed to modify and return their first argument instead of creating a new U to avoid memory + * allocation. */ def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = { val cleanSeqOp = sc.clean(seqOp) val cleanCombOp = sc.clean(combOp) val results = sc.runJob(this, - (iter: Iterator[T]) => iter.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)) + (iter: Iterator[T]) => iter.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)) return results.fold(zeroValue)(cleanCombOp) } @@ -226,12 +223,15 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial def toArray(): Array[T] = collect() - // Take the first num elements of the RDD. This currently scans the partitions - // *one by one*, so it will be slow if a lot of partitions are required. In that - // case, use collect() to get the whole RDD instead. + /** + * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so + * it will be slow if a lot of partitions are required. In that case, use collect() to get the + * whole RDD instead. 
+ */ def take(num: Int): Array[T] = { - if (num == 0) + if (num == 0) { return new Array[T](0) + } val buf = new ArrayBuffer[T] var p = 0 while (buf.size < num && p < splits.size) { @@ -251,48 +251,54 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial } def saveAsTextFile(path: String) { - this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) + this.map(x => (NullWritable.get(), new Text(x.toString))) + .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) } def saveAsObjectFile(path: String) { - this.glom.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))).saveAsSequenceFile(path) + this.glom + .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))) + .saveAsSequenceFile(path) } } class MappedRDD[U: ClassManifest, T: ClassManifest]( - prev: RDD[T], f: T => U) -extends RDD[U](prev.context) { + prev: RDD[T], + f: T => U) + extends RDD[U](prev.context) { + override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = prev.iterator(split).map(f) } class FlatMappedRDD[U: ClassManifest, T: ClassManifest]( - prev: RDD[T], f: T => Traversable[U]) -extends RDD[U](prev.context) { + prev: RDD[T], + f: T => Traversable[U]) + extends RDD[U](prev.context) { + override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = prev.iterator(split).flatMap(f) } -class FilteredRDD[T: ClassManifest]( - prev: RDD[T], f: T => Boolean) -extends RDD[T](prev.context) { +class FilteredRDD[T: ClassManifest](prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) { override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = prev.iterator(split).filter(f) } -class GlommedRDD[T: ClassManifest](prev: RDD[T]) -extends RDD[Array[T]](prev.context) { +class GlommedRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) { override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = Array(prev.iterator(split).toArray).iterator } class MapPartitionsRDD[U: ClassManifest, T: ClassManifest]( - prev: RDD[T], f: Iterator[T] => Iterator[U]) -extends RDD[U](prev.context) { + prev: RDD[T], + f: Iterator[T] => Iterator[U]) + extends RDD[U](prev.context) { + override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = f(prev.iterator(split)) diff --git a/core/src/main/scala/spark/ResultTask.scala b/core/src/main/scala/spark/ResultTask.scala index 8bbe31444f6b6eedd91187e59c2da22e2f6f6d2b..25d85b7e0ced19366b5a2172220382ce5edeef7f 100644 --- a/core/src/main/scala/spark/ResultTask.scala +++ b/core/src/main/scala/spark/ResultTask.scala @@ -1,8 +1,14 @@ package spark -class ResultTask[T, U](stageId: Int, rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, - val partition: Int, locs: Seq[String], val outputId: Int) -extends DAGTask[U](stageId) { +class ResultTask[T, U]( + stageId: Int, + rdd: RDD[T], + func: (TaskContext, Iterator[T]) => U, + val partition: Int, + locs: Seq[String], + val outputId: Int) + extends DAGTask[U](stageId) { + val split = rdd.splits(partition) override def run(attemptId: Int): U = { diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/SampledRDD.scala index 
21c1148b63af2c62e603b75208cbc12c825f8437..c9a9e53d18475e3be1a86577de984dd948695a2a 100644 --- a/core/src/main/scala/spark/SampledRDD.scala +++ b/core/src/main/scala/spark/SampledRDD.scala @@ -7,16 +7,24 @@ class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Seriali } class SampledRDD[T: ClassManifest]( - prev: RDD[T], withReplacement: Boolean, frac: Double, seed: Int) -extends RDD[T](prev.context) { + prev: RDD[T], + withReplacement: Boolean, + frac: Double, + seed: Int) + extends RDD[T](prev.context) { - @transient val splits_ = { val rg = new Random(seed); prev.splits.map(x => new SampledRDDSplit(x, rg.nextInt)) } + @transient + val splits_ = { + val rg = new Random(seed); + prev.splits.map(x => new SampledRDDSplit(x, rg.nextInt)) + } override def splits = splits_.asInstanceOf[Array[Split]] override val dependencies = List(new OneToOneDependency(prev)) - override def preferredLocations(split: Split) = prev.preferredLocations(split.asInstanceOf[SampledRDDSplit].prev) + override def preferredLocations(split: Split) = + prev.preferredLocations(split.asInstanceOf[SampledRDDSplit].prev) override def compute(splitIn: Split) = { val split = splitIn.asInstanceOf[SampledRDDSplit] @@ -25,11 +33,13 @@ extends RDD[T](prev.context) { if (withReplacement) { val oldData = prev.iterator(split.prev).toArray val sampleSize = (oldData.size * frac).ceil.toInt - val sampledData = for (i <- 1 to sampleSize) yield oldData(rg.nextInt(oldData.size)) // all of oldData's indices are candidates, even if sampleSize < oldData.size + val sampledData = { + // all of oldData's indices are candidates, even if sampleSize < oldData.size + for (i <- 1 to sampleSize) + yield oldData(rg.nextInt(oldData.size)) + } sampledData.iterator - } - // Sampling without replacement - else { + } else { // Sampling without replacement prev.iterator(split.prev).filter(x => (rg.nextDouble <= frac)) } } diff --git a/core/src/main/scala/spark/Scheduler.scala b/core/src/main/scala/spark/Scheduler.scala index df86db64a627d18b9eed3869211669d75a5dd627..6c7e569313b9f6a325b39c1606700715b90c56d9 100644 --- a/core/src/main/scala/spark/Scheduler.scala +++ b/core/src/main/scala/spark/Scheduler.scala @@ -1,17 +1,24 @@ package spark -// Scheduler trait, implemented by both MesosScheduler and LocalScheduler. +/** + * Scheduler trait, implemented by both MesosScheduler and LocalScheduler. + */ private trait Scheduler { def start() // Wait for registration with Mesos. def waitForRegister() - // Run a function on some partitions of an RDD, returning an array of results. The allowLocal flag specifies - // whether the scheduler is allowed to run the job on the master machine rather than shipping it to the cluster, - // for actions that create short jobs such as first() and take(). - def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, - partitions: Seq[Int], allowLocal: Boolean): Array[U] + /** + * Run a function on some partitions of an RDD, returning an array of results. The allowLocal + * flag specifies whether the scheduler is allowed to run the job on the master machine rather + * than shipping it to the cluster, for actions that create short jobs such as first() and take(). 
+ */ + def runJob[T, U: ClassManifest]( + rdd: RDD[T], + func: (TaskContext, Iterator[T]) => U, + partitions: Seq[Int], + allowLocal: Boolean): Array[U] def stop() diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala index bd4a526b89ad7525cc971ee5875477ab8cfe1266..b213ca9dcbde6c70ad6ef03ca4c2150a84a1390f 100644 --- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala @@ -25,26 +25,29 @@ import org.apache.hadoop.io.Text import SparkContext._ - /** * Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile, * through an implicit conversion. Note that this can't be part of PairRDDFunctions because * we need more implicit parameters to convert our keys and values to Writable. */ -class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging with Serializable { +class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest]( + self: RDD[(K,V)]) + extends Logging + with Serializable { + def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = { val c = { - if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) + if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) { classManifest[T].erasure - else + } else { implicitly[T => Writable].getClass.getMethods()(0).getReturnType + } // TODO: use something like WritableConverter to avoid reflection } c.asInstanceOf[Class[ _ <: Writable]] } def saveAsSequenceFile(path: String) { - def anyToWritable[U <% Writable](u: U): Writable = u val keyClass = getWritableClass[K] diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala index cfc6d978bce818d467fd851e8dbe35152c3c99dc..15fca9fcda1f479a065fa12fbb0a9f90f351fce9 100644 --- a/core/src/main/scala/spark/Serializer.scala +++ b/core/src/main/scala/spark/Serializer.scala @@ -3,9 +3,8 @@ package spark import java.io.{InputStream, OutputStream} /** - * A serializer. Because some serialization libraries are not thread safe, - * this class is used to create SerializerInstances that do the actual - * serialization. + * A serializer. Because some serialization libraries are not thread safe, this class is used to + * create SerializerInstances that do the actual serialization. 
*/ trait Serializer { def newInstance(): SerializerInstance diff --git a/core/src/main/scala/spark/SerializingCache.scala b/core/src/main/scala/spark/SerializingCache.scala index 2c1f96a7001dd49108a0669554f96eb1fc445f07..a74922ec4ce13fa251589ad36780e7aa0610c8f5 100644 --- a/core/src/main/scala/spark/SerializingCache.scala +++ b/core/src/main/scala/spark/SerializingCache.scala @@ -3,8 +3,8 @@ package spark import java.io._ /** - * Wrapper around a BoundedMemoryCache that stores serialized objects as - * byte arrays in order to reduce storage cost and GC overhead + * Wrapper around a BoundedMemoryCache that stores serialized objects as byte arrays in order to + * reduce storage cost and GC overhead */ class SerializingCache extends Cache with Logging { val bmc = new BoundedMemoryCache diff --git a/core/src/main/scala/spark/ShuffleMapTask.scala b/core/src/main/scala/spark/ShuffleMapTask.scala index d68c6cf4efea269d67ad47099b8084460614a603..d7c488109737bcbdfe9a4b56e242d18d49c3a112 100644 --- a/core/src/main/scala/spark/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/ShuffleMapTask.scala @@ -7,9 +7,15 @@ import java.util.{HashMap => JHashMap} import it.unimi.dsi.fastutil.io.FastBufferedOutputStream - -class ShuffleMapTask(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_], val partition: Int, locs: Seq[String]) -extends DAGTask[String](stageId) with Logging { +class ShuffleMapTask( + stageId: Int, + rdd: RDD[_], + dep: ShuffleDependency[_,_,_], + val partition: Int, + locs: Seq[String]) + extends DAGTask[String](stageId) + with Logging { + val split = rdd.splits(partition) override def run (attemptId: Int): String = { diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala index 9cada0617e7e9f31b52e4caad6a237385c817151..5efc8cf50b8ef27154c59a2bf00bd7a3d2220114 100644 --- a/core/src/main/scala/spark/ShuffledRDD.scala +++ b/core/src/main/scala/spark/ShuffledRDD.scala @@ -2,22 +2,21 @@ package spark import java.util.{HashMap => JHashMap} - class ShuffledRDDSplit(val idx: Int) extends Split { override val index = idx override def hashCode(): Int = idx } class ShuffledRDD[K, V, C]( - parent: RDD[(K, V)], - aggregator: Aggregator[K, V, C], - part : Partitioner) -extends RDD[(K, C)](parent.context) { + parent: RDD[(K, V)], + aggregator: Aggregator[K, V, C], + part : Partitioner) + extends RDD[(K, C)](parent.context) { //override val partitioner = Some(part) override val partitioner = Some(part) - @transient val splits_ = - Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i)) + @transient + val splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i)) override def splits = splits_ diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala index bf881fb2d4fdcc75ac2a2a5797b0e4dc291e2d15..6eee8b45cea8d741f1299a82ee1a68ada9dc9206 100644 --- a/core/src/main/scala/spark/SimpleJob.scala +++ b/core/src/main/scala/spark/SimpleJob.scala @@ -10,14 +10,16 @@ import com.google.protobuf.ByteString import org.apache.mesos._ import org.apache.mesos.Protos._ - /** * A Job that runs a set of tasks with no interdependencies. 
*/ class SimpleJob( - sched: MesosScheduler, tasksSeq: Seq[Task[_]], val jobId: Int) -extends Job(jobId) with Logging -{ + sched: MesosScheduler, + tasksSeq: Seq[Task[_]], + val jobId: Int) + extends Job(jobId) + with Logging { + // Maximum time to wait to run a task in a preferred location (in ms) val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "5000").toLong @@ -61,7 +63,8 @@ extends Job(jobId) with Logging var causeOfFailure = "" // How frequently to reprint duplicate exceptions in full, in milliseconds - val EXCEPTION_PRINT_INTERVAL = System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong + val EXCEPTION_PRINT_INTERVAL = + System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong // Map of recent exceptions (identified by string representation and // top stack frame) to duplicate count (how many times the same // exception has appeared) and time the full exception was @@ -162,21 +165,20 @@ extends Job(jobId) with Logging lastPreferredLaunchTime = time // Create and return the Mesos task object val cpuRes = Resource.newBuilder() - .setName("cpus") - .setType(Resource.Type.SCALAR) - .setScalar(Resource.Scalar.newBuilder() - .setValue(CPUS_PER_TASK).build()) - .build() + .setName("cpus") + .setType(Resource.Type.SCALAR) + .setScalar(Resource.Scalar.newBuilder().setValue(CPUS_PER_TASK).build()) + .build() val serializedTask = Utils.serialize(task) logDebug("Serialized size: " + serializedTask.size) val taskName = "task %d:%d".format(jobId, index) return Some(TaskDescription.newBuilder() - .setTaskId(taskId) - .setSlaveId(offer.getSlaveId) - .setName(taskName) - .addResources(cpuRes) - .setData(ByteString.copyFrom(serializedTask)) - .build()) + .setTaskId(taskId) + .setSlaveId(offer.getSlaveId) + .setName(taskName) + .addResources(cpuRes) + .setData(ByteString.copyFrom(serializedTask)) + .build()) } case _ => } @@ -203,8 +205,7 @@ extends Job(jobId) with Logging val index = tidToIndex(tid) if (!finished(index)) { tasksFinished += 1 - logInfo("Finished TID %s (progress: %d/%d)".format( - tid, tasksFinished, numTasks)) + logInfo("Finished TID %s (progress: %d/%d)".format(tid, tasksFinished, numTasks)) // Deserialize task result val result = Utils.deserialize[TaskResult[_]](status.getData.toByteArray) sched.taskEnded(tasks(index), Success, result.value, result.accumUpdates) @@ -235,8 +236,9 @@ extends Job(jobId) with Logging sched.taskEnded(tasks(index), fetchFailed, null, null) finished(index) = true tasksFinished += 1 - if (tasksFinished == numTasks) + if (tasksFinished == numTasks) { sched.jobFinished(this) + } return case ef: ExceptionFailure => val key = ef.exception.toString @@ -278,8 +280,7 @@ extends Job(jobId) with Logging if (numFailures(index) > MAX_TASK_FAILURES) { logError("Task %d:%d failed more than %d times; aborting job".format( jobId, index, MAX_TASK_FAILURES)) - abort("Task %d failed more than %d times".format( - index, MAX_TASK_FAILURES)) + abort("Task %d failed more than %d times".format(index, MAX_TASK_FAILURES)) } } } else { diff --git a/core/src/main/scala/spark/SimpleShuffleFetcher.scala b/core/src/main/scala/spark/SimpleShuffleFetcher.scala index 1e38a2b1db021ccf35f11d55d2b0d036e16406a2..196c64cf1fb76758c9d1251dc296ddcb58d863cd 100644 --- a/core/src/main/scala/spark/SimpleShuffleFetcher.scala +++ b/core/src/main/scala/spark/SimpleShuffleFetcher.scala @@ -8,7 +8,6 @@ import scala.collection.mutable.HashMap import it.unimi.dsi.fastutil.io.FastBufferedInputStream - class SimpleShuffleFetcher extends ShuffleFetcher with 
Logging { def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) { logInfo("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId)) diff --git a/core/src/main/scala/spark/SizeEstimator.scala b/core/src/main/scala/spark/SizeEstimator.scala index a3774fb0551274738fed0b1ce7dd9e47ac4f88a9..4b89503e84ab0bd51eaaf4e547c1f2085b3452d8 100644 --- a/core/src/main/scala/spark/SizeEstimator.scala +++ b/core/src/main/scala/spark/SizeEstimator.scala @@ -9,10 +9,9 @@ import java.util.Random import scala.collection.mutable.ArrayBuffer - /** - * Estimates the sizes of Java objects (number of bytes of memory they occupy), - * for use in memory-aware caches. + * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in + * memory-aware caches. * * Based on the following JavaWorld article: * http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html @@ -36,9 +35,9 @@ object SizeEstimator { classInfos.put(classOf[Object], new ClassInfo(OBJECT_SIZE, Nil)) /** - * The state of an ongoing size estimation. Contains a stack of objects - * to visit as well as an IdentityHashMap of visited objects, and provides - * utility methods for enqueueing new objects to visit. + * The state of an ongoing size estimation. Contains a stack of objects to visit as well as an + * IdentityHashMap of visited objects, and provides utility methods for enqueueing new objects + * to visit. */ private class SearchState { val visited = new IdentityHashMap[AnyRef, AnyRef] diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 9f1a49f853fec728d35c00df4e0559ea7fcba58d..6b8a7e69c44adb96aa50a767894ff8fd0b4e3abd 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -31,29 +31,33 @@ import org.apache.hadoop.mapreduce.{Job => NewHadoopJob} import spark.broadcast._ class SparkContext( - master: String, - frameworkName: String, - val sparkHome: String = null, - val jars: Seq[String] = Nil) -extends Logging { + master: String, + frameworkName: String, + val sparkHome: String = null, + val jars: Seq[String] = Nil) + extends Logging { + // Ensure logging is initialized before we spawn any threads initLogging() // Set Spark master host and port system properties - if (System.getProperty("spark.master.host") == null) + if (System.getProperty("spark.master.host") == null) { System.setProperty("spark.master.host", Utils.localHostName) - if (System.getProperty("spark.master.port") == null) + } + if (System.getProperty("spark.master.port") == null) { System.setProperty("spark.master.port", "7077") + } // Make sure a proper class loader is set for remote actors (unless user set one) - if (RemoteActor.classLoader == null) + if (RemoteActor.classLoader == null) { RemoteActor.classLoader = getClass.getClassLoader + } // Create the Spark execution environment (cache, map output tracker, etc) val env = SparkEnv.createFromSystemProperties(true) SparkEnv.set(env) Broadcast.initialize(true) - + // Create and start the scheduler private var scheduler: Scheduler = { // Regular expression used for local[N] master format @@ -61,9 +65,9 @@ extends Logging { // Regular expression for local[N, maxRetries], used in tests with failing tasks val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r master match { - case "local" => + case "local" => new LocalScheduler(1, 0) - case LOCAL_N_REGEX(threads) => + case LOCAL_N_REGEX(threads) => new LocalScheduler(threads.toInt, 0) case 
LOCAL_N_FAILURES_REGEX(threads, maxFailures) => new LocalScheduler(threads.toInt, maxFailures.toInt) @@ -78,11 +82,13 @@ extends Logging { // Methods for creating RDDs - def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = + def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = { new ParallelCollection[T](this, seq, numSlices) + } - def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = + def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = { parallelize(seq, numSlices) + } def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits) @@ -90,26 +96,28 @@ extends Logging { } /** - * Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving - * its InputFormat and any other necessary info (e.g. file name for a - * filesystem-based dataset, table name for HyperTable, etc). + * Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any + * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable, + * etc). */ - def hadoopRDD[K, V](conf: JobConf, - inputFormatClass: Class[_ <: InputFormat[K, V]], - keyClass: Class[K], - valueClass: Class[V], - minSplits: Int = defaultMinSplits) - : RDD[(K, V)] = { + def hadoopRDD[K, V]( + conf: JobConf, + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int = defaultMinSplits + ): RDD[(K, V)] = { new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) } /** Get an RDD for a Hadoop file with an arbitrary InputFormat */ - def hadoopFile[K, V](path: String, - inputFormatClass: Class[_ <: InputFormat[K, V]], - keyClass: Class[K], - valueClass: Class[V], - minSplits: Int = defaultMinSplits) - : RDD[(K, V)] = { + def hadoopFile[K, V]( + path: String, + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int = defaultMinSplits + ) : RDD[(K, V)] = { val conf = new JobConf() FileInputFormat.setInputPaths(conf, path) val bufferSize = System.getProperty("spark.buffer.size", "65536") @@ -118,15 +126,17 @@ extends Logging { } /** - * Smarter version of hadoopFile() that uses class manifests to figure out - * the classes of keys, values and the InputFormat so that users don't need - * to pass them directly. + * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys, + * values and the InputFormat so that users don't need to pass them directly. */ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int) (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]) : RDD[(K, V)] = { - hadoopFile(path, fm.erasure.asInstanceOf[Class[F]], km.erasure.asInstanceOf[Class[K]], - vm.erasure.asInstanceOf[Class[V]], minSplits) + hadoopFile(path, + fm.erasure.asInstanceOf[Class[F]], + km.erasure.asInstanceOf[Class[K]], + vm.erasure.asInstanceOf[Class[V]], + minSplits) } def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) @@ -136,31 +146,35 @@ extends Logging { /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. 
*/ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String) (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = { - val job = new NewHadoopJob - NewFileInputFormat.addInputPath(job, new Path(path)) - val conf = job.getConfiguration - newAPIHadoopFile(path, - fm.erasure.asInstanceOf[Class[F]], - km.erasure.asInstanceOf[Class[K]], - vm.erasure.asInstanceOf[Class[V]], - conf) + val job = new NewHadoopJob + NewFileInputFormat.addInputPath(job, new Path(path)) + val conf = job.getConfiguration + newAPIHadoopFile( + path, + fm.erasure.asInstanceOf[Class[F]], + km.erasure.asInstanceOf[Class[K]], + vm.erasure.asInstanceOf[Class[V]], + conf) } - /** Get an RDD for a given Hadoop file with an arbitrary new API InputFormat and extra - * configuration options to pass to the input format. + /** + * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat + * and extra configuration options to pass to the input format. */ - def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String, - fClass: Class[F], - kClass: Class[K], - vClass: Class[V], - conf: Configuration): RDD[(K, V)] = - new NewHadoopRDD(this, fClass, kClass, vClass, conf) + def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]( + path: String, + fClass: Class[F], + kClass: Class[K], + vClass: Class[V], + conf: Configuration + ): RDD[(K, V)] = new NewHadoopRDD(this, fClass, kClass, vClass, conf) /** Get an RDD for a Hadoop SequenceFile with given key and value types */ def sequenceFile[K, V](path: String, - keyClass: Class[K], - valueClass: Class[V], - minSplits: Int): RDD[(K, V)] = { + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int + ): RDD[(K, V)] = { val inputFormatClass = classOf[SequenceFileInputFormat[K, V]] hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits) } @@ -169,41 +183,47 @@ extends Logging { sequenceFile(path, keyClass, valueClass, defaultMinSplits) /** - * Version of sequenceFile() for types implicitly convertible to Writables through a WritableConverter. + * Version of sequenceFile() for types implicitly convertible to Writables through a + * WritableConverter. * - * WritableConverters are provided in a somewhat strange way (by an implicit function) to support both - * subclasses of Writable and types for which we define a converter (e.g. Int to IntWritable). The most - * natural thing would've been to have implicit objects for the converters, but then we couldn't have - * an object for every subclass of Writable (you can't have a parameterized singleton object). We use - * functions instead to create a new converter for the appropriate type. In addition, we pass the converter - * a ClassManifest of its type to allow it to figure out the Writable class to use in the subclass case. + * WritableConverters are provided in a somewhat strange way (by an implicit function) to support + * both subclasses of Writable and types for which we define a converter (e.g. Int to + * IntWritable). The most natural thing would've been to have implicit objects for the + * converters, but then we couldn't have an object for every subclass of Writable (you can't + * have a parameterized singleton object). We use functions instead to create a new converter + * for the appropriate type. In addition, we pass the converter a ClassManifest of its type to + * allow it to figure out the Writable class to use in the subclass case. 
*/ def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits) - (implicit km: ClassManifest[K], vm: ClassManifest[V], kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]) + (implicit km: ClassManifest[K], vm: ClassManifest[V], + kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]) : RDD[(K, V)] = { val kc = kcf() val vc = vcf() val format = classOf[SequenceFileInputFormat[Writable, Writable]] - val writables = hadoopFile(path, format, kc.writableClass(km).asInstanceOf[Class[Writable]], - vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits) + val writables = hadoopFile(path, format, + kc.writableClass(km).asInstanceOf[Class[Writable]], + vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits) writables.map{case (k,v) => (kc.convert(k), vc.convert(v))} } /** - * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys - * and BytesWritable values that contain a serialized partition. This is still an experimental - * storage format and may not be supported exactly as is in future Spark releases. It will also - * be pretty slow if you use the default serializer (Java serialization), though the nice thing - * about it is that there's very little effort required to save arbitrary objects. + * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and + * BytesWritable values that contain a serialized partition. This is still an experimental storage + * format and may not be supported exactly as is in future Spark releases. It will also be pretty + * slow if you use the default serializer (Java serialization), though the nice thing about it is + * that there's very little effort required to save arbitrary objects. */ - def objectFile[T: ClassManifest](path: String, minSplits: Int = defaultMinSplits): RDD[T] = { + def objectFile[T: ClassManifest]( + path: String, + minSplits: Int = defaultMinSplits + ): RDD[T] = { sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits) .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes)) } /** Build the union of a list of RDDs. */ - def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] = - new UnionRDD(this, rdds) + def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] = new UnionRDD(this, rdds) // Methods for creating shared variables @@ -211,19 +231,18 @@ extends Logging { new Accumulator(initialValue, param) // Keep around a weak hash map of values to Cached versions? - def broadcast[T](value: T) = - Broadcast.getBroadcastFactory.newBroadcast[T] (value, isLocal) + def broadcast[T](value: T) = Broadcast.getBroadcastFactory.newBroadcast[T] (value, isLocal) // Stop the SparkContext def stop() { - scheduler.stop() - scheduler = null - // TODO: Broadcast.stop(), Cache.stop()? - env.mapOutputTracker.stop() - env.cacheTracker.stop() - env.shuffleFetcher.stop() - env.shuffleManager.stop() - SparkEnv.set(null) + scheduler.stop() + scheduler = null + // TODO: Broadcast.stop(), Cache.stop()? + env.mapOutputTracker.stop() + env.cacheTracker.stop() + env.shuffleFetcher.stop() + env.shuffleManager.stop() + SparkEnv.set(null) } // Wait for the scheduler to be registered @@ -235,25 +254,29 @@ extends Logging { // or the spark.home Java property, or the SPARK_HOME environment variable // (in that order of preference). If neither of these is set, return None. 
def getSparkHome(): Option[String] = { - if (sparkHome != null) + if (sparkHome != null) { Some(sparkHome) - else if (System.getProperty("spark.home") != null) + } else if (System.getProperty("spark.home") != null) { Some(System.getProperty("spark.home")) - else if (System.getenv("SPARK_HOME") != null) + } else if (System.getenv("SPARK_HOME") != null) { Some(System.getenv("SPARK_HOME")) - else + } else { None + } } /** - * Run a function on a given set of partitions in an RDD and return the results. - * This is the main entry point to the scheduler, by which all actions get launched. - * The allowLocal flag specifies whether the scheduler can run the computation on the - * master rather than shipping it out to the cluster, for short actions like first(). + * Run a function on a given set of partitions in an RDD and return the results. This is the main + * entry point to the scheduler, by which all actions get launched. The allowLocal flag specifies + * whether the scheduler can run the computation on the master rather than shipping it out to the + * cluster, for short actions like first(). */ - def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, - partitions: Seq[Int], allowLocal: Boolean) - : Array[U] = { + def runJob[T, U: ClassManifest]( + rdd: RDD[T], + func: (TaskContext, Iterator[T]) => U, + partitions: Seq[Int], + allowLocal: Boolean + ): Array[U] = { logInfo("Starting job...") val start = System.nanoTime val result = scheduler.runJob(rdd, func, partitions, allowLocal) @@ -261,22 +284,23 @@ extends Logging { result } - def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int], - allowLocal: Boolean) - : Array[U] = { + def runJob[T, U: ClassManifest]( + rdd: RDD[T], + func: Iterator[T] => U, + partitions: Seq[Int], + allowLocal: Boolean + ): Array[U] = { runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal) } /** * Run a job on all partitions in an RDD and return the results in an array. */ - def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U) - : Array[U] = { + def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = { runJob(rdd, func, 0 until rdd.splits.size, false) } - def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U) - : Array[U] = { + def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = { runJob(rdd, func, 0 until rdd.splits.size, false) } @@ -307,10 +331,9 @@ extends Logging { } } - /** - * The SparkContext object contains a number of implicit conversions and - * parameters for use with various Spark features. + * The SparkContext object contains a number of implicit conversions and parameters for use with + * various Spark features. */ object SparkContext { implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { @@ -324,7 +347,6 @@ object SparkContext { } // TODO: Add AccumulatorParams for other types, e.g. 
lists and strings - implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) = new PairRDDFunctions(rdd) @@ -347,13 +369,14 @@ object SparkContext { implicit def stringToText(s: String) = new Text(s) - private implicit def arrayToArrayWritable[T <% Writable: ClassManifest] (arr: Traversable[T]): ArrayWritable = { + private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = { def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = { val c = { - if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) + if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) { classManifest[T].erasure - else + } else { implicitly[T => Writable].getClass.getMethods()(0).getReturnType + } // TODO: use something like WritableConverter to avoid reflection } c.asInstanceOf[Class[ _ <: Writable]] @@ -361,11 +384,11 @@ object SparkContext { def anyToWritable[U <% Writable](u: U): Writable = u - new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]], arr.map(x => anyToWritable(x)).toArray) + new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]], + arr.map(x => anyToWritable(x)).toArray) } // Helper objects for converting common types to Writable - private def simpleWritableConverter[T, W <: Writable: ClassManifest](convert: W => T) = { val wClass = classManifest[W].erasure.asInstanceOf[Class[W]] new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W])) @@ -417,4 +440,7 @@ object SparkContext { * that doesn't know the type of T when it is created. This sounds strange but is necessary to * support converting subclasses of Writable to themselves (writableWritableConverter). */ -class WritableConverter[T](val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) extends Serializable +class WritableConverter[T]( + val writableClass: ClassManifest[T] => Class[_ <: Writable], + val convert: Writable => T) + extends Serializable diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index d37d1aa90912b2a4ae78da0c0dc5e6b9fa6e1fa1..e2d1562e3574795d54519e3a7af0ae9932115c0a 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -31,8 +31,10 @@ object SparkEnv { val mapOutputTracker = new MapOutputTracker(isMaster) - val shuffleFetcherClass = System.getProperty("spark.shuffle.fetcher", "spark.SimpleShuffleFetcher") - val shuffleFetcher = Class.forName(shuffleFetcherClass).newInstance().asInstanceOf[ShuffleFetcher] + val shuffleFetcherClass = + System.getProperty("spark.shuffle.fetcher", "spark.SimpleShuffleFetcher") + val shuffleFetcher = + Class.forName(shuffleFetcherClass).newInstance().asInstanceOf[ShuffleFetcher] val shuffleMgr = new ShuffleManager() diff --git a/core/src/main/scala/spark/Stage.scala b/core/src/main/scala/spark/Stage.scala index 401b33bd1629927a68c52ee5fb8ff7b5ab24dc38..9452ea3a8e57db93c4cc31744a80bef8b3dfbd15 100644 --- a/core/src/main/scala/spark/Stage.scala +++ b/core/src/main/scala/spark/Stage.scala @@ -1,16 +1,22 @@ package spark -class Stage(val id: Int, val rdd: RDD[_], val shuffleDep: Option[ShuffleDependency[_,_,_]], val parents: List[Stage]) { +class Stage( + val id: Int, + val rdd: RDD[_], + val shuffleDep: Option[ShuffleDependency[_,_,_]], + val parents: List[Stage]) { + val isShuffleMap = shuffleDep != None val numPartitions = rdd.splits.size val outputLocs = Array.fill[List[String]](numPartitions)(Nil) 
var numAvailableOutputs = 0 def isAvailable: Boolean = { - if (parents.size == 0 && !isShuffleMap) + if (parents.size == 0 && !isShuffleMap) { true - else + } else { numAvailableOutputs == numPartitions + } } def addOutputLoc(partition: Int, host: String) { @@ -24,8 +30,9 @@ class Stage(val id: Int, val rdd: RDD[_], val shuffleDep: Option[ShuffleDependen val prevList = outputLocs(partition) val newList = prevList.filterNot(_ == host) outputLocs(partition) = newList - if (prevList != Nil && newList == Nil) + if (prevList != Nil && newList == Nil) { numAvailableOutputs -= 1 + } } override def toString = "Stage " + id diff --git a/core/src/main/scala/spark/Task.scala b/core/src/main/scala/spark/Task.scala index c34083416f56e688df35d908172682dd16ad757a..bc3b3743447bda9d887bbbe970beb2ef52dbf38e 100644 --- a/core/src/main/scala/spark/Task.scala +++ b/core/src/main/scala/spark/Task.scala @@ -3,7 +3,7 @@ package spark class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) extends Serializable abstract class Task[T] extends Serializable { - def run (id: Int): T + def run(id: Int): T def preferredLocations: Seq[String] = Nil def generation: Option[Long] = None } diff --git a/core/src/main/scala/spark/UnionRDD.scala b/core/src/main/scala/spark/UnionRDD.scala index dadfd94eefdb5c9c931076152f5f817055c1f5d0..6fded339ee885ba7c372690d4e52963e8fbf4bbb 100644 --- a/core/src/main/scala/spark/UnionRDD.scala +++ b/core/src/main/scala/spark/UnionRDD.scala @@ -2,16 +2,26 @@ package spark import scala.collection.mutable.ArrayBuffer -class UnionSplit[T: ClassManifest](idx: Int, rdd: RDD[T], split: Split) -extends Split with Serializable { +class UnionSplit[T: ClassManifest]( + idx: Int, + rdd: RDD[T], + split: Split) + extends Split + with Serializable { + def iterator() = rdd.iterator(split) def preferredLocations() = rdd.preferredLocations(split) override val index = idx } -class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]]) -extends RDD[T](sc) with Serializable { - @transient val splits_ : Array[Split] = { +class UnionRDD[T: ClassManifest]( + sc: SparkContext, + rdds: Seq[RDD[T]]) + extends RDD[T](sc) + with Serializable { + + @transient + val splits_ : Array[Split] = { val array = new Array[Split](rdds.map(_.splits.size).sum) var pos = 0 for (rdd <- rdds; split <- rdd.splits) { @@ -33,8 +43,7 @@ extends RDD[T](sc) with Serializable { deps.toList } - override def compute(s: Split): Iterator[T] = - s.asInstanceOf[UnionSplit[T]].iterator() + override def compute(s: Split): Iterator[T] = s.asInstanceOf[UnionSplit[T]].iterator() override def preferredLocations(s: Split): Seq[String] = s.asInstanceOf[UnionSplit[T]].preferredLocations() diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index e04df8661b2c2b9a99744b300ad637da4bbda5b7..58b5fa6bbd77fbad5a814989468a935f1320b616 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -35,7 +35,7 @@ object Utils { return ois.readObject.asInstanceOf[T] } - def isAlpha(c: Char) = { + def isAlpha(c: Char): Boolean = { (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') } @@ -59,16 +59,15 @@ object Utils { } // Create a temporary directory inside the given parent directory - def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = - { + def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = { var attempts = 0 val maxAttempts = 10 var dir: File = null while (dir == null) { attempts += 1 if (attempts > maxAttempts) { - 
throw new IOException("Failed to create a temp directory " + - "after " + maxAttempts + " attempts!") + throw new IOException("Failed to create a temp directory after " + maxAttempts + + " attempts!") } try { dir = new File(root, "spark-" + UUID.randomUUID.toString) @@ -137,8 +136,7 @@ object Utils { * Wrapper over newCachedThreadPool. */ def newDaemonCachedThreadPool(): ThreadPoolExecutor = { - var threadPool = - Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor] + var threadPool = Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor] threadPool.setThreadFactory (newDaemonThreadFactory) @@ -149,8 +147,7 @@ object Utils { * Wrapper over newFixedThreadPool. */ def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor = { - var threadPool = - Executors.newFixedThreadPool(nThreads).asInstanceOf[ThreadPoolExecutor] + var threadPool = Executors.newFixedThreadPool(nThreads).asInstanceOf[ThreadPoolExecutor] threadPool.setThreadFactory(newDaemonThreadFactory) diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala index f492ca762c4092f63bc72e6c9dd90cb4b12d1936..cdf05fe5de8ba40cee9a522cb055aae9798f1ff2 100644 --- a/core/src/main/scala/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/spark/broadcast/Broadcast.scala @@ -18,8 +18,7 @@ trait Broadcast[T] extends Serializable { override def toString = "spark.Broadcast(" + uuid + ")" } -object Broadcast -extends Logging with Serializable { +object Broadcast extends Logging with Serializable { // Messages val REGISTER_BROADCAST_TRACKER = 0 val UNREGISTER_BROADCAST_TRACKER = 1 @@ -90,18 +89,14 @@ extends Logging with Serializable { private var MaxPeersInGuideResponse_ = System.getProperty( "spark.broadcast.maxPeersInGuideResponse", "4").toInt - private var MaxRxSlots_ = System.getProperty( - "spark.broadcast.maxRxSlots", "4").toInt - private var MaxTxSlots_ = System.getProperty( - "spark.broadcast.maxTxSlots", "4").toInt + private var MaxRxSlots_ = System.getProperty("spark.broadcast.maxRxSlots", "4").toInt + private var MaxTxSlots_ = System.getProperty("spark.broadcast.maxTxSlots", "4").toInt - private var MaxChatTime_ = System.getProperty( - "spark.broadcast.maxChatTime", "500").toInt - private var MaxChatBlocks_ = System.getProperty( - "spark.broadcast.maxChatBlocks", "1024").toInt + private var MaxChatTime_ = System.getProperty("spark.broadcast.maxChatTime", "500").toInt + private var MaxChatBlocks_ = System.getProperty("spark.broadcast.maxChatBlocks", "1024").toInt private var EndGameFraction_ = System.getProperty( - "spark.broadcast.endGameFraction", "0.95").toDouble + "spark.broadcast.endGameFraction", "0.95").toDouble def isMaster = isMaster_ @@ -167,9 +162,9 @@ extends Logging with Serializable { } // Helper function to convert Array[BroadcastBlock] to object - def unBlockifyObject[OUT](arrayOfBlocks: Array[BroadcastBlock], - totalBytes: Int, - totalBlocks: Int): OUT = { + def unBlockifyObject[OUT](arrayOfBlocks: Array[BroadcastBlock], + totalBytes: Int, + totalBlocks: Int): OUT = { var retByteArray = new Array[Byte](totalBytes) for (i <- 0 until totalBlocks) { @@ -193,9 +188,12 @@ extends Logging with Serializable { case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) extends Serializable case class VariableInfo (@transient val arrayOfBlocks : Array[BroadcastBlock], - val totalBlocks: Int, - val totalBytes: Int) extends Serializable { - @transient var hasBlocks = 0 + val totalBlocks: Int, + val totalBytes: Int) + extends 
Serializable { + + @transient + var hasBlocks = 0 } class SpeedTracker extends Serializable { diff --git a/examples/src/main/scala/spark/examples/SparkKMeans.scala b/examples/src/main/scala/spark/examples/SparkKMeans.scala index 3139a0a6e2b85fe61766cc7eb1d14ba5c410c001..a153679ab3e7933080a68d71b4265c44d6ad0e3f 100644 --- a/examples/src/main/scala/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/spark/examples/SparkKMeans.scala @@ -54,7 +54,7 @@ object SparkKMeans { while(tempDist > convergeDist) { var closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) - var pointStats = closest.reduceByKey {case ((x1, y1), (x2, y2)) => (x1 + x2, y1+y2)} + var pointStats = closest.reduceByKey {case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)} var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collect() diff --git a/lr_data.txt b/lr_data.txt index 122008bccdcfc6f0b5e122bb4fc71123207999b4..d4df0634e0cc4d3b331db907f1754716d30557d6 100644 --- a/lr_data.txt +++ b/lr_data.txt @@ -1,4 +1,4 @@ -1 2.1419053154730547 1.919407948982788 0.0501333631091041 -0.10699028639933772 1.2809776380727795 1.6846227956326554 0.18277859260127316 -0.39664340267804343 0.8090554869291249 2.48621339239065 +1 2.1419053154730548 1.919407948982788 0.0501333631091041 -0.10699028639933772 1.2809776380727795 1.6846227956326554 0.18277859260127316 -0.39664340267804343 0.8090554869291249 2.48621339239065 1 1.8023071496873626 0.8784870753345065 2.4105062239438624 0.3597672177864262 -0.20964445925329134 1.3537576978720287 0.5096503508009924 1.5507215382743629 -0.20355100196508347 1.3210160806416416 1 2.5511476388671834 1.438530286247105 1.481598060824539 2.519631078968068 0.7231682708126751 0.9160610215051366 2.255833005788796 0.6747272061334229 0.8267096669389163 -0.8585851445864527 1 2.4238069456328435 -0.3637260240750231 -0.964666098753878 0.08140515606581078 -1.5488873933848062 -0.6309606578419305 0.8779952253801084 2.289159071801577 0.7308611443440066 1.257491408509089
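
The guard against negative hash codes in HashPartitioner.getPartition above comes down to Java's remainder semantics; the values below are a minimal illustrative sketch (the key and its hash are made up).

// In Scala/Java, % keeps the sign of the dividend, so a key with a negative
// hashCode would otherwise map to a negative partition index.
val partitions = 4
val hash = -7                     // pretend key.hashCode returned -7
val rawMod = hash % partitions    // -3
val partition = if (rawMod < 0) rawMod + partitions else rawMod  // 1, always in [0, partitions)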
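
A minimal usage sketch of the aggregate() contract documented in RDD.scala above: compute the sum and count of an RDD[Int] in a single pass. The local SparkContext and the data are hypothetical.

import spark.SparkContext

val sc = new SparkContext("local", "aggregateExample")
val nums = sc.parallelize(1 to 100, 4)
// seqOp folds each element into a per-partition (sum, count) pair;
// combOp merges the per-partition pairs, starting from the (0, 0) zero value.
val (sum, count) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2))
val mean = sum.toDouble / count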
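
The WritableConverter mechanism explained above lets sequenceFile() be called with plain Scala key and value types. A sketch reusing the SparkContext from the previous example; the path is hypothetical, and it assumes the implicit Int and String converters provided by the SparkContext companion object.

import spark.SparkContext._  // brings the implicit () => WritableConverter instances into scope

// Keys and values are read as Writables under the hood and converted through the
// implicit converters, so no IntWritable/Text appears at the call site.
val counts = sc.sequenceFile[Int, String]("/tmp/word-counts")
val firstFew = counts.take(5)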