Skip to content
Snippets Groups Projects
Commit ae649203 authored by Richard Benkovsky's avatar Richard Benkovsky
Browse files

MesosScheduler refactoring

parent 3a1bcd40
No related branches found
No related tags found
No related merge requests found
...@@ -42,7 +42,7 @@ private class MesosScheduler( ...@@ -42,7 +42,7 @@ private class MesosScheduler(
// Memory used by each executor (in megabytes) // Memory used by each executor (in megabytes)
val EXECUTOR_MEMORY = { val EXECUTOR_MEMORY = {
if (System.getenv("SPARK_MEM") != null) { if (System.getenv("SPARK_MEM") != null) {
memoryStringToMb(System.getenv("SPARK_MEM")) MesosScheduler.memoryStringToMb(System.getenv("SPARK_MEM"))
// TODO: Might need to add some extra memory for the non-heap parts of the JVM // TODO: Might need to add some extra memory for the non-heap parts of the JVM
} else { } else {
512 512
...@@ -78,9 +78,7 @@ private class MesosScheduler( ...@@ -78,9 +78,7 @@ private class MesosScheduler(
// Sorts jobs in reverse order of run ID for use in our priority queue (so lower IDs run first) // Sorts jobs in reverse order of run ID for use in our priority queue (so lower IDs run first)
private val jobOrdering = new Ordering[Job] { private val jobOrdering = new Ordering[Job] {
override def compare(j1: Job, j2: Job): Int = { override def compare(j1: Job, j2: Job): Int = j2.runId - j1.runId
return j2.runId - j1.runId
}
} }
def newJobId(): Int = this.synchronized { def newJobId(): Int = this.synchronized {
...@@ -156,7 +154,7 @@ private class MesosScheduler( ...@@ -156,7 +154,7 @@ private class MesosScheduler(
activeJobs(jobId) = myJob activeJobs(jobId) = myJob
activeJobsQueue += myJob activeJobsQueue += myJob
logInfo("Adding job with ID " + jobId) logInfo("Adding job with ID " + jobId)
jobTasks(jobId) = new HashSet() jobTasks(jobId) = HashSet.empty[String]
} }
driver.reviveOffers(); driver.reviveOffers();
} }
...@@ -376,24 +374,27 @@ private class MesosScheduler( ...@@ -376,24 +374,27 @@ private class MesosScheduler(
} }
override def offerRescinded(d: SchedulerDriver, o: OfferID) {} override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
}
object MesosScheduler {
/** /**
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
* This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
* environment variable. * environment variable.
*/ */
def memoryStringToMb(str: String): Int = { def memoryStringToMb(str: String): Int = {
val lower = str.toLowerCase val lower = str.toLowerCase
if (lower.endsWith("k")) { if (lower.endsWith("k")) {
(lower.substring(0, lower.length-1).toLong / 1024).toInt (lower.substring(0, lower.length - 1).toLong / 1024).toInt
} else if (lower.endsWith("m")) { } else if (lower.endsWith("m")) {
lower.substring(0, lower.length-1).toInt lower.substring(0, lower.length - 1).toInt
} else if (lower.endsWith("g")) { } else if (lower.endsWith("g")) {
lower.substring(0, lower.length-1).toInt * 1024 lower.substring(0, lower.length - 1).toInt * 1024
} else if (lower.endsWith("t")) { } else if (lower.endsWith("t")) {
lower.substring(0, lower.length-1).toInt * 1024 * 1024 lower.substring(0, lower.length - 1).toInt * 1024 * 1024
} else {// no suffix, so it's just a number in bytes } else {
// no suffix, so it's just a number in bytes
(lower.toLong / 1024 / 1024).toInt (lower.toLong / 1024 / 1024).toInt
} }
} }
} }
\ No newline at end of file
package spark
import org.scalatest.FunSuite
class MesosSchedulerSuite extends FunSuite {
test("memoryStringToMb"){
assert(MesosScheduler.memoryStringToMb("1") == 0)
assert(MesosScheduler.memoryStringToMb("1048575") == 0)
assert(MesosScheduler.memoryStringToMb("3145728") == 3)
assert(MesosScheduler.memoryStringToMb("1024k") == 1)
assert(MesosScheduler.memoryStringToMb("5000k") == 4)
assert(MesosScheduler.memoryStringToMb("4024k") == MesosScheduler.memoryStringToMb("4024K"))
assert(MesosScheduler.memoryStringToMb("1024m") == 1024)
assert(MesosScheduler.memoryStringToMb("5000m") == 5000)
assert(MesosScheduler.memoryStringToMb("4024m") == MesosScheduler.memoryStringToMb("4024M"))
assert(MesosScheduler.memoryStringToMb("2g") == 2048)
assert(MesosScheduler.memoryStringToMb("3g") == MesosScheduler.memoryStringToMb("3G"))
assert(MesosScheduler.memoryStringToMb("2t") == 2097152)
assert(MesosScheduler.memoryStringToMb("3t") == MesosScheduler.memoryStringToMb("3T"))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment