Commit 3bfaf3ab authored by Matei Zaharia

Merge pull request #379 from stephenh/sparkmem

Add spark.executor.memory to differentiate executor memory from spark-shell
parents 88ee6163 cae8a679
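
With this change, executor heap size is resolved from the new spark.executor.memory system property first, then the SPARK_MEM environment variable, then a 512 MB default, so executors no longer inherit the memory setting of the spark-shell/driver JVM. A minimal usage sketch (the master URL and job name are illustrative placeholders, not part of this commit):

    // Give each executor 2 GB without enlarging the spark-shell/driver JVM.
    System.setProperty("spark.executor.memory", "2g")
    val sc = new SparkContext("spark://master:7077", "MyJob")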
SparkContext.scala
@@ -108,8 +108,9 @@ class SparkContext(
   // Environment variables to pass to our executors
   private[spark] val executorEnvs = HashMap[String, String]()
+  // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
   for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS",
       "SPARK_TESTING")) {
     val value = System.getenv(key)
     if (value != null) {
       executorEnvs(key) = value
...
ExecutorRunner.scala
@@ -113,8 +113,7 @@ private[spark] class ExecutorRunner(
     for ((key, value) <- jobDesc.command.environment) {
       env.put(key, value)
     }
-    env.put("SPARK_CORES", cores.toString)
-    env.put("SPARK_MEMORY", memory.toString)
+    env.put("SPARK_MEM", memory.toString + "m")
     // In case we are running this from within the Spark Shell, avoid creating a "scala"
     // parent process for the executor command
     env.put("SPARK_LAUNCH_WITH_SCALA", "0")
...
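
The "m" suffix matters: SPARK_MEM is parsed downstream as a JVM-style memory string, while ExecutorRunner holds memory as an integer megabyte count. A small round-trip sketch, assuming Utils.memoryStringToMb accepts strings such as "512m" or "2g" and returns megabytes (as the trait below uses it):

    val memory = 1024                              // MB, as ExecutorRunner stores it
    val exported = memory.toString + "m"           // "1024m", the value written to SPARK_MEM
    val parsed = Utils.memoryStringToMb(exported)  // 1024 again on the reading side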
SchedulerBackend.scala
@@ -1,5 +1,7 @@
 package spark.scheduler.cluster
 
+import spark.Utils
+
 /**
  * A backend interface for cluster scheduling systems that allows plugging in different ones under
  * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
@@ -11,5 +13,15 @@ private[spark] trait SchedulerBackend {
   def reviveOffers(): Unit
   def defaultParallelism(): Int
 
+  // Memory used by each executor (in megabytes)
+  protected val executorMemory = {
+    // TODO: Might need to add some extra memory for the non-heap parts of the JVM
+    Option(System.getProperty("spark.executor.memory"))
+      .orElse(Option(System.getenv("SPARK_MEM")))
+      .map(Utils.memoryStringToMb)
+      .getOrElse(512)
+  }
+
   // TODO: Probably want to add a killTask too
 }
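
The Option chain encodes a clear precedence order. A restatement as a standalone helper (resolveExecutorMemory is hypothetical, not part of the commit):

    def resolveExecutorMemory(prop: Option[String], env: Option[String]): Int =
      prop.orElse(env).map(Utils.memoryStringToMb).getOrElse(512)

    resolveExecutorMemory(Some("2g"), Some("1g"))  // 2048: the system property wins
    resolveExecutorMemory(None, Some("1g"))        // 1024: SPARK_MEM is the fallback
    resolveExecutorMemory(None, None)              // 512: the default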
SparkDeploySchedulerBackend.scala
@@ -20,16 +20,6 @@ private[spark] class SparkDeploySchedulerBackend(
   val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
 
-  // Memory used by each executor (in megabytes)
-  val executorMemory = {
-    if (System.getenv("SPARK_MEM") != null) {
-      Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
-      // TODO: Might need to add some extra memory for the non-heap parts of the JVM
-    } else {
-      512
-    }
-  }
-
   override def start() {
     super.start()
...
CoarseMesosSchedulerBackend.scala
@@ -35,16 +35,6 @@ private[spark] class CoarseMesosSchedulerBackend(
   val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
 
-  // Memory used by each executor (in megabytes)
-  val executorMemory = {
-    if (System.getenv("SPARK_MEM") != null) {
-      Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
-      // TODO: Might need to add some extra memory for the non-heap parts of the JVM
-    } else {
-      512
-    }
-  }
-
   // Lock used to wait for scheduler to be registered
   var isRegistered = false
   val registeredLock = new Object()
...
MesosSchedulerBackend.scala
@@ -29,16 +29,6 @@ private[spark] class MesosSchedulerBackend(
     with MScheduler
     with Logging {
 
-  // Memory used by each executor (in megabytes)
-  val EXECUTOR_MEMORY = {
-    if (System.getenv("SPARK_MEM") != null) {
-      Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
-      // TODO: Might need to add some extra memory for the non-heap parts of the JVM
-    } else {
-      512
-    }
-  }
-
   // Lock used to wait for scheduler to be registered
   var isRegistered = false
   val registeredLock = new Object()
@@ -89,7 +79,7 @@ private[spark] class MesosSchedulerBackend(
     val memory = Resource.newBuilder()
       .setName("mem")
       .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build())
+      .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
       .build()
     val command = CommandInfo.newBuilder()
       .setValue(execScript)
@@ -161,7 +151,7 @@ private[spark] class MesosSchedulerBackend(
     def enoughMemory(o: Offer) = {
       val mem = getResource(o.getResourcesList, "mem")
       val slaveId = o.getSlaveId.getValue
-      mem >= EXECUTOR_MEMORY || slaveIdsWithExecutors.contains(slaveId)
+      mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
     }
     for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
...
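
For clarity, the filter above accepts an offer when it carries at least executorMemory megabytes of the "mem" resource, or when the slave already hosts an executor (which needs no additional memory). A standalone restatement (enoughMemoryFor and its parameters are hypothetical, not part of the commit):

    def enoughMemoryFor(offeredMemMb: Double, slaveId: String,
                        slavesWithExecutors: Set[String], executorMemory: Int): Boolean =
      offeredMemMb >= executorMemory || slavesWithExecutors.contains(slaveId)

    enoughMemoryFor(1024.0, "slave-1", Set.empty, 512)       // true: offer is big enough
    enoughMemoryFor(256.0, "slave-1", Set("slave-1"), 512)   // true: executor already there
    enoughMemoryFor(256.0, "slave-2", Set("slave-1"), 512)   // false: offer is declined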