Skip to content
Snippets Groups Projects
Commit 445e0bb1 authored by haoyuan's avatar haoyuan
Browse files

Format the code a bit mroe.

parent 651932e7
No related branches found
No related tags found
No related merge requests found
...@@ -36,12 +36,12 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule ...@@ -36,12 +36,12 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule
SparkEnv.set(env) SparkEnv.set(env)
try { try {
// Serialize and deserialize the task so that accumulators are changed to thread-local ones; // Serialize and deserialize the task so that accumulators are changed to thread-local ones;
// this adds a bit of unnecessary overhead but matches how the Mesos Executor works // this adds a bit of unnecessary overhead but matches how the Mesos Executor works.
Accumulators.clear Accumulators.clear
val bytes = Utils.serialize(task) val bytes = Utils.serialize(task)
logInfo("Size of task " + idInJob + " is " + bytes.size + " bytes") logInfo("Size of task " + idInJob + " is " + bytes.size + " bytes")
val deserializedTask = Utils.deserialize[Task[_]]( val deserializedTask = Utils.deserialize[Task[_]](
bytes, Thread.currentThread.getContextClassLoader) bytes, Thread.currentThread.getContextClassLoader)
val result: Any = deserializedTask.run(attemptId) val result: Any = deserializedTask.run(attemptId)
val accumUpdates = Accumulators.values val accumUpdates = Accumulators.values
logInfo("Finished task " + idInJob) logInfo("Finished task " + idInJob)
......
...@@ -95,8 +95,7 @@ private class MesosScheduler( ...@@ -95,8 +95,7 @@ private class MesosScheduler(
setDaemon(true) setDaemon(true)
override def run { override def run {
val sched = MesosScheduler.this val sched = MesosScheduler.this
driver = new MesosSchedulerDriver( driver = new MesosSchedulerDriver(sched, frameworkName, getExecutorInfo, master)
sched, frameworkName, getExecutorInfo, master)
try { try {
val ret = driver.run() val ret = driver.run()
logInfo("driver.run() returned with code " + ret) logInfo("driver.run() returned with code " + ret)
...@@ -111,9 +110,8 @@ private class MesosScheduler( ...@@ -111,9 +110,8 @@ private class MesosScheduler(
val sparkHome = sc.getSparkHome match { val sparkHome = sc.getSparkHome match {
case Some(path) => path case Some(path) => path
case None => case None =>
throw new SparkException("Spark home is not set; set it through the " + throw new SparkException("Spark home is not set; set it through the spark.home system " +
"spark.home system property, the SPARK_HOME environment variable " + "property, the SPARK_HOME environment variable or the SparkContext constructor")
"or the SparkContext constructor")
} }
val execScript = new File(sparkHome, "spark-executor").getCanonicalPath val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
val params = Params.newBuilder() val params = Params.newBuilder()
...@@ -362,9 +360,9 @@ private class MesosScheduler( ...@@ -362,9 +360,9 @@ private class MesosScheduler(
override def offerRescinded(d: SchedulerDriver, o: OfferID) {} override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
/** /**
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
* number of megabytes. This is used to figure out how much memory to claim * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
* from Mesos based on the SPARK_MEM environment variable. * environment variable.
*/ */
def memoryStringToMb(str: String): Int = { def memoryStringToMb(str: String): Int = {
val lower = str.toLowerCase val lower = str.toLowerCase
......
...@@ -108,11 +108,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial ...@@ -108,11 +108,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
if (num > initialCount) { if (num > initialCount) {
total = maxSelected total = maxSelected
fraction = Math.min(multiplier*(maxSelected+1)/initialCount, 1.0) fraction = Math.min(multiplier * (maxSelected + 1) / initialCount, 1.0)
} else if (num < 0) { } else if (num < 0) {
throw(new IllegalArgumentException("Negative number of elements requested")) throw(new IllegalArgumentException("Negative number of elements requested"))
} else { } else {
fraction = Math.min(multiplier*(num+1)/initialCount, 1.0) fraction = Math.min(multiplier * (num + 1) / initialCount, 1.0)
total = num.toInt total = num.toInt
} }
...@@ -202,9 +202,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial ...@@ -202,9 +202,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
* allowed to modify and return their first argument instead of creating a new U to avoid memory * allowed to modify and return their first argument instead of creating a new U to avoid memory
* allocation. * allocation.
*/ */
def aggregate[U: ClassManifest](zeroValue: U)( def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
seqOp: (U, T) => U,
combOp: (U, U) => U): U = {
val cleanSeqOp = sc.clean(seqOp) val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp) val cleanCombOp = sc.clean(combOp)
val results = sc.runJob(this, val results = sc.runJob(this,
...@@ -288,8 +286,7 @@ class FilteredRDD[T: ClassManifest]( ...@@ -288,8 +286,7 @@ class FilteredRDD[T: ClassManifest](
) extends RDD[T](prev.context) { ) extends RDD[T](prev.context) {
override def splits = prev.splits override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev)) override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = override def compute(split: Split) = prev.iterator(split).filter(f)
prev.iterator(split).filter(f)
} }
class GlommedRDD[T: ClassManifest]( class GlommedRDD[T: ClassManifest](
......
...@@ -35,7 +35,7 @@ object Utils { ...@@ -35,7 +35,7 @@ object Utils {
return ois.readObject.asInstanceOf[T] return ois.readObject.asInstanceOf[T]
} }
def isAlpha(c: Char) = { def isAlpha(c: Char): Boolean = {
(c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment