Skip to content
Snippets Groups Projects
Commit 696eec32 authored by Stephen Haberman's avatar Stephen Haberman
Browse files

Move executorMemory up into SchedulerBackend.

parent 103c375b
No related branches found
No related tags found
No related merge requests found
package spark.scheduler.cluster package spark.scheduler.cluster
import spark.Utils
/** /**
* A backend interface for cluster scheduling systems that allows plugging in different ones under * A backend interface for cluster scheduling systems that allows plugging in different ones under
* ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
...@@ -11,5 +13,15 @@ private[spark] trait SchedulerBackend { ...@@ -11,5 +13,15 @@ private[spark] trait SchedulerBackend {
def reviveOffers(): Unit def reviveOffers(): Unit
def defaultParallelism(): Int def defaultParallelism(): Int
// Memory used by each executor (in megabytes)
protected val executorMemory = {
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
Option(System.getProperty("spark.executor.memory"))
.orElse(Option(System.getenv("SPARK_MEM")))
.map(Utils.memoryStringToMb)
.getOrElse(512)
}
// TODO: Probably want to add a killTask too // TODO: Probably want to add a killTask too
} }
...@@ -20,15 +20,6 @@ private[spark] class SparkDeploySchedulerBackend( ...@@ -20,15 +20,6 @@ private[spark] class SparkDeploySchedulerBackend(
val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
// Memory used by each executor (in megabytes)
val executorMemory = {
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
Option(System.getProperty("spark.executor.memory"))
.orElse(Option(System.getenv("SPARK_MEM")))
.map(Utils.memoryStringToMb)
.getOrElse(512)
}
override def start() { override def start() {
super.start() super.start()
......
...@@ -35,16 +35,6 @@ private[spark] class CoarseMesosSchedulerBackend( ...@@ -35,16 +35,6 @@ private[spark] class CoarseMesosSchedulerBackend(
val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
// Memory used by each executor (in megabytes)
val executorMemory = {
if (System.getenv("SPARK_MEM") != null) {
Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
} else {
512
}
}
// Lock used to wait for scheduler to be registered // Lock used to wait for scheduler to be registered
var isRegistered = false var isRegistered = false
val registeredLock = new Object() val registeredLock = new Object()
......
...@@ -29,16 +29,6 @@ private[spark] class MesosSchedulerBackend( ...@@ -29,16 +29,6 @@ private[spark] class MesosSchedulerBackend(
with MScheduler with MScheduler
with Logging { with Logging {
// Memory used by each executor (in megabytes)
val EXECUTOR_MEMORY = {
if (System.getenv("SPARK_MEM") != null) {
Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
} else {
512
}
}
// Lock used to wait for scheduler to be registered // Lock used to wait for scheduler to be registered
var isRegistered = false var isRegistered = false
val registeredLock = new Object() val registeredLock = new Object()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment