diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 96f8aa93394f5b202f1a260ec4c9bd030171d7d6..32f8861dc950378fe63aa375ae5845e227d60117 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.net.URI
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, ListBuffer}
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api._
@@ -44,9 +44,9 @@ trait ExecutorRunnableUtil extends Logging {
       hostname: String,
       executorMemory: Int,
       executorCores: Int,
-      localResources: HashMap[String, LocalResource]) = {
+      localResources: HashMap[String, LocalResource]): List[String] = {
     // Extra options for the JVM
-    var JAVA_OPTS = ""
+    val JAVA_OPTS = ListBuffer[String]()
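+    // Options are collected as separate list elements and spliced into the launch
+    // command below, instead of being concatenated into a single string.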
     // Set the JVM memory
     val executorMemoryString = executorMemory + "m"
     JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
@@ -56,10 +56,21 @@ trait ExecutorRunnableUtil extends Logging {
       JAVA_OPTS += opts
     }
 
-    JAVA_OPTS += " -Djava.io.tmpdir=" +
-      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+    JAVA_OPTS += "-Djava.io.tmpdir=" +
+      new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
     JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
 
+    // Certain configs must be passed on the command line because the executor needs
+    // them before it registers with the scheduler and receives the rest of the Spark
+    // config. Since the executor backend connects to the scheduler over Akka, both the
+    // Akka settings and the authentication settings are required up front.
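+    // The quotes are escaped so that they survive the shell processing YARN applies
+    // when launching the container; values containing spaces should then reach the
+    // executor JVM intact.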
+    sparkConf.getAll.
+      filter { case (k, _) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
+      foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+
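+    // Settings under the raw akka.* prefix (as opposed to spark.akka.*) must likewise
+    // reach the executor JVM before it connects back to the scheduler.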
+    sparkConf.getAkkaConf.
+      foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
     // The context is, default gc for server class machines ends up using all cores to do gc - hence
@@ -85,25 +96,25 @@ trait ExecutorRunnableUtil extends Logging {
         }
     */
 
-    val commands = List[String](
-      Environment.JAVA_HOME.$() + "/bin/java" +
-      " -server " +
+    val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java",
+      "-server",
       // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
       // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
       // an inconsistent state.
       // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
       // 'something' to fail job ... akin to blacklisting trackers in mapred ?
-      " -XX:OnOutOfMemoryError='kill %p' " +
-      JAVA_OPTS +
-      " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
-      masterAddress + " " +
-      slaveId + " " +
-      hostname + " " +
-      executorCores +
-      " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-      " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
-
-    commands
+      "-XX:OnOutOfMemoryError='kill %p'") ++
+      JAVA_OPTS ++
+      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+      masterAddress.toString,
+      slaveId.toString,
+      hostname.toString,
+      executorCores.toString,
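+      // The redirection tokens work because YARN joins the command list into a single
+      // shell line; executor stdout/stderr land in the container's log directory.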
+      "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+      "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
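+    // Render any null element as the literal string "null", matching how the old
+    // string concatenation rendered nulls.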
+    // TODO: it would be nicer to just make sure there are no null commands here
+    commands.map(s => if (s == null) "null" else s).toList
   }
 
   private def setupDistributedCache(