Skip to content
Snippets Groups Projects
Commit c42ef953 authored by Susan X. Huynh, committed by Marcelo Vanzin
Browse files

[SPARK-21456][MESOS] Make the driver failover_timeout configurable

## What changes were proposed in this pull request?

Current behavior: in Mesos cluster mode, the driver failover_timeout is set to zero. If the driver temporarily loses connectivity with the Mesos master, the framework will be torn down and all executors killed.

Proposed change: make the failover_timeout configurable via a new option, spark.mesos.driver.failoverTimeout. The default value is still zero.

Note: with non-zero failover_timeout, an explicit teardown is needed in some cases. This is captured in https://issues.apache.org/jira/browse/SPARK-21458

## How was this patch tested?

Added a unit test to make sure the config option is set while creating the scheduler driver.

Ran an integration test with mesosphere/spark showing that with a non-zero failover_timeout the Spark job finishes after a driver is disconnected from the master.

Author: Susan X. Huynh <xhuynh@mesosphere.com>

Closes #18674 from susanxhuynh/sh-mesos-failover-timeout.
parent c9729187
No related branches found
No related tags found
No related merge requests found
...@@ -545,6 +545,17 @@ See the [configuration page](configuration.html) for information on Spark config ...@@ -545,6 +545,17 @@ See the [configuration page](configuration.html) for information on Spark config
Fetcher Cache</a> Fetcher Cache</a>
</td> </td>
</tr> </tr>
<tr>
<td><code>spark.mesos.driver.failoverTimeout</code></td>
<td><code>0.0</code></td>
<td>
The amount of time (in seconds) that the master will wait for the
driver to reconnect, after being temporarily disconnected, before
it tears down the driver framework by killing all its
executors. The default value is zero, meaning the master does not wait at all: if the
driver disconnects, the master immediately tears down the framework.
</td>
</tr>
</table> </table>
# Troubleshooting and Debugging # Troubleshooting and Debugging
......
...@@ -58,9 +58,16 @@ package object config { ...@@ -58,9 +58,16 @@ package object config {
private [spark] val DRIVER_LABELS = private [spark] val DRIVER_LABELS =
ConfigBuilder("spark.mesos.driver.labels") ConfigBuilder("spark.mesos.driver.labels")
.doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value" + .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +
"pairs should be separated by a colon, and commas used to list more than one." + "pairs should be separated by a colon, and commas used to list more than one." +
"Ex. key:value,key2:value2") "Ex. key:value,key2:value2")
.stringConf .stringConf
.createOptional .createOptional
// Config entry for the "spark.mesos.driver.failoverTimeout" key. The value is parsed as a
// Double (the doc string below describes it as seconds) and falls back to 0.0 when unset.
private [spark] val DRIVER_FAILOVER_TIMEOUT =
ConfigBuilder("spark.mesos.driver.failoverTimeout")
.doc("Amount of time in seconds that the master will wait to hear from the driver, " +
"during a temporary disconnection, before tearing down all the executors.")
.doubleConf
.createWithDefault(0.0)
} }
...@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} ...@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver import org.apache.mesos.SchedulerDriver
import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config import org.apache.spark.internal.config
import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
...@@ -177,7 +178,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( ...@@ -177,7 +178,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf, sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)), sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
None, None,
None, Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
sc.conf.getOption("spark.mesos.driver.frameworkId") sc.conf.getOption("spark.mesos.driver.frameworkId")
) )
......
...@@ -33,6 +33,7 @@ import org.scalatest.mock.MockitoSugar ...@@ -33,6 +33,7 @@ import org.scalatest.mock.MockitoSugar
import org.scalatest.BeforeAndAfter import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config._ import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
...@@ -369,6 +370,41 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite ...@@ -369,6 +370,41 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.start() backend.start()
} }
// Verifies that the value configured under DRIVER_FAILOVER_TIMEOUT is handed to
// createSchedulerDriver as its `failoverTimeout` argument when the backend starts.
test("failover timeout is set in created scheduler driver") {
  val failoverTimeoutIn = 3600.0
  initializeSparkConf(Map(DRIVER_FAILOVER_TIMEOUT.key -> failoverTimeoutIn.toString))
  sc = new SparkContext(sparkConf)

  val taskScheduler = mock[TaskSchedulerImpl]
  when(taskScheduler.sc).thenReturn(sc)

  val driver = mock[SchedulerDriver]
  when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)

  val securityManager = mock[SecurityManager]

  // Flipped by the overridden factory below. Without it the test would pass vacuously if
  // start() never reached createSchedulerDriver, because the only check lives in there.
  // NOTE(review): assumes start() invokes createSchedulerDriver on the calling thread
  // before returning -- confirm against MesosCoarseGrainedSchedulerBackend.start().
  var driverCreated = false

  val backend = new MesosCoarseGrainedSchedulerBackend(
    taskScheduler, sc, "master", securityManager) {
    override protected def createSchedulerDriver(
        masterUrl: String,
        scheduler: Scheduler,
        sparkUser: String,
        appName: String,
        conf: SparkConf,
        webuiUrl: Option[String] = None,
        checkpoint: Option[Boolean] = None,
        failoverTimeout: Option[Double] = None,
        frameworkId: Option[String] = None): SchedulerDriver = {
      markRegistered()
      // A single comparison covers both "is defined" and "has the configured value",
      // and avoids calling .get on the Option.
      assert(failoverTimeout == Some(failoverTimeoutIn))
      driverCreated = true
      driver
    }
  }

  backend.start()
  assert(driverCreated, "createSchedulerDriver was never invoked; failover timeout unchecked")
}
test("honors unset spark.mesos.containerizer") { test("honors unset spark.mesos.containerizer") {
setBackend(Map("spark.mesos.executor.docker.image" -> "test")) setBackend(Map("spark.mesos.executor.docker.image" -> "test"))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment