diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala index 6a66d9deb8c04752b0f320f53524d084dd0b0d2a..640766ac7aac672cd19b03a6c1089dcf1832439a 100644 --- a/core/src/main/scala/spark/LocalScheduler.scala +++ b/core/src/main/scala/spark/LocalScheduler.scala @@ -36,12 +36,12 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule SparkEnv.set(env) try { // Serialize and deserialize the task so that accumulators are changed to thread-local ones; - // this adds a bit of unnecessary overhead but matches how the Mesos Executor works + // this adds a bit of unnecessary overhead but matches how the Mesos Executor works. Accumulators.clear val bytes = Utils.serialize(task) logInfo("Size of task " + idInJob + " is " + bytes.size + " bytes") val deserializedTask = Utils.deserialize[Task[_]]( - bytes, Thread.currentThread.getContextClassLoader) + bytes, Thread.currentThread.getContextClassLoader) val result: Any = deserializedTask.run(attemptId) val accumUpdates = Accumulators.values logInfo("Finished task " + idInJob) diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala index 3854915852d07cc836623fb3f6995812aa55317f..5de6b10155f071385374e79ba4dc67c0beef7f46 100644 --- a/core/src/main/scala/spark/MesosScheduler.scala +++ b/core/src/main/scala/spark/MesosScheduler.scala @@ -95,8 +95,7 @@ private class MesosScheduler( setDaemon(true) override def run { val sched = MesosScheduler.this - driver = new MesosSchedulerDriver( - sched, frameworkName, getExecutorInfo, master) + driver = new MesosSchedulerDriver(sched, frameworkName, getExecutorInfo, master) try { val ret = driver.run() logInfo("driver.run() returned with code " + ret) @@ -111,9 +110,8 @@ private class MesosScheduler( val sparkHome = sc.getSparkHome match { case Some(path) => path case None => - throw new SparkException("Spark home is not set; set it through the " + - "spark.home system property, the SPARK_HOME environment variable " + - "or the SparkContext constructor") + throw new SparkException("Spark home is not set; set it through the spark.home system " + + "property, the SPARK_HOME environment variable or the SparkContext constructor") } val execScript = new File(sparkHome, "spark-executor").getCanonicalPath val params = Params.newBuilder() @@ -362,9 +360,9 @@ private class MesosScheduler( override def offerRescinded(d: SchedulerDriver, o: OfferID) {} /** - * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a - * number of megabytes. This is used to figure out how much memory to claim - * from Mesos based on the SPARK_MEM environment variable. + * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. + * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM + * environment variable. */ def memoryStringToMb(str: String): Int = { val lower = str.toLowerCase diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 5ba3d1b7ea5c8459bcccb46c2e8302ae25f6abf0..c5e0b1585b573f08f470f1a7f5beebe67be66080 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -108,11 +108,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial if (num > initialCount) { total = maxSelected - fraction = Math.min(multiplier*(maxSelected+1)/initialCount, 1.0) + fraction = Math.min(multiplier * (maxSelected + 1) / initialCount, 1.0) } else if (num < 0) { throw(new IllegalArgumentException("Negative number of elements requested")) } else { - fraction = Math.min(multiplier*(num+1)/initialCount, 1.0) + fraction = Math.min(multiplier * (num + 1) / initialCount, 1.0) total = num.toInt } @@ -202,9 +202,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial * allowed to modify and return their first argument instead of creating a new U to avoid memory * allocation. */ - def aggregate[U: ClassManifest](zeroValue: U)( - seqOp: (U, T) => U, - combOp: (U, U) => U): U = { + def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = { val cleanSeqOp = sc.clean(seqOp) val cleanCombOp = sc.clean(combOp) val results = sc.runJob(this, @@ -288,8 +286,7 @@ class FilteredRDD[T: ClassManifest]( ) extends RDD[T](prev.context) { override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) - override def compute(split: Split) = - prev.iterator(split).filter(f) + override def compute(split: Split) = prev.iterator(split).filter(f) } class GlommedRDD[T: ClassManifest]( diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 1d77c357c3f594dc2514ff65c074001dc286c0e0..89a3b1c1f9a9ce911b981da5a4248bcc4093189a 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -35,7 +35,7 @@ object Utils { return ois.readObject.asInstanceOf[T] } - def isAlpha(c: Char) = { + def isAlpha(c: Char): Boolean = { (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') }