diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index c2c3e9a9e48278033d83177514da10e570b53a19..848b62f9de71b1773cf037877317017928011ce4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy
 
+import scala.collection.mutable.HashSet
 import scala.concurrent._
 
 import akka.actor._
@@ -31,21 +32,24 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils}
 
 /**
  * Proxy that relays messages to the driver.
+ *
+ * We currently don't support retry if submission fails. In HA mode, client will submit request to
+ * all masters and see which one could handle it.
  */
 private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
   extends Actor with ActorLogReceive with Logging {
 
-  var masterActor: ActorSelection = _
+  private val masterActors = driverArgs.masters.map { m =>
+    context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system)))
+  }
+  private val lostMasters = new HashSet[Address]
+  private var activeMasterActor: ActorSelection = null
+
   val timeout = RpcUtils.askTimeout(conf)
 
   override def preStart(): Unit = {
-    masterActor = context.actorSelection(
-      Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system)))
-
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 
-    println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
-
     driverArgs.cmd match {
       case "launch" =>
         // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
@@ -79,11 +83,17 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
           driverArgs.supervise,
           command)
 
-        masterActor ! RequestSubmitDriver(driverDescription)
+        // This assumes only one Master is active at a time
+        for (masterActor <- masterActors) {
+          masterActor ! RequestSubmitDriver(driverDescription)
+        }
 
       case "kill" =>
         val driverId = driverArgs.driverId
-        masterActor ! RequestKillDriver(driverId)
+        // This assumes only one Master is active at a time
+        for (masterActor <- masterActors) {
+          masterActor ! RequestKillDriver(driverId)
+        }
     }
   }
 
@@ -92,10 +102,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
     println("... waiting before polling master for driver state")
     Thread.sleep(5000)
     println("... polling master for driver state")
-    val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
+    val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout)
       .mapTo[DriverStatusResponse]
     val statusResponse = Await.result(statusFuture, timeout)
-
     statusResponse.found match {
       case false =>
         println(s"ERROR: Cluster master did not recognize $driverId")
@@ -122,20 +131,46 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
 
     case SubmitDriverResponse(success, driverId, message) =>
       println(message)
-      if (success) pollAndReportStatus(driverId.get) else System.exit(-1)
+      if (success) {
+        activeMasterActor = context.actorSelection(sender.path)
+        pollAndReportStatus(driverId.get)
+      } else if (!Utils.responseFromBackup(message)) {
+        System.exit(-1)
+      }
+
 
     case KillDriverResponse(driverId, success, message) =>
       println(message)
-      if (success) pollAndReportStatus(driverId) else System.exit(-1)
+      if (success) {
+        activeMasterActor = context.actorSelection(sender.path)
+        pollAndReportStatus(driverId)
+      } else if (!Utils.responseFromBackup(message)) {
+        System.exit(-1)
+      }
 
     case DisassociatedEvent(_, remoteAddress, _) =>
-      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
-      System.exit(-1)
+      if (!lostMasters.contains(remoteAddress)) {
+        println(s"Error connecting to master $remoteAddress.")
+        lostMasters += remoteAddress
+        // Note that this heuristic does not account for the fact that a Master can recover within
+        // the lifetime of this client. Thus, once a Master is lost it is lost to us forever. This
+        // is not currently a concern, however, because this client does not retry submissions.
+        if (lostMasters.size >= masterActors.size) {
+          println("No master is available, exiting.")
+          System.exit(-1)
+        }
+      }
 
     case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
-      println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
-      println(s"Cause was: $cause")
-      System.exit(-1)
+      if (!lostMasters.contains(remoteAddress)) {
+        println(s"Error connecting to master ($remoteAddress).")
+        println(s"Cause was: $cause")
+        lostMasters += remoteAddress
+        if (lostMasters.size >= masterActors.size) {
+          println("No master is available, exiting.")
+          System.exit(-1)
+        }
+      }
   }
 }
 
@@ -163,7 +198,9 @@ object Client {
       "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
 
     // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
-    Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem))
+    for (m <- driverArgs.masters) {
+      Master.toAkkaUrl(m, AkkaUtils.protocol(actorSystem))
+    }
     actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
 
     actorSystem.awaitTermination()
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index 5cbac787dceebf15991d608c25a696f8f727de9f..316e2d59f01b8b02ceca776ccc688306eeb751cd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -22,8 +22,7 @@ import java.net.{URI, URISyntaxException}
 import scala.collection.mutable.ListBuffer
 
 import org.apache.log4j.Level
-
-import org.apache.spark.util.{IntParam, MemoryParam}
+import org.apache.spark.util.{IntParam, MemoryParam, Utils}
 
 /**
  * Command-line parser for the driver client.
@@ -35,7 +34,7 @@ private[deploy] class ClientArguments(args: Array[String]) {
   var logLevel = Level.WARN
 
   // launch parameters
-  var master: String = ""
+  var masters: Array[String] = null
   var jarUrl: String = ""
   var mainClass: String = ""
   var supervise: Boolean = DEFAULT_SUPERVISE
@@ -80,13 +79,13 @@ private[deploy] class ClientArguments(args: Array[String]) {
       }
 
       jarUrl = _jarUrl
-      master = _master
+      masters = Utils.parseStandaloneMasterUrls(_master)
       mainClass = _mainClass
       _driverOptions ++= tail
 
     case "kill" :: _master :: _driverId :: tail =>
       cmd = "kill"
-      master = _master
+      masters = Utils.parseStandaloneMasterUrls(_master)
       driverId = _driverId
 
     case _ =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index af38bf80e4f0bbcf8be573c5e00830df1cd52d3b..42b5d41b7b526990b68b927a1f61d7be38daabd6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -118,8 +118,8 @@ object SparkSubmit {
    * Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
    */
   private def kill(args: SparkSubmitArguments): Unit = {
-    new RestSubmissionClient()
-      .killSubmission(args.master, args.submissionToKill)
+    new RestSubmissionClient(args.master)
+      .killSubmission(args.submissionToKill)
   }
 
   /**
@@ -127,8 +127,8 @@ object SparkSubmit {
    * Standalone and Mesos cluster mode only.
    */
   private def requestStatus(args: SparkSubmitArguments): Unit = {
-    new RestSubmissionClient()
-      .requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
+    new RestSubmissionClient(args.master)
+      .requestSubmissionStatus(args.submissionToRequestStatusFor)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index dc6077f3d132bb0de1f8b8bd9e6d63741d837eb0..0fac3cdcf55e7ffe9f2dd67a20714a0d21a7aa71 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -254,7 +254,8 @@ private[master] class Master(
 
     case RequestSubmitDriver(description) => {
       if (state != RecoveryState.ALIVE) {
-        val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
+        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
+          "Can only accept driver submissions in ALIVE state."
         sender ! SubmitDriverResponse(false, None, msg)
       } else {
         logInfo("Driver submitted " + description.command.mainClass)
@@ -274,7 +275,8 @@ private[master] class Master(
 
     case RequestKillDriver(driverId) => {
       if (state != RecoveryState.ALIVE) {
-        val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
+        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
+          s"Can only kill drivers in ALIVE state."
         sender ! KillDriverResponse(driverId, success = false, msg)
       } else {
         logInfo("Asked to kill driver " + driverId)
@@ -305,12 +307,18 @@ private[master] class Master(
     }
 
     case RequestDriverStatus(driverId) => {
-      (drivers ++ completedDrivers).find(_.id == driverId) match {
-        case Some(driver) =>
-          sender ! DriverStatusResponse(found = true, Some(driver.state),
-            driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
-        case None =>
-          sender ! DriverStatusResponse(found = false, None, None, None, None)
+      if (state != RecoveryState.ALIVE) {
+        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
+          "Can only request driver status in ALIVE state."
+        sender ! DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg)))
+      } else {
+        (drivers ++ completedDrivers).find(_.id == driverId) match {
+          case Some(driver) =>
+            sender ! DriverStatusResponse(found = true, Some(driver.state),
+              driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
+          case None =>
+            sender ! DriverStatusResponse(found = false, None, None, None, None)
+        }
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 307cebfb4bd0934b0aaa27293b0b69fe9d6b33fa..6078f50518ba4ff96b4f332bc5663904b9b06903 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.deploy.rest
 
 import java.io.{DataOutputStream, FileNotFoundException}
-import java.net.{HttpURLConnection, SocketException, URL}
+import java.net.{ConnectException, HttpURLConnection, SocketException, URL}
 import javax.servlet.http.HttpServletResponse
 
+import scala.collection.mutable
 import scala.io.Source
 
 import com.fasterxml.jackson.core.JsonProcessingException
@@ -51,57 +52,109 @@ import org.apache.spark.util.Utils
  * implementation of this client can use that information to retry using the version specified
  * by the server.
  */
-private[spark] class RestSubmissionClient extends Logging {
+private[spark] class RestSubmissionClient(master: String) extends Logging {
   import RestSubmissionClient._
 
   private val supportedMasterPrefixes = Seq("spark://", "mesos://")
 
+  private val masters: Array[String] = Utils.parseStandaloneMasterUrls(master)
+
+  // Set of masters that lost contact with us, used to keep track of
+  // whether there are masters still alive for us to communicate with
+  private val lostMasters = new mutable.HashSet[String]
+
   /**
    * Submit an application specified by the parameters in the provided request.
    *
    * If the submission was successful, poll the status of the submission and report
    * it to the user. Otherwise, report the error message provided by the server.
    */
-  def createSubmission(
-      master: String,
-      request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
+  def createSubmission(request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
     logInfo(s"Submitting a request to launch an application in $master.")
-    validateMaster(master)
-    val url = getSubmitUrl(master)
-    val response = postJson(url, request.toJson)
-    response match {
-      case s: CreateSubmissionResponse =>
-        reportSubmissionStatus(master, s)
-        handleRestResponse(s)
-      case unexpected =>
-        handleUnexpectedRestResponse(unexpected)
+    var handled: Boolean = false
+    var response: SubmitRestProtocolResponse = null
+    for (m <- masters if !handled) {
+      validateMaster(m)
+      val url = getSubmitUrl(m)
+      try {
+        response = postJson(url, request.toJson)
+        response match {
+          case s: CreateSubmissionResponse =>
+            if (s.success) {
+              reportSubmissionStatus(s)
+              handleRestResponse(s)
+              handled = true
+            }
+          case unexpected =>
+            handleUnexpectedRestResponse(unexpected)
+        }
+      } catch {
+        case e: SubmitRestConnectionException =>
+          if (handleConnectionException(m)) {
+            throw new SubmitRestConnectionException("Unable to connect to server", e)
+          }
+      }
     }
     response
   }
 
   /** Request that the server kill the specified submission. */
-  def killSubmission(master: String, submissionId: String): SubmitRestProtocolResponse = {
+  def killSubmission(submissionId: String): SubmitRestProtocolResponse = {
     logInfo(s"Submitting a request to kill submission $submissionId in $master.")
-    validateMaster(master)
-    val response = post(getKillUrl(master, submissionId))
-    response match {
-      case k: KillSubmissionResponse => handleRestResponse(k)
-      case unexpected => handleUnexpectedRestResponse(unexpected)
+    var handled: Boolean = false
+    var response: SubmitRestProtocolResponse = null
+    for (m <- masters if !handled) {
+      validateMaster(m)
+      val url = getKillUrl(m, submissionId)
+      try {
+        response = post(url)
+        response match {
+          case k: KillSubmissionResponse =>
+            if (!Utils.responseFromBackup(k.message)) {
+              handleRestResponse(k)
+              handled = true
+            }
+          case unexpected =>
+            handleUnexpectedRestResponse(unexpected)
+        }
+      } catch {
+        case e: SubmitRestConnectionException =>
+          if (handleConnectionException(m)) {
+            throw new SubmitRestConnectionException("Unable to connect to server", e)
+          }
+      }
     }
     response
   }
 
   /** Request the status of a submission from the server. */
   def requestSubmissionStatus(
-      master: String,
       submissionId: String,
       quiet: Boolean = false): SubmitRestProtocolResponse = {
     logInfo(s"Submitting a request for the status of submission $submissionId in $master.")
-    validateMaster(master)
-    val response = get(getStatusUrl(master, submissionId))
-    response match {
-      case s: SubmissionStatusResponse => if (!quiet) { handleRestResponse(s) }
-      case unexpected => handleUnexpectedRestResponse(unexpected)
+
+    var handled: Boolean = false
+    var response: SubmitRestProtocolResponse = null
+    for (m <- masters if !handled) {
+      validateMaster(m)
+      val url = getStatusUrl(m, submissionId)
+      try {
+        response = get(url)
+        response match {
+          case s: SubmissionStatusResponse if s.success =>
+            if (!quiet) {
+              handleRestResponse(s)
+            }
+            handled = true
+          case unexpected =>
+            handleUnexpectedRestResponse(unexpected)
+        }
+      } catch {
+        case e: SubmitRestConnectionException =>
+          if (handleConnectionException(m)) {
+            throw new SubmitRestConnectionException("Unable to connect to server", e)
+          }
+      }
     }
     response
   }
@@ -148,11 +201,16 @@ private[spark] class RestSubmissionClient extends Logging {
     conn.setRequestProperty("Content-Type", "application/json")
     conn.setRequestProperty("charset", "utf-8")
     conn.setDoOutput(true)
-    val out = new DataOutputStream(conn.getOutputStream)
-    Utils.tryWithSafeFinally {
-      out.write(json.getBytes(Charsets.UTF_8))
-    } {
-      out.close()
+    try {
+      val out = new DataOutputStream(conn.getOutputStream)
+      Utils.tryWithSafeFinally {
+        out.write(json.getBytes(Charsets.UTF_8))
+      } {
+        out.close()
+      }
+    } catch {
+      case e: ConnectException =>
+        throw new SubmitRestConnectionException("Connect Exception when connect to server", e)
     }
     readResponse(conn)
   }
@@ -191,11 +249,9 @@ private[spark] class RestSubmissionClient extends Logging {
       }
     } catch {
       case unreachable @ (_: FileNotFoundException | _: SocketException) =>
-        throw new SubmitRestConnectionException(
-          s"Unable to connect to server ${connection.getURL}", unreachable)
+        throw new SubmitRestConnectionException("Unable to connect to server", unreachable)
       case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
-        throw new SubmitRestProtocolException(
-          "Malformed response received from server", malformed)
+        throw new SubmitRestProtocolException("Malformed response received from server", malformed)
     }
   }
 
@@ -241,13 +297,12 @@ private[spark] class RestSubmissionClient extends Logging {
 
   /** Report the status of a newly created submission. */
   private def reportSubmissionStatus(
-      master: String,
       submitResponse: CreateSubmissionResponse): Unit = {
     if (submitResponse.success) {
       val submissionId = submitResponse.submissionId
       if (submissionId != null) {
         logInfo(s"Submission successfully created as $submissionId. Polling submission state...")
-        pollSubmissionStatus(master, submissionId)
+        pollSubmissionStatus(submissionId)
       } else {
         // should never happen
         logError("Application successfully submitted, but submission ID was not provided!")
@@ -262,9 +317,9 @@ private[spark] class RestSubmissionClient extends Logging {
    * Poll the status of the specified submission and log it.
    * This retries up to a fixed number of times before giving up.
    */
-  private def pollSubmissionStatus(master: String, submissionId: String): Unit = {
+  private def pollSubmissionStatus(submissionId: String): Unit = {
     (1 to REPORT_DRIVER_STATUS_MAX_TRIES).foreach { _ =>
-      val response = requestSubmissionStatus(master, submissionId, quiet = true)
+      val response = requestSubmissionStatus(submissionId, quiet = true)
       val statusResponse = response match {
         case s: SubmissionStatusResponse => s
         case _ => return // unexpected type, let upstream caller handle it
@@ -302,6 +357,21 @@ private[spark] class RestSubmissionClient extends Logging {
   private def handleUnexpectedRestResponse(unexpected: SubmitRestProtocolResponse): Unit = {
     logError(s"Error: Server responded with message of unexpected type ${unexpected.messageType}.")
   }
+
+  /**
+   * When a connection exception is caught, return true if all masters are lost.
+   * Note that the heuristic used here does not take into account that masters
+   * can recover during the lifetime of this client. This assumption should be
+   * harmless because this client currently does not support retrying submission
+   * on failure yet (SPARK-6443).
+   */
+  private def handleConnectionException(masterUrl: String): Boolean = {
+    if (!lostMasters.contains(masterUrl)) {
+      logWarning(s"Unable to connect to server ${masterUrl}.")
+      lostMasters += masterUrl
+    }
+    lostMasters.size >= masters.size
+  }
 }
 
 private[spark] object RestSubmissionClient {
@@ -324,10 +394,10 @@ private[spark] object RestSubmissionClient {
     }
     val sparkProperties = conf.getAll.toMap
     val environmentVariables = env.filter { case (k, _) => k.startsWith("SPARK_") }
-    val client = new RestSubmissionClient
+    val client = new RestSubmissionClient(master)
     val submitRequest = client.constructSubmitRequest(
       appResource, mainClass, appArgs, sparkProperties, environmentVariables)
-    client.createSubmission(master, submitRequest)
+    client.createSubmission(submitRequest)
   }
 
   def main(args: Array[String]): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 88f9d880ac209b4792e35e8e5f4f7cf9123fbf84..9678631da9f6f8506618a40a68506703b332d1ae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -105,7 +105,7 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
       if (masters != null) {  // Two positional arguments were given
         printUsageAndExit(1)
       }
-      masters = value.stripPrefix("spark://").split(",").map("spark://" + _)
+      masters = Utils.parseStandaloneMasterUrls(value)
       parse(tail)
 
     case Nil =>
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 844f0cd22d95d54dd3e86f3525e22e94ab960339..be4db02ab86d0c470cdd2f90e864c7966dd86c17 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2159,6 +2159,22 @@ private[spark] object Utils extends Logging {
       .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
   }
 
+  /**
+   * Split the comma delimited string of master URLs into a list.
+   * For instance, "spark://abc,def" becomes [spark://abc, spark://def].
+   */
+  def parseStandaloneMasterUrls(masterUrls: String): Array[String] = {
+    masterUrls.stripPrefix("spark://").split(",").map("spark://" + _)
+  }
+
+  /** An identifier that backup masters use in their responses. */
+  val BACKUP_STANDALONE_MASTER_PREFIX = "Current state is not alive"
+
+  /** Return true if the response message is sent from a backup Master on standby. */
+  def responseFromBackup(msg: String): Boolean = {
+    msg.startsWith(BACKUP_STANDALONE_MASTER_PREFIX)
+  }
+
   /**
    * Adds a shutdown hook with default priority.
    *
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
index 0a318a27ac212aaf9744220da02fd6f9bde0fd5f..f4d548d9e7720bdb690f8e9c23d17f3fbdb2508f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -39,7 +39,6 @@ import org.apache.spark.deploy.master.DriverState._
  * Tests for the REST application submission protocol used in standalone cluster mode.
  */
 class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
-  private val client = new RestSubmissionClient
   private var actorSystem: Option[ActorSystem] = None
   private var server: Option[RestSubmissionServer] = None
 
@@ -52,7 +51,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     val appArgs = Array("one", "two", "three")
     val sparkProperties = Map("spark.app.name" -> "pi")
     val environmentVariables = Map("SPARK_ONE" -> "UN", "SPARK_TWO" -> "DEUX")
-    val request = client.constructSubmitRequest(
+    val request = new RestSubmissionClient("spark://host:port").constructSubmitRequest(
       "my-app-resource", "my-main-class", appArgs, sparkProperties, environmentVariables)
     assert(request.action === Utils.getFormattedClassName(request))
     assert(request.clientSparkVersion === SPARK_VERSION)
@@ -71,7 +70,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     val request = constructSubmitRequest(masterUrl, appArgs)
     assert(request.appArgs === appArgs)
     assert(request.sparkProperties("spark.master") === masterUrl)
-    val response = client.createSubmission(masterUrl, request)
+    val response = new RestSubmissionClient(masterUrl).createSubmission(request)
     val submitResponse = getSubmitResponse(response)
     assert(submitResponse.action === Utils.getFormattedClassName(submitResponse))
     assert(submitResponse.serverSparkVersion === SPARK_VERSION)
@@ -102,7 +101,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     val submissionId = "my-lyft-driver"
     val killMessage = "your driver is killed"
     val masterUrl = startDummyServer(killMessage = killMessage)
-    val response = client.killSubmission(masterUrl, submissionId)
+    val response = new RestSubmissionClient(masterUrl).killSubmission(submissionId)
     val killResponse = getKillResponse(response)
     assert(killResponse.action === Utils.getFormattedClassName(killResponse))
     assert(killResponse.serverSparkVersion === SPARK_VERSION)
@@ -116,7 +115,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     val submissionState = KILLED
     val submissionException = new Exception("there was an irresponsible mix of alcohol and cars")
     val masterUrl = startDummyServer(state = submissionState, exception = Some(submissionException))
-    val response = client.requestSubmissionStatus(masterUrl, submissionId)
+    val response = new RestSubmissionClient(masterUrl).requestSubmissionStatus(submissionId)
     val statusResponse = getStatusResponse(response)
     assert(statusResponse.action === Utils.getFormattedClassName(statusResponse))
     assert(statusResponse.serverSparkVersion === SPARK_VERSION)
@@ -129,13 +128,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
   test("create then kill") {
     val masterUrl = startSmartServer()
     val request = constructSubmitRequest(masterUrl)
-    val response1 = client.createSubmission(masterUrl, request)
+    val client = new RestSubmissionClient(masterUrl)
+    val response1 = client.createSubmission(request)
     val submitResponse = getSubmitResponse(response1)
     assert(submitResponse.success)
     assert(submitResponse.submissionId != null)
     // kill submission that was just created
     val submissionId = submitResponse.submissionId
-    val response2 = client.killSubmission(masterUrl, submissionId)
+    val response2 = client.killSubmission(submissionId)
     val killResponse = getKillResponse(response2)
     assert(killResponse.success)
     assert(killResponse.submissionId === submissionId)
@@ -144,13 +144,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
   test("create then request status") {
     val masterUrl = startSmartServer()
     val request = constructSubmitRequest(masterUrl)
-    val response1 = client.createSubmission(masterUrl, request)
+    val client = new RestSubmissionClient(masterUrl)
+    val response1 = client.createSubmission(request)
     val submitResponse = getSubmitResponse(response1)
     assert(submitResponse.success)
     assert(submitResponse.submissionId != null)
     // request status of submission that was just created
     val submissionId = submitResponse.submissionId
-    val response2 = client.requestSubmissionStatus(masterUrl, submissionId)
+    val response2 = client.requestSubmissionStatus(submissionId)
     val statusResponse = getStatusResponse(response2)
     assert(statusResponse.success)
     assert(statusResponse.submissionId === submissionId)
@@ -160,8 +161,9 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
   test("create then kill then request status") {
     val masterUrl = startSmartServer()
     val request = constructSubmitRequest(masterUrl)
-    val response1 = client.createSubmission(masterUrl, request)
-    val response2 = client.createSubmission(masterUrl, request)
+    val client = new RestSubmissionClient(masterUrl)
+    val response1 = client.createSubmission(request)
+    val response2 = client.createSubmission(request)
     val submitResponse1 = getSubmitResponse(response1)
     val submitResponse2 = getSubmitResponse(response2)
     assert(submitResponse1.success)
@@ -171,13 +173,13 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
     val submissionId1 = submitResponse1.submissionId
     val submissionId2 = submitResponse2.submissionId
     // kill only submission 1, but not submission 2
-    val response3 = client.killSubmission(masterUrl, submissionId1)
+    val response3 = client.killSubmission(submissionId1)
     val killResponse = getKillResponse(response3)
     assert(killResponse.success)
     assert(killResponse.submissionId === submissionId1)
     // request status for both submissions: 1 should be KILLED but 2 should be RUNNING still
-    val response4 = client.requestSubmissionStatus(masterUrl, submissionId1)
-    val response5 = client.requestSubmissionStatus(masterUrl, submissionId2)
+    val response4 = client.requestSubmissionStatus(submissionId1)
+    val response5 = client.requestSubmissionStatus(submissionId2)
     val statusResponse1 = getStatusResponse(response4)
     val statusResponse2 = getStatusResponse(response5)
     assert(statusResponse1.submissionId === submissionId1)
@@ -189,13 +191,14 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
   test("kill or request status before create") {
     val masterUrl = startSmartServer()
     val doesNotExist = "does-not-exist"
+    val client = new RestSubmissionClient(masterUrl)
     // kill a non-existent submission
-    val response1 = client.killSubmission(masterUrl, doesNotExist)
+    val response1 = client.killSubmission(doesNotExist)
     val killResponse = getKillResponse(response1)
     assert(!killResponse.success)
     assert(killResponse.submissionId === doesNotExist)
     // request status for a non-existent submission
-    val response2 = client.requestSubmissionStatus(masterUrl, doesNotExist)
+    val response2 = client.requestSubmissionStatus(doesNotExist)
     val statusResponse = getStatusResponse(response2)
     assert(!statusResponse.success)
     assert(statusResponse.submissionId === doesNotExist)
@@ -339,6 +342,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
 
   test("client handles faulty server") {
     val masterUrl = startFaultyServer()
+    val client = new RestSubmissionClient(masterUrl)
     val httpUrl = masterUrl.replace("spark://", "http://")
     val v = RestSubmissionServer.PROTOCOL_VERSION
     val submitRequestPath = s"$httpUrl/$v/submissions/create"
@@ -425,7 +429,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
       mainJar) ++ appArgs
     val args = new SparkSubmitArguments(commandLineArgs)
     val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args)
-    client.constructSubmitRequest(
+    new RestSubmissionClient("spark://host:port").constructSubmitRequest(
       mainJar, mainClass, appArgs, sparkProperties.toMap, Map.empty)
   }
 
@@ -492,7 +496,7 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
       method: String,
       body: String = ""): (SubmitRestProtocolResponse, Int) = {
     val conn = sendHttpRequest(url, method, body)
-    (client.readResponse(conn), conn.getResponseCode)
+    (new RestSubmissionClient("spark://host:port").readResponse(conn), conn.getResponseCode)
   }
 }