diff --git a/src/scala/spark/HdfsFile.scala b/src/scala/spark/HdfsFile.scala
index 1c007c679aa705cd7b8f7d0271aa0007e2d0c47c..595386fceb07a179fc7149ac43793ef2cff75017 100644
--- a/src/scala/spark/HdfsFile.scala
+++ b/src/scala/spark/HdfsFile.scala
@@ -25,7 +25,7 @@ extends RDD[String](sc) {
   ConfigureLock.synchronized { inputFormat.configure(conf) }

   @transient val splits_ =
-    inputFormat.getSplits(conf, 2).map(new HdfsSplit(_)).toArray
+    inputFormat.getSplits(conf, sc.scheduler.numCores).map(new HdfsSplit(_)).toArray

   override def splits = splits_.asInstanceOf[Array[Split]]

diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index b8533e0bd9a19f066f643e989fc913ff6fa84818..7f82c4934836796102cf291fb2a7946d00efdba9 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -3,6 +3,7 @@ package spark
 import java.io.File

 import scala.collection.mutable.Map
+import scala.collection.JavaConversions._

 import mesos.{Scheduler => NScheduler}
 import mesos._
@@ -105,10 +106,20 @@ extends NScheduler with spark.Scheduler
       val tasks = new java.util.ArrayList[TaskDescription]
       if (activeOp != null) {
         try {
-          for (i <- 0 until offers.size.toInt) {
-            activeOp.slaveOffer(offers.get(i)) match {
-              case Some(task) => tasks.add(task)
-              case None => {}
+          val availableCpus = offers.map(_.getParams.get("cpus").toInt)
+          val availableMem = offers.map(_.getParams.get("mem").toInt)
+          var resourcesAvailable = true
+          while (resourcesAvailable) {
+            resourcesAvailable = false
+            for (i <- 0 until offers.size.toInt) {
+              activeOp.slaveOffer(offers.get(i), availableCpus(i), availableMem(i)) match {
+                case Some(task) =>
+                  tasks.add(task)
+                  availableCpus(i) -= task.getParams.get("cpus").toInt
+                  availableMem(i) -= task.getParams.get("mem").toInt
+                  resourcesAvailable = resourcesAvailable || true
+                case None => {}
+              }
             }
           }
         } catch {
@@ -162,7 +173,7 @@ extends NScheduler with spark.Scheduler
 // Trait representing an object that manages a parallel operation by
 // implementing various scheduler callbacks.
 trait ParallelOperation {
-  def slaveOffer(s: SlaveOffer): Option[TaskDescription]
+  def slaveOffer(s: SlaveOffer, availableCpus: Int, availableMem: Int): Option[TaskDescription]
   def statusUpdate(t: TaskStatus): Unit
   def error(code: Int, message: String): Unit
 }
@@ -207,7 +218,7 @@ extends ParallelOperation
     }
   }

-  def slaveOffer(offer: SlaveOffer): Option[TaskDescription] = {
+  def slaveOffer(offer: SlaveOffer, availableCpus: Int, availableMem: Int): Option[TaskDescription] = {
     if (tasksLaunched < numTasks) {
       var checkPrefVals: Array[Boolean] = Array(true)
       val time = System.currentTimeMillis
@@ -215,9 +226,8 @@ extends ParallelOperation
         checkPrefVals = Array(true, false) // Allow non-preferred tasks
       // TODO: Make desiredCpus and desiredMem configurable
       val desiredCpus = 1
-      val desiredMem = 750
-      if (offer.getParams.get("cpus").toInt < desiredCpus ||
-          offer.getParams.get("mem").toInt < desiredMem)
+      val desiredMem = 500
+      if ((availableCpus < desiredCpus) || (availableMem < desiredMem))
         return None
       for (checkPref <- checkPrefVals; i <- 0 until numTasks) {
         if (!launched(i) && (!checkPref ||
@@ -264,7 +274,7 @@ extends ParallelOperation

   def taskFinished(status: TaskStatus) {
     val tid = status.getTaskId
-    println("Finished TID " + tid)
+    print("Finished TID " + tid)
     if (!finished(tidToIndex(tid))) {
       // Deserialize task result
       val result = Utils.deserialize[TaskResult[T]](status.getData)
@@ -274,10 +284,12 @@ extends ParallelOperation
       // Mark finished and stop if we've finished all the tasks
       finished(tidToIndex(tid)) = true
       tasksFinished += 1
+
+      println(", finished " + tasksFinished + "/" + numTasks)
       if (tasksFinished == numTasks)
         setAllFinished()
     } else {
-      printf("Task %s had already finished, so ignoring it\n", tidToIndex(tid))
+      printf("... Task %s had already finished, so ignoring it\n", tidToIndex(tid))
     }
   }

diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index b1ddd80f878b83d3509a4da76512c6fa5a28eedf..1188367bdd00a00446e1c468769f0d4f787c267f 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -25,7 +25,7 @@ class SparkContext(master: String, frameworkName: String) {

   val LOCAL_REGEX = """local\[([0-9]+)\]""".r

-  private var scheduler: Scheduler = master match {
+  private[spark] var scheduler: Scheduler = master match {
     case "local" => new LocalScheduler(1)
     case LOCAL_REGEX(threads) => new LocalScheduler(threads.toInt)
     case _ => { System.loadLibrary("mesos");
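The core of the MesosScheduler change is replacing the one-pass, one-task-per-offer policy with per-offer resource accounting: the scheduler records how many CPUs and how much memory remain on each offer, deducts a task's resources when it launches, and keeps sweeping over the offers until a full pass launches nothing, so a single large offer can host several tasks. Below is a minimal, self-contained Scala sketch of that loop. Offer, Task, and tryLaunch are hypothetical stand-ins for Mesos's SlaveOffer, TaskDescription, and ParallelOperation.slaveOffer; only the accounting logic mirrors the patch.

    object OfferLoopSketch {
      case class Offer(cpus: Int, mem: Int)   // stand-in for mesos.SlaveOffer
      case class Task(cpus: Int, mem: Int)    // stand-in for mesos.TaskDescription

      // Pretend scheduler callback: launch a 1-CPU / 500 MB task if the
      // remaining resources on this offer are large enough, else decline,
      // matching the desiredCpus/desiredMem check in slaveOffer above.
      def tryLaunch(availableCpus: Int, availableMem: Int): Option[Task] =
        if (availableCpus >= 1 && availableMem >= 500) Some(Task(1, 500)) else None

      def main(args: Array[String]): Unit = {
        val offers = Seq(Offer(4, 2000), Offer(2, 600))
        // Track the resources still free on each offer, as the patch does
        // with availableCpus/availableMem.
        val availableCpus = offers.map(_.cpus).toArray
        val availableMem = offers.map(_.mem).toArray
        val tasks = scala.collection.mutable.ArrayBuffer[Task]()
        // Sweep over the offers until a full pass launches no task.
        var resourcesAvailable = true
        while (resourcesAvailable) {
          resourcesAvailable = false
          for (i <- offers.indices) {
            tryLaunch(availableCpus(i), availableMem(i)) match {
              case Some(task) =>
                tasks += task
                availableCpus(i) -= task.cpus   // deduct what the task consumed
                availableMem(i) -= task.mem
                resourcesAvailable = true       // this pass made progress
              case None =>
            }
          }
        }
        println(s"Launched ${tasks.size} tasks") // 4 on the first offer + 1 on the second
      }
    }

With these example offers, a single pass would launch only two tasks (one per offer); the outer while loop is what lets the 4-CPU offer end up running four.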