diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index b95f40b877c8fc07d1ea7d6debab9bfa88d0e37a..755e001106e4a3120f0152b0dec9f6fad18d82e5 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -42,7 +42,7 @@ private class MesosScheduler(
   // Memory used by each executor (in megabytes)
   val EXECUTOR_MEMORY = {
     if (System.getenv("SPARK_MEM") != null) {
-      memoryStringToMb(System.getenv("SPARK_MEM"))
+      MesosScheduler.memoryStringToMb(System.getenv("SPARK_MEM"))
       // TODO: Might need to add some extra memory for the non-heap parts of the JVM
     } else {
       512
@@ -78,9 +78,7 @@ private class MesosScheduler(
 
   // Sorts jobs in reverse order of run ID for use in our priority queue (so lower IDs run first)
   private val jobOrdering = new Ordering[Job] {
-    override def compare(j1: Job, j2: Job): Int = {
-      return j2.runId - j1.runId
-    }
+    override def compare(j1: Job, j2: Job): Int = j2.runId - j1.runId
   }
 
   def newJobId(): Int = this.synchronized {
@@ -156,7 +154,7 @@ private class MesosScheduler(
       activeJobs(jobId) = myJob
       activeJobsQueue += myJob
       logInfo("Adding job with ID " + jobId)
-      jobTasks(jobId) = new HashSet()
+      jobTasks(jobId) = HashSet.empty[String]
     }
     driver.reviveOffers();
   }
@@ -376,24 +374,27 @@ private class MesosScheduler(
   }
 
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+}
 
+object MesosScheduler {
   /**
-   * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
-   * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
+   * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
+   * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
    * environment variable.
    */
   def memoryStringToMb(str: String): Int = {
     val lower = str.toLowerCase
     if (lower.endsWith("k")) {
-      (lower.substring(0, lower.length-1).toLong / 1024).toInt
+      (lower.substring(0, lower.length - 1).toLong / 1024).toInt
     } else if (lower.endsWith("m")) {
-      lower.substring(0, lower.length-1).toInt
+      lower.substring(0, lower.length - 1).toInt
     } else if (lower.endsWith("g")) {
-      lower.substring(0, lower.length-1).toInt * 1024
+      lower.substring(0, lower.length - 1).toInt * 1024
     } else if (lower.endsWith("t")) {
-      lower.substring(0, lower.length-1).toInt * 1024 * 1024
-    } else {// no suffix, so it's just a number in bytes
+      lower.substring(0, lower.length - 1).toInt * 1024 * 1024
+    } else {
+      // no suffix, so it's just a number in bytes
       (lower.toLong / 1024 / 1024).toInt
     }
   }
-}
+}
\ No newline at end of file
diff --git a/core/src/test/scala/spark/MesosSchedulerSuite.scala b/core/src/test/scala/spark/MesosSchedulerSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0e6820cbdcf31b0135d57283ef6b2b78681a5569
--- /dev/null
+++ b/core/src/test/scala/spark/MesosSchedulerSuite.scala
@@ -0,0 +1,28 @@
+package spark
+
+import org.scalatest.FunSuite
+
+class MesosSchedulerSuite extends FunSuite {
+  test("memoryStringToMb"){
+
+    assert(MesosScheduler.memoryStringToMb("1") == 0)
+    assert(MesosScheduler.memoryStringToMb("1048575") == 0)
+    assert(MesosScheduler.memoryStringToMb("3145728") == 3)
+
+    assert(MesosScheduler.memoryStringToMb("1024k") == 1)
+    assert(MesosScheduler.memoryStringToMb("5000k") == 4)
+    assert(MesosScheduler.memoryStringToMb("4024k") == MesosScheduler.memoryStringToMb("4024K"))
+
+    assert(MesosScheduler.memoryStringToMb("1024m") == 1024)
+    assert(MesosScheduler.memoryStringToMb("5000m") == 5000)
+    assert(MesosScheduler.memoryStringToMb("4024m") == MesosScheduler.memoryStringToMb("4024M"))
+
+    assert(MesosScheduler.memoryStringToMb("2g") == 2048)
+    assert(MesosScheduler.memoryStringToMb("3g") == MesosScheduler.memoryStringToMb("3G"))
+
+    assert(MesosScheduler.memoryStringToMb("2t") == 2097152)
+    assert(MesosScheduler.memoryStringToMb("3t") == MesosScheduler.memoryStringToMb("3T"))
+
+
+  }
+}
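For reference, a minimal sketch (not part of the patch) of how the relocated helper can be invoked now that it is a member of the MesosScheduler companion object. MemoryStringDemo is a hypothetical object name; the expected values in the comments mirror the assertions in the new MesosSchedulerSuite.

```scala
package spark

// Hypothetical demo, assuming the patched MesosScheduler companion object is on the classpath.
object MemoryStringDemo {
  def main(args: Array[String]): Unit = {
    // A bare number is interpreted as bytes and rounded down to whole megabytes.
    println(MesosScheduler.memoryStringToMb("3145728")) // 3
    // Suffixes k/m/g/t are case-insensitive because the input is lower-cased first.
    println(MesosScheduler.memoryStringToMb("5000K"))   // 4
    println(MesosScheduler.memoryStringToMb("512m"))    // 512
    println(MesosScheduler.memoryStringToMb("2g"))      // 2048
    println(MesosScheduler.memoryStringToMb("1t"))      // 1048576
  }
}
```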