diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0a0a5d772b6595f3f52a75569b1bcf50ca0a3883..1ef1712c5659736bdc2812e062992b40bb75759f 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -113,9 +113,20 @@ class SparkContext(
         scheduler.initialize(backend)
         scheduler
 
-      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerlave) =>
+      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
+        // Check to make sure SPARK_MEM <= memoryPerSlave. Otherwise Spark will just hang.
+        val memoryPerSlaveInt = memoryPerSlave.toInt
+        val sparkMemEnv = System.getenv("SPARK_MEM")
+        val sparkMemEnvInt = if (sparkMemEnv != null) Utils.memoryStringToMb(sparkMemEnv) else 512
+        if (sparkMemEnvInt > memoryPerSlaveInt) {
+          throw new SparkException(
+            "Slave memory (%d MB) cannot be smaller than SPARK_MEM (%d MB)".format(
+              memoryPerSlaveInt, sparkMemEnvInt))
+        }
+
         val scheduler = new ClusterScheduler(this)
-        val localCluster = new LocalSparkCluster(numSlaves.toInt, coresPerSlave.toInt, memoryPerlave.toInt)
+        val localCluster = new LocalSparkCluster(
+          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
         val sparkUrl = localCluster.start()
         val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
         scheduler.initialize(backend)