Skip to content
Snippets Groups Projects
Commit bcfd55fa authored by Daniel Jalova's avatar Daniel Jalova Committed by Sean Owen
Browse files

[SPARK-12759][Core][Spark should fail fast if --executor-memory is too small for spark to start]

Added an exception to be thrown in UnifiedMemoryManager.scala if the configuration given for executor memory is too low. Also modified the exception message thrown when driver memory is too low.

This patch was tested manually by passing in config options to Spark shell. I also added a test in UnifiedMemoryManagerSuite.scala

Author: Daniel Jalova <djalova@us.ibm.com>

Closes #11255 from djalova/SPARK-12759.
parent 83142917
No related branches found
No related tags found
No related merge requests found
......@@ -183,7 +183,17 @@ object UnifiedMemoryManager {
val minSystemMemory = reservedMemory * 1.5
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
s"be at least $minSystemMemory. Please use a larger heap size.")
s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
s"option or spark.driver.memory in Spark configuration.")
}
// SPARK-12759 Check executor memory to fail fast if memory is insufficient
if (conf.contains("spark.executor.memory")) {
val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
if (executorMemory < minSystemMemory) {
throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
s"$minSystemMemory. Please increase executor memory using the " +
s"--executor-memory option or spark.executor.memory in Spark configuration.")
}
}
val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
......
......@@ -227,7 +227,25 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val exception = intercept[IllegalArgumentException] {
UnifiedMemoryManager(conf2, numCores = 1)
}
assert(exception.getMessage.contains("larger heap size"))
assert(exception.getMessage.contains("increase heap size"))
}
test("insufficient executor memory") {
val systemMemory = 1024 * 1024
val reservedMemory = 300 * 1024
val memoryFraction = 0.8
val conf = new SparkConf()
.set("spark.memory.fraction", memoryFraction.toString)
.set("spark.testing.memory", systemMemory.toString)
.set("spark.testing.reservedMemory", reservedMemory.toString)
val mm = UnifiedMemoryManager(conf, numCores = 1)
// Try using an executor memory that's too small
val conf2 = conf.clone().set("spark.executor.memory", (reservedMemory / 2).toString)
val exception = intercept[IllegalArgumentException] {
UnifiedMemoryManager(conf2, numCores = 1)
}
assert(exception.getMessage.contains("increase executor memory"))
}
test("execution can evict cached blocks when there are multiple active tasks (SPARK-12155)") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment