From bcfd55fa982b24184c07fcd4ccdd55dcf6465bf4 Mon Sep 17 00:00:00 2001
From: Daniel Jalova <djalova@us.ibm.com>
Date: Wed, 24 Feb 2016 12:15:11 +0000
Subject: [PATCH] [SPARK-12759][Core][Spark should fail fast if
 --executor-memory is too small for spark to start]

Added an exception to be thrown in UnifiedMemoryManager.scala if the configuration given for executor memory is too low. Also modified the exception message thrown when driver memory is too low.

This patch was tested manually by passing in config options to Spark shell. I also added a test in UnifiedMemoryManagerSuite.scala

Author: Daniel Jalova <djalova@us.ibm.com>

Closes #11255 from djalova/SPARK-12759.
---
 .../spark/memory/UnifiedMemoryManager.scala   | 12 ++++++++++-
 .../memory/UnifiedMemoryManagerSuite.scala    | 20 ++++++++++++++++++-
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index a3321e3f17..6c57c98ea5 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -183,7 +183,17 @@ object UnifiedMemoryManager {
     val minSystemMemory = reservedMemory * 1.5
     if (systemMemory < minSystemMemory) {
       throw new IllegalArgumentException(s"System memory $systemMemory must " +
-        s"be at least $minSystemMemory. Please use a larger heap size.")
+        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
+        s"option or spark.driver.memory in Spark configuration.")
+    }
+    // SPARK-12759 Check executor memory to fail fast if memory is insufficient
+    if (conf.contains("spark.executor.memory")) {
+      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
+      if (executorMemory < minSystemMemory) {
+        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
+          s"$minSystemMemory. Please increase executor memory using the " +
+          s"--executor-memory option or spark.executor.memory in Spark configuration.")
+      }
     }
     val usableMemory = systemMemory - reservedMemory
     val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 0c4359c3c2..9686c6621b 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -227,7 +227,25 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     val exception = intercept[IllegalArgumentException] {
       UnifiedMemoryManager(conf2, numCores = 1)
     }
-    assert(exception.getMessage.contains("larger heap size"))
+    assert(exception.getMessage.contains("increase heap size"))
+  }
+
+  test("insufficient executor memory") {
+    val systemMemory = 1024 * 1024
+    val reservedMemory = 300 * 1024
+    val memoryFraction = 0.8
+    val conf = new SparkConf()
+      .set("spark.memory.fraction", memoryFraction.toString)
+      .set("spark.testing.memory", systemMemory.toString)
+      .set("spark.testing.reservedMemory", reservedMemory.toString)
+    val mm = UnifiedMemoryManager(conf, numCores = 1)
+
+    // Try using an executor memory that's too small
+    val conf2 = conf.clone().set("spark.executor.memory", (reservedMemory / 2).toString)
+    val exception = intercept[IllegalArgumentException] {
+      UnifiedMemoryManager(conf2, numCores = 1)
+    }
+    assert(exception.getMessage.contains("increase executor memory"))
   }
 
   test("execution can evict cached blocks when there are multiple active tasks (SPARK-12155)") {
-- 
GitLab