diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 1e295aaa48c3082792fba0d798cdd8c3a79d9325..54e3937edde6bca625568cd2318c7ad8f612f8c3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -41,8 +41,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
   if (System.getenv("SPARK_WORKER_CORES") != null) {
     cores = System.getenv("SPARK_WORKER_CORES").toInt
   }
-  if (System.getenv("SPARK_WORKER_MEMORY") != null) {
-    memory = Utils.memoryStringToMb(System.getenv("SPARK_WORKER_MEMORY"))
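+  // Read through conf.getenv rather than System.getenv so tests can stub the value.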
+  if (conf.getenv("SPARK_WORKER_MEMORY") != null) {
+    memory = Utils.memoryStringToMb(conf.getenv("SPARK_WORKER_MEMORY"))
   }
   if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
     webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
@@ -56,6 +57,8 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
 
   parse(args.toList)
 
+  checkWorkerMemory()
+
   def parse(args: List[String]): Unit = args match {
     case ("--ip" | "-i") :: value :: tail =>
       Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
@@ -153,4 +156,13 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
     // Leave out 1 GB for the operating system, but don't return a negative memory size
     math.max(totalMb - 1024, 512)
   }
+
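+  // Utils.memoryStringToMb treats a suffix-less value as a byte count, so a bare
+  // "10000" truncates to 0 MB; fail fast rather than start a worker with no memory.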
+  def checkWorkerMemory(): Unit = {
+    if (memory <= 0) {
+      val message = "Memory can't be 0, missing an M or G at the end of the memory specification?"
+      throw new IllegalStateException(message)
+    }
+  }
 }
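
For context: Utils.memoryStringToMb treats a suffix-less string as a byte count and
integer-divides it down to megabytes, so "10000" silently becomes 0 MB. A minimal
sketch of that suffix handling (an illustrative approximation, not the actual method
in org.apache.spark.util.Utils):

    def memoryStringToMbSketch(str: String): Int = {
      val lower = str.toLowerCase
      if (lower.endsWith("m")) lower.dropRight(1).toInt
      else if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
      else (lower.toLong / 1024 / 1024).toInt // bare number = bytes; 10000 -> 0 MB
    }

    memoryStringToMbSketch("10000")  // 0, now rejected by checkWorkerMemory()
    memoryStringToMbSketch("10000M") // 10000
    memoryStringToMbSketch("5G")     // 5120
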
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1a28a9a187cd75ca867fac276affa1da213cf00b
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkConf
+
+class WorkerArgumentsTest extends FunSuite {
+
+  test("Memory can't be set to 0 when cmd line args leave off M or G") {
+    val conf = new SparkConf
+    val args = Array("-m", "10000", "spark://localhost:0000  ")
+    intercept[IllegalStateException] {
+      new WorkerArguments(args, conf)
+    }
+  }
+
+  test("Memory can't be set to 0 when SPARK_WORKER_MEMORY env property leaves off M or G") {
+    val args = Array("spark://localhost:0000  ")
+
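+    // Stub getenv so the test controls SPARK_WORKER_MEMORY without touching the real
+    // environment; clone is overridden so that copies keep the stubbed behaviour.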
+    class MySparkConf extends SparkConf(false) {
+      override def getenv(name: String) = {
+        if (name == "SPARK_WORKER_MEMORY") "50000"
+        else super.getenv(name)
+      }
+
+      override def clone: SparkConf = {
+        new MySparkConf().setAll(settings)
+      }
+    }
+    val conf = new MySparkConf()
+    intercept[IllegalStateException] {
+      new WorkerArguments(args, conf)
+    }
+  }
+
+  test("Memory correctly set when SPARK_WORKER_MEMORY env property appends G") {
+    val args = Array("spark://localhost:0000  ")
+
+    class MySparkConf extends SparkConf(false) {
+      override def getenv(name: String) = {
+        if (name == "SPARK_WORKER_MEMORY") "5G"
+        else super.getenv(name)
+      }
+
+      override def clone: SparkConf = {
+        new MySparkConf().setAll(settings)
+      }
+    }
+    val conf = new MySparkConf()
+    val workerArgs = new WorkerArguments(args, conf)
+    assert(workerArgs.memory === 5120)
+  }
+
+  test("Memory correctly set from args with M appended to memory value") {
+    val conf = new SparkConf
+    val args = Array("-m", "10000M", "spark://localhost:0000  ")
+
+    val workerArgs = new WorkerArguments(args, conf)
+    assert(workerArgs.memory === 10000)
+  }
+
+}
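
The new suite should be runnable through the usual ScalaTest path; something like the
following is a plausible invocation (the sbt/sbt launcher and the sbt 0.13 test-only
syntax are assumptions about this era of the build, not taken from this patch):

    sbt/sbt "core/test-only org.apache.spark.deploy.worker.WorkerArgumentsTest"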