diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 58aa6d951a2047c5ed87d8f6d2817926403a1eef..24edc60684376cb2e0cfbe9c652ceb4d7aab00b7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -185,7 +185,6 @@ object SparkSubmit {
     if (clusterManager == STANDALONE) {
       val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
       sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(","))
-      println("SPARK JARS" + sysProps.get("spark.jars"))
     }
 
     if (deployOnCluster && clusterManager == STANDALONE) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index c545b093ac82e0c68b06b70eecc6746f4dbe07d2..58d9e9add764a222878e7fef529526b0dc8a344f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -21,14 +21,15 @@ import java.io.{File, FileInputStream, IOException}
 import java.util.Properties
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, ArrayBuffer}
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.spark.SparkException
+import org.apache.spark.util.Utils
 
 /**
  * Parses and encapsulates arguments from the spark-submit script.
  */
-private[spark] class SparkSubmitArguments(args: Array[String]) {
+private[spark] class SparkSubmitArguments(args: Seq[String]) {
   var master: String = null
   var deployMode: String = null
   var executorMemory: String = null
@@ -118,8 +119,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
 
     if (master.startsWith("yarn")) {
       val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
-      val testing = sys.env.contains("SPARK_TESTING")
-      if (!hasHadoopEnv && !testing) {
+      if (!hasHadoopEnv && !Utils.isTesting) {
         throw new Exception(s"When running with master '$master' " +
           "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
       }
@@ -156,119 +156,121 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
     """.stripMargin
   }
 
-  private def parseOpts(opts: List[String]): Unit = opts match {
-    case ("--name") :: value :: tail =>
-      name = value
-      parseOpts(tail)
+  /** Fill in values by parsing user options. */
+  private def parseOpts(opts: Seq[String]): Unit = {
+    // Delineates parsing of Spark options from parsing of user options.
+    var inSparkOpts = true
+    parse(opts)
 
-    case ("--master") :: value :: tail =>
-      master = value
-      parseOpts(tail)
+    def parse(opts: Seq[String]): Unit = opts match {
+      case ("--name") :: value :: tail =>
+        name = value
+        parse(tail)
 
-    case ("--class") :: value :: tail =>
-      mainClass = value
-      parseOpts(tail)
+      case ("--master") :: value :: tail =>
+        master = value
+        parse(tail)
 
-    case ("--deploy-mode") :: value :: tail =>
-      if (value != "client" && value != "cluster") {
-        SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
-      }
-      deployMode = value
-      parseOpts(tail)
-
-    case ("--num-executors") :: value :: tail =>
-      numExecutors = value
-      parseOpts(tail)
-
-    case ("--total-executor-cores") :: value :: tail =>
-      totalExecutorCores = value
-      parseOpts(tail)
-
-    case ("--executor-cores") :: value :: tail =>
-      executorCores = value
-      parseOpts(tail)
-
-    case ("--executor-memory") :: value :: tail =>
-      executorMemory = value
-      parseOpts(tail)
-
-    case ("--driver-memory") :: value :: tail =>
-      driverMemory = value
-      parseOpts(tail)
-
-    case ("--driver-cores") :: value :: tail =>
-      driverCores = value
-      parseOpts(tail)
-
-    case ("--driver-class-path") :: value :: tail =>
-      driverExtraClassPath = value
-      parseOpts(tail)
-
-    case ("--driver-java-options") :: value :: tail =>
-      driverExtraJavaOptions = value
-      parseOpts(tail)
-
-    case ("--driver-library-path") :: value :: tail =>
-      driverExtraLibraryPath = value
-      parseOpts(tail)
-
-    case ("--properties-file") :: value :: tail =>
-      propertiesFile = value
-      parseOpts(tail)
-
-    case ("--supervise") :: tail =>
-      supervise = true
-      parseOpts(tail)
-
-    case ("--queue") :: value :: tail =>
-      queue = value
-      parseOpts(tail)
-
-    case ("--files") :: value :: tail =>
-      files = value
-      parseOpts(tail)
-
-    case ("--archives") :: value :: tail =>
-      archives = value
-      parseOpts(tail)
-
-    case ("--arg") :: value :: tail =>
-      childArgs += value
-      parseOpts(tail)
-
-    case ("--jars") :: value :: tail =>
-      jars = value
-      parseOpts(tail)
-
-    case ("--help" | "-h") :: tail =>
-      printUsageAndExit(0)
-
-    case ("--verbose" | "-v") :: tail =>
-      verbose = true
-      parseOpts(tail)
-
-    case value :: tail =>
-      if (value.startsWith("-")) {
-        val errMessage = s"Unrecognized option '$value'."
-        val suggestion: Option[String] = value match {
-          case v if v.startsWith("--") && v.contains("=") =>
-            val parts = v.split("=")
-            Some(s"Perhaps you want '${parts(0)} ${parts(1)}'?")
-          case _ =>
-            None
+      case ("--class") :: value :: tail =>
+        mainClass = value
+        parse(tail)
+
+      case ("--deploy-mode") :: value :: tail =>
+        if (value != "client" && value != "cluster") {
+          SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
+        }
+        deployMode = value
+        parse(tail)
+
+      case ("--num-executors") :: value :: tail =>
+        numExecutors = value
+        parse(tail)
+
+      case ("--total-executor-cores") :: value :: tail =>
+        totalExecutorCores = value
+        parse(tail)
+
+      case ("--executor-cores") :: value :: tail =>
+        executorCores = value
+        parse(tail)
+
+      case ("--executor-memory") :: value :: tail =>
+        executorMemory = value
+        parse(tail)
+
+      case ("--driver-memory") :: value :: tail =>
+        driverMemory = value
+        parse(tail)
+
+      case ("--driver-cores") :: value :: tail =>
+        driverCores = value
+        parse(tail)
+
+      case ("--driver-class-path") :: value :: tail =>
+        driverExtraClassPath = value
+        parse(tail)
+
+      case ("--driver-java-options") :: value :: tail =>
+        driverExtraJavaOptions = value
+        parse(tail)
+
+      case ("--driver-library-path") :: value :: tail =>
+        driverExtraLibraryPath = value
+        parse(tail)
+
+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        parse(tail)
+
+      case ("--supervise") :: tail =>
+        supervise = true
+        parse(tail)
+
+      case ("--queue") :: value :: tail =>
+        queue = value
+        parse(tail)
+
+      case ("--files") :: value :: tail =>
+        files = value
+        parse(tail)
+
+      case ("--archives") :: value :: tail =>
+        archives = value
+        parse(tail)
+
+      case ("--jars") :: value :: tail =>
+        jars = value
+        parse(tail)
+
+      case ("--help" | "-h") :: tail =>
+        printUsageAndExit(0)
+
+      case ("--verbose" | "-v") :: tail =>
+        verbose = true
+        parse(tail)
+
+      case value :: tail =>
+        if (inSparkOpts) {
+          value match {
+            // convert --foo=bar to --foo bar
+            case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
+              val parts = v.split("=")
+              parse(Seq(parts(0), parts(1)) ++ tail)
+            case v if v.startsWith("-") =>
+              val errMessage = s"Unrecognized option '$value'."
+              SparkSubmit.printErrorAndExit(errMessage)
+            case v =>
+             primaryResource = v
+             inSparkOpts = false
+             parse(tail)
+          }
+        } else {
+          childArgs += value
+          parse(tail)
         }
-        SparkSubmit.printErrorAndExit(errMessage + suggestion.map(" " + _).getOrElse(""))
-      }
 
-      if (primaryResource != null) {
-        val error = s"Found two conflicting resources, $value and $primaryResource." +
-          " Expecting only one resource."
-        SparkSubmit.printErrorAndExit(error)
+      case Nil =>
       }
-      primaryResource = value
-      parseOpts(tail)
-
-    case Nil =>
   }
 
   private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
@@ -277,7 +279,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
       outStream.println("Unknown/unsupported param " + unknownParam)
     }
     outStream.println(
-      """Usage: spark-submit <app jar> [options]
+      """Usage: spark-submit [options] <app jar> [app options]
         |Options:
         |  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
         |  --deploy-mode DEPLOY_MODE   Mode to deploy the app in, either 'client' or 'cluster'.
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8351f7156a5e46be563888c44f72b892258aa91e..5a55e7df3483274f2189783f2448c015867fe8cc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1056,4 +1056,11 @@ private[spark] object Utils extends Logging {
   def getHadoopFileSystem(path: String): FileSystem = {
     getHadoopFileSystem(new URI(path))
   }
+
+  /**
+   * Indicates whether Spark is currently running unit tests.
+   */
+  private[spark] def isTesting = {
+    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 657b44668d385d303f1ea59ae29659492c925e9b..10a65c75cc621c8c0507076e37441b3c04bbbd47 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -28,6 +28,9 @@ import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
 class SparkSubmitSuite extends FunSuite with ShouldMatchers {
+  def beforeAll() {
+    System.setProperty("spark.testing", "true")
+  }
 
   val noOpOutputStream = new OutputStream {
     def write(b: Int) = {}
@@ -74,33 +77,35 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     testPrematureExit(Array("--help"), "Usage: spark-submit")
   }
 
-  test("prints error with unrecognized option") {
+  test("prints error with unrecognized options") {
     testPrematureExit(Array("--blarg"), "Unrecognized option '--blarg'")
     testPrematureExit(Array("-bleg"), "Unrecognized option '-bleg'")
-    testPrematureExit(Array("--master=abc"),
-      "Unrecognized option '--master=abc'. Perhaps you want '--master abc'?")
   }
 
-  test("handles multiple binary definitions") {
-    val adjacentJars = Array("foo.jar", "bar.jar")
-    testPrematureExit(adjacentJars, "error: Found two conflicting resources")
+  test("handle binary specified but not class") {
+    testPrematureExit(Array("foo.jar"), "Must specify a main class")
+  }
 
-    val nonAdjacentJars =
-      Array("foo.jar", "--master", "123", "--class", "abc", "bar.jar")
-    testPrematureExit(nonAdjacentJars, "error: Found two conflicting resources")
+  test("handles arguments with --key=val") {
+    val clArgs = Seq("--jars=one.jar,two.jar,three.jar", "--name=myApp")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    appArgs.jars should be ("one.jar,two.jar,three.jar")
+    appArgs.name should be ("myApp")
   }
 
-  test("handle binary specified but not class") {
-    testPrematureExit(Array("foo.jar"), "Must specify a main class")
+  test("handles arguments to user program") {
+    val clArgs = Seq("--name", "myApp", "userjar.jar", "some", "--random", "args", "here")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    appArgs.childArgs should be (Seq("some", "--random", "args", "here"))
   }
 
   test("handles YARN cluster mode") {
-    val clArgs = Array("thejar.jar", "--deploy-mode", "cluster",
+    val clArgs = Seq("--deploy-mode", "cluster",
       "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
       "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
-      "--arg", "arg1", "--arg", "arg2", "--driver-memory", "4g",
-      "--queue", "thequeue", "--files", "file1.txt,file2.txt",
-      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6")
+      "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
+      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6",
+      "thejar.jar", "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     val childArgsStr = childArgs.mkString(" ")
@@ -121,12 +126,12 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles YARN client mode") {
-    val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+    val clArgs = Seq("--deploy-mode", "client",
       "--master", "yarn", "--executor-memory", "5g", "--executor-cores", "5",
       "--class", "org.SomeClass", "--jars", "one.jar,two.jar,three.jar",
-      "--arg", "arg1", "--arg", "arg2", "--driver-memory", "4g",
-      "--queue", "thequeue", "--files", "file1.txt,file2.txt",
-      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6")
+      "--driver-memory", "4g", "--queue", "thequeue", "--files", "file1.txt,file2.txt",
+      "--archives", "archive1.txt,archive2.txt", "--num-executors", "6", "thejar.jar",
+      "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
@@ -144,9 +149,9 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles standalone cluster mode") {
-    val clArgs = Array("thejar.jar", "--deploy-mode", "cluster",
-      "--master", "spark://h:p", "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
-      "--supervise", "--driver-memory", "4g", "--driver-cores", "5")
+    val clArgs = Seq("--deploy-mode", "cluster",
+      "--master", "spark://h:p", "--class", "org.SomeClass",
+      "--supervise", "--driver-memory", "4g", "--driver-cores", "5", "thejar.jar", "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     val childArgsStr = childArgs.mkString(" ")
@@ -158,10 +163,9 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles standalone client mode") {
-    val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+    val clArgs = Seq("--deploy-mode", "client",
       "--master", "spark://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
-      "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
-      "--driver-memory", "4g")
+      "--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
@@ -172,10 +176,9 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
   }
 
   test("handles mesos client mode") {
-    val clArgs = Array("thejar.jar", "--deploy-mode", "client",
+    val clArgs = Seq("--deploy-mode", "client",
       "--master", "mesos://h:p", "--executor-memory", "5g", "--total-executor-cores", "5",
-      "--class", "org.SomeClass", "--arg", "arg1", "--arg", "arg2",
-      "--driver-memory", "4g")
+      "--class", "org.SomeClass", "--driver-memory", "4g", "thejar.jar", "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
     val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
     childArgs.mkString(" ") should be ("arg1 arg2")
@@ -187,22 +190,24 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
 
   test("launch simple application with spark-submit") {
     runSparkSubmit(
-      Seq("unUsed.jar",
+      Seq(
         "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
         "--name", "testApp",
-        "--master", "local"))
+        "--master", "local",
+        "unUsed.jar"))
   }
 
   test("spark submit includes jars passed in through --jar") {
     val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
     val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
     val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
-    runSparkSubmit(
-      Seq("unUsed.jar",
-        "--class", JarCreationTest.getClass.getName.stripSuffix("$"),
-        "--name", "testApp",
-        "--master", "local-cluster[2,1,512]",
-        "--jars", jarsString))
+    val args = Seq(
+      "--class", JarCreationTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--master", "local-cluster[2,1,512]",
+      "--jars", jarsString,
+      "unused.jar")
+    runSparkSubmit(args)
   }
 
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index dcc063042628cbdcd954d12fcc03a0bed0bb2ec9..b011679fede2d8283e28181a3302028110ab518f 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -73,30 +73,34 @@ the bin directory. This script takes care of setting up the classpath with Spark
 dependencies, and can support different cluster managers and deploy modes that Spark supports.
 It's usage is
 
-    ./bin/spark-submit <app jar> --class path.to.your.Class [other options..]
+    ./bin/spark-submit --class path.to.your.Class [options] <app jar> [app options]
 
-To enumerate all options available to `spark-submit` run it with the `--help` flag.
-Here are a few examples of common options:
+When calling `spark-submit`, `[app options]` will be passed along to your application's
+main class. To enumerate all options available to `spark-submit` run it with 
+the `--help` flag. Here are a few examples of common options:
 
 {% highlight bash %}
 # Run application locally
-./bin/spark-submit my-app.jar \
+./bin/spark-submit \
   --class my.main.ClassName
-  --master local[8]
+  --master local[8] \
+  my-app.jar
 
 # Run on a Spark cluster
-./bin/spark-submit my-app.jar \
+./bin/spark-submit \
   --class my.main.ClassName
   --master spark://mycluster:7077 \
   --executor-memory 20G \
-  --total-executor-cores 100
+  --total-executor-cores 100 \
+  my-app.jar
 
 # Run on a YARN cluster
-HADOOP_CONF_DIR=XX /bin/spark-submit my-app.jar \
+HADOOP_CONF_DIR=XX /bin/spark-submit \
   --class my.main.ClassName
   --master yarn-cluster \  # can also be `yarn-client` for client mode
   --executor-memory 20G \
-  --num-executors 50
+  --num-executors 50 \
+  my-app.jar
 {% endhighlight %}
 
 ### Loading Configurations from a File
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 68afa6e1bff951f530066fe5fc0fd4364ec13691..64996b52e0404c4622bf1d0d45490d046e2a3ec2 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -179,9 +179,10 @@ $ sbt package
 [info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar
 
 # Use spark-submit to run your application
-$ YOUR_SPARK_HOME/bin/spark-submit target/scala-2.10/simple-project_2.10-1.0.jar \
+$ YOUR_SPARK_HOME/bin/spark-submit \
   --class "SimpleApp" \
-  --master local[4]
+  --master local[4] \
+  target/scala-2.10/simple-project_2.10-1.0.jar
 ...
 Lines with a: 46, Lines with b: 23
 {% endhighlight %}
@@ -272,9 +273,10 @@ $ mvn package
 [INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
 
 # Use spark-submit to run your application
-$ YOUR_SPARK_HOME/bin/spark-submit target/simple-project-1.0.jar \
+$ YOUR_SPARK_HOME/bin/spark-submit \
   --class "SimpleApp" \
-  --master local[4]
+  --master local[4] \
+  target/simple-project-1.0.jar
 ...
 Lines with a: 46, Lines with b: 23
 {% endhighlight %}