diff --git a/bin/beeline b/bin/beeline
index 09fe366c609fab742cd99e6799af3a8d3476adad..1bda4dba50605f174887bf29e33fd0e57c0865af 100755
--- a/bin/beeline
+++ b/bin/beeline
@@ -17,29 +17,14 @@
 # limitations under the License.
 #
 
-# Figure out where Spark is installed
-FWDIR="$(cd `dirname $0`/..; pwd)"
+#
+# Shell script for starting BeeLine
 
-# Find the java binary
-if [ -n "${JAVA_HOME}" ]; then
-  RUNNER="${JAVA_HOME}/bin/java"
-else
-  if [ `command -v java` ]; then
-    RUNNER="java"
-  else
-    echo "JAVA_HOME is not set" >&2
-    exit 1
-  fi
-fi
+# Enter posix mode for bash
+set -o posix
 
-# Compute classpath using external script
-classpath_output=$($FWDIR/bin/compute-classpath.sh)
-if [[ "$?" != "0" ]]; then
-  echo "$classpath_output"
-  exit 1
-else
-  CLASSPATH=$classpath_output
-fi
+# Figure out where Spark is installed
+FWDIR="$(cd `dirname $0`/..; pwd)"
 
 CLASS="org.apache.hive.beeline.BeeLine"
-exec "$RUNNER" -cp "$CLASSPATH" $CLASS "$@"
+exec "$FWDIR/bin/spark-class" $CLASS "$@"
diff --git a/bin/spark-sql b/bin/spark-sql
index bba7f897b19bcb1bcbc04e261fa8c66eb31bb901..61ebd8ab6dec8960770dece524ee81fb8b4b04ed 100755
--- a/bin/spark-sql
+++ b/bin/spark-sql
@@ -23,14 +23,72 @@
 # Enter posix mode for bash
 set -o posix
 
+CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
+
 # Figure out where Spark is installed
 FWDIR="$(cd `dirname $0`/..; pwd)"
 
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./sbin/spark-sql [options]"
+function usage {
+  echo "Usage: ./sbin/spark-sql [options] [cli option]"
+  pattern="usage"
+  pattern+="\|Spark assembly has been built with Hive"
+  pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
+  pattern+="\|Spark Command: "
+  pattern+="\|--help"
+  pattern+="\|======="
+
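+  # Print spark-submit options first, then CLI options with build banners and
+  # redundant usage lines filtered out by the pattern above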
   $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  echo
+  echo "CLI options:"
+  $FWDIR/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
+}
+
+function ensure_arg_number {
+  arg_number=$1
+  at_least=$2
+
+  if [[ $arg_number -lt $at_least ]]; then
+    usage
+    exit 1
+  fi
+}
+
+if [[ "$@" = --help ]] || [[ "$@" = -h ]]; then
+  usage
   exit 0
 fi
 
-CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
-exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@
+CLI_ARGS=()
+SUBMISSION_ARGS=()
+
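+# Split arguments: options recognized by the Spark SQL CLI are collected in
+# CLI_ARGS, everything else is handed to spark-submit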
+while (($#)); do
+  case $1 in
+    -d | --define | --database | -f | -h | --hiveconf | --hivevar | -i | -p)
+      ensure_arg_number $# 2
+      CLI_ARGS+=($1); shift
+      CLI_ARGS+=($1); shift
+      ;;
+
+    -e)
+      ensure_arg_number $# 2
+      CLI_ARGS+=($1); shift
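+      # Wrap the query in escaped quotes so the eval at the end of this script
+      # keeps a multi-word query as a single argument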
+      CLI_ARGS+=(\"$1\"); shift
+      ;;
+
+    -s | --silent)
+      CLI_ARGS+=($1); shift
+      ;;
+
+    -v | --verbose)
+      # Both SparkSubmit and SparkSQLCLIDriver recognize -v | --verbose
+      CLI_ARGS+=($1)
+      SUBMISSION_ARGS+=($1); shift
+      ;;
+
+    *)
+      SUBMISSION_ARGS+=($1); shift
+      ;;
+  esac
+done
+
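+# eval is needed so the escaped quotes around the -e query above are honored
+# when the final command line is re-parsed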
+eval exec "$FWDIR"/bin/spark-submit --class $CLASS ${SUBMISSION_ARGS[*]} spark-internal ${CLI_ARGS[*]}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 9391f24e71ed7fbc3b30e2bbcf852eab4bea98e9..087dd4d633db0d759aa7b04a2d9aaeeff21f5963 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -220,6 +220,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   /** Fill in values by parsing user options. */
   private def parseOpts(opts: Seq[String]): Unit = {
     var inSparkOpts = true
+    val EQ_SEPARATED_OPT = """(--[^=]+)=(.+)""".r
 
     // Delineates parsing of Spark options from parsing of user options.
     parse(opts)
@@ -322,33 +323,21 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         verbose = true
         parse(tail)
 
+      case EQ_SEPARATED_OPT(opt, value) :: tail =>
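+        // Convert "--foo=bar" into "--foo bar" and re-parse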
+        parse(opt :: value :: tail)
+
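+      // Any remaining dash-prefixed token is an unrecognized option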
+      case value :: tail if value.startsWith("-") =>
+        SparkSubmit.printErrorAndExit(s"Unrecognized option '$value'.")
+
       case value :: tail =>
-        if (inSparkOpts) {
-          value match {
-            // convert --foo=bar to --foo bar
-            case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
-              val parts = v.split("=")
-              parse(Seq(parts(0), parts(1)) ++ tail)
-            case v if v.startsWith("-") =>
-              val errMessage = s"Unrecognized option '$value'."
-              SparkSubmit.printErrorAndExit(errMessage)
-            case v =>
-              primaryResource =
-                if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) {
-                  Utils.resolveURI(v).toString
-                } else {
-                  v
-                }
-              inSparkOpts = false
-              isPython = SparkSubmit.isPython(v)
-              parse(tail)
+        primaryResource =
+          if (!SparkSubmit.isShell(value) && !SparkSubmit.isInternal(value)) {
+            Utils.resolveURI(value).toString
+          } else {
+            value
           }
-        } else {
-          if (!value.isEmpty) {
-            childArgs += value
-          }
-          parse(tail)
-        }
+        isPython = SparkSubmit.isPython(value)
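+        // All remaining arguments belong to the user program, even when they
+        // collide with spark-submit option names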
+        childArgs ++= tail
 
       case Nil =>
     }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index a5cdcfb5de03b495d58872756bfe6f56483f33b4..7e1ef80c8456145df3b8a8148431682b1718b479 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -106,6 +106,18 @@ class SparkSubmitSuite extends FunSuite with Matchers {
     appArgs.childArgs should be (Seq("some", "--weird", "args"))
   }
 
+  test("handles arguments to user program with name collision") {
+    val clArgs = Seq(
+      "--name", "myApp",
+      "--class", "Foo",
+      "userjar.jar",
+      "--master", "local",
+      "some",
+      "--weird", "args")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    appArgs.childArgs should be (Seq("--master", "local", "some", "--weird", "args"))
+  }
+
   test("handles YARN cluster mode") {
     val clArgs = Seq(
       "--deploy-mode", "cluster",
diff --git a/sbin/start-thriftserver.sh b/sbin/start-thriftserver.sh
index 8398e6f19b51152f3986196061b9807a4c07c542..603f50ae13240833b6db387f4f31b0f2a042e1da 100755
--- a/sbin/start-thriftserver.sh
+++ b/sbin/start-thriftserver.sh
@@ -26,11 +26,53 @@ set -o posix
 # Figure out where Spark is installed
 FWDIR="$(cd `dirname $0`/..; pwd)"
 
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./sbin/start-thriftserver [options]"
+CLASS="org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
+
+function usage {
+  echo "Usage: ./sbin/start-thriftserver [options] [thrift server options]"
+  pattern="usage"
+  pattern+="\|Spark assembly has been built with Hive"
+  pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
+  pattern+="\|Spark Command: "
+  pattern+="\|======="
+  pattern+="\|--help"
+
   $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  echo
+  echo "Thrift server options:"
+  $FWDIR/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
+}
+
+function ensure_arg_number {
+  arg_number=$1
+  at_least=$2
+
+  if [[ $arg_number -lt $at_least ]]; then
+    usage
+    exit 1
+  fi
+}
+
+if [[ "$@" = --help ]] || [[ "$@" = -h ]]; then
+  usage
   exit 0
 fi
 
-CLASS="org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
-exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@
+THRIFT_SERVER_ARGS=()
+SUBMISSION_ARGS=()
+
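+# Split arguments: --hiveconf key-value pairs are forwarded to
+# HiveThriftServer2, everything else goes to spark-submit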
+while (($#)); do
+  case $1 in
+    --hiveconf)
+      ensure_arg_number $# 2
+      THRIFT_SERVER_ARGS+=($1); shift
+      THRIFT_SERVER_ARGS+=($1); shift
+      ;;
+
+    *)
+      SUBMISSION_ARGS+=($1); shift
+      ;;
+  esac
+done
+
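+# eval re-parses the assembled command line so quoted argument values survive
+# word splitting, mirroring bin/spark-sql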
+eval exec "$FWDIR"/bin/spark-submit --class $CLASS ${SUBMISSION_ARGS[*]} spark-internal ${THRIFT_SERVER_ARGS[*]}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 08d3f983d9e71df99cde60a0950ba39cf132b907..6f7942aba314a2eef57c0c5f89ea6a91938fd8e0 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -40,7 +40,6 @@ private[hive] object HiveThriftServer2 extends Logging {
     val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
 
     if (!optionsProcessor.process(args)) {
-      logWarning("Error starting HiveThriftServer2 with given arguments")
       System.exit(-1)
     }
 
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 69f19f826a802aeadf56e1486d9d608f7aebe013..2bf8cfdcacd227798fc323104f7efbc1e775e08f 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.io.{BufferedReader, InputStreamReader, PrintWriter}
 
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
 class CliSuite extends FunSuite with BeforeAndAfterAll with TestUtils {
@@ -27,15 +28,15 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with TestUtils {
   val METASTORE_PATH = TestUtils.getMetastorePath("cli")
 
   override def beforeAll() {
-    val pb = new ProcessBuilder(
-      "../../bin/spark-sql",
-      "--master",
-      "local",
-      "--hiveconf",
-      s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true",
-      "--hiveconf",
-      "hive.metastore.warehouse.dir=" + WAREHOUSE_PATH)
-
+    val jdbcUrl = s"jdbc:derby:;databaseName=$METASTORE_PATH;create=true"
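+    // Build the command as one multi-line string and tokenize on whitespace,
+    // mirroring how a shell would split the arguments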
+    val commands =
+      s"""../../bin/spark-sql
+         |  --master local
+         |  --hiveconf ${ConfVars.METASTORECONNECTURLKEY}="$jdbcUrl"
+         |  --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$WAREHOUSE_PATH
+       """.stripMargin.split("\\s+")
+
+    val pb = new ProcessBuilder(commands: _*)
     process = pb.start()
     outputWriter = new PrintWriter(process.getOutputStream, true)
     inputReader = new BufferedReader(new InputStreamReader(process.getInputStream))
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index b7b7c9957ac34ff7c71146774c9d8d87c68d2e6d..78bffa2607349142dd03fe6a241c03d2adb2b1cc 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -25,6 +25,7 @@ import java.io.{BufferedReader, InputStreamReader}
 import java.net.ServerSocket
 import java.sql.{Connection, DriverManager, Statement}
 
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
 import org.apache.spark.Logging
@@ -63,16 +64,18 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with TestUt
     // Forking a new process to start the Hive Thrift server. The reason to do this is it is
     // hard to clean up Hive resources entirely, so we just start a new process and kill
     // that process for cleanup.
-    val defaultArgs = Seq(
-      "../../sbin/start-thriftserver.sh",
-      "--master local",
-      "--hiveconf",
-      "hive.root.logger=INFO,console",
-      "--hiveconf",
-      s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true",
-      "--hiveconf",
-      s"hive.metastore.warehouse.dir=$WAREHOUSE_PATH")
-    val pb = new ProcessBuilder(defaultArgs ++ args)
+    val jdbcUrl = s"jdbc:derby:;databaseName=$METASTORE_PATH;create=true"
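+    // The Thrift bind host and port are set both via --hiveconf here and via
+    // environment variables below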
+    val command =
+      s"""../../sbin/start-thriftserver.sh
+         |  --master local
+         |  --hiveconf hive.root.logger=INFO,console
+         |  --hiveconf ${ConfVars.METASTORECONNECTURLKEY}="$jdbcUrl"
+         |  --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$WAREHOUSE_PATH
+         |  --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=$HOST
+         |  --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$PORT
+       """.stripMargin.split("\\s+")
+
+    val pb = new ProcessBuilder(command ++ args: _*)
     val environment = pb.environment()
     environment.put("HIVE_SERVER2_THRIFT_PORT", PORT.toString)
     environment.put("HIVE_SERVER2_THRIFT_BIND_HOST", HOST)