diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index a3321e3f179f6fcd46679bd27973254798bd86c2..6c57c98ea5c597404ed88ec2753a1ba8291799c1 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -183,7 +183,17 @@ object UnifiedMemoryManager {
     val minSystemMemory = reservedMemory * 1.5
     if (systemMemory < minSystemMemory) {
       throw new IllegalArgumentException(s"System memory $systemMemory must " +
-        s"be at least $minSystemMemory. Please use a larger heap size.")
+        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
+        s"option or spark.driver.memory in Spark configuration.")
+    }
+    // SPARK-12759 Check executor memory to fail fast if memory is insufficient
+    if (conf.contains("spark.executor.memory")) {
+      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
+      if (executorMemory < minSystemMemory) {
+        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
+          s"$minSystemMemory. Please increase executor memory using the " +
+          s"--executor-memory option or spark.executor.memory in Spark configuration.")
+      }
     }
     val usableMemory = systemMemory - reservedMemory
     val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 0c4359c3c2cd509a1b17cda0db70594d080b9014..9686c6621b465d511b963c6b6a62f2993d047db9 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -227,7 +227,25 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     val exception = intercept[IllegalArgumentException] {
       UnifiedMemoryManager(conf2, numCores = 1)
     }
-    assert(exception.getMessage.contains("larger heap size"))
+    assert(exception.getMessage.contains("increase heap size"))
+  }
+
+  test("insufficient executor memory") {
+    val systemMemory = 1024 * 1024
+    val reservedMemory = 300 * 1024
+    val memoryFraction = 0.8
+    val conf = new SparkConf()
+      .set("spark.memory.fraction", memoryFraction.toString)
+      .set("spark.testing.memory", systemMemory.toString)
+      .set("spark.testing.reservedMemory", reservedMemory.toString)
+    val mm = UnifiedMemoryManager(conf, numCores = 1)
+
+    // Try using an executor memory that's too small
+    val conf2 = conf.clone().set("spark.executor.memory", (reservedMemory / 2).toString)
+    val exception = intercept[IllegalArgumentException] {
+      UnifiedMemoryManager(conf2, numCores = 1)
+    }
+    assert(exception.getMessage.contains("increase executor memory"))
   }

   test("execution can evict cached blocks when there are multiple active tasks (SPARK-12155)") {