Skip to content
Snippets Groups Projects
Commit dabeb6f1 authored by Aaron Davidson's avatar Aaron Davidson
Browse files

SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1

This patch allows the FaultToleranceTest to work in newer versions of Docker.
See for more details.

Besides changing the Docker and FaultToleranceTest internals, this patch also changes the behavior of Master to accept new Workers which share an address with a Worker that we are currently trying to recover. This can only happen when the Worker itself was restarted and got the same IP address/port at the same time as a Master recovery occurs.

Finally, this adds a good bit of ASCII art to the test to make failures, successes, and actions more apparent. This is very much needed.

Author: Aaron Davidson <>

Closes #5 from aarondav/zookeeper and squashes the following commits:

5d7a72a [Aaron Davidson] SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1
parent 33baf14b
No related branches found
No related tags found
No related merge requests found
......@@ -30,20 +30,24 @@ import scala.sys.process._
import org.json4s._
import org.json4s.jackson.JsonMethods
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.deploy.master.RecoveryState
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
* This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
* In order to mimic a real distributed cluster more closely, Docker is used.
* Execute using
* ./spark-class org.apache.spark.deploy.FaultToleranceTest
* ./bin/spark-class org.apache.spark.deploy.FaultToleranceTest
* Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS:
* Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS
* - spark.deploy.recoveryMode=ZOOKEEPER
* - spark.deploy.zookeeper.url=
* Note that is the default docker ip for the host and 2181 is the default ZK port.
* In case of failure, make sure to kill off prior docker containers before restarting:
* docker kill $(docker ps -q)
* Unfortunately, due to the Docker dependency this suite cannot be run automatically without a
* working installation of Docker. In addition to having Docker, the following are assumed:
* - Docker can run without sudo (see
......@@ -51,10 +55,16 @@ import org.apache.spark.deploy.master.RecoveryState
* docker/ directory. Run 'docker/spark-test/build' to generate these.
private[spark] object FaultToleranceTest extends App with Logging {
val conf = new SparkConf()
val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")
val masters = ListBuffer[TestMasterInfo]()
val workers = ListBuffer[TestWorkerInfo]()
var sc: SparkContext = _
val zk = SparkCuratorUtil.newClient(conf)
var numPassed = 0
var numFailed = 0
......@@ -72,6 +82,10 @@ private[spark] object FaultToleranceTest extends App with Logging {
sc = null
// Clear ZK directories in between tests (for speed purposes)
SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/spark_leader")
SparkCuratorUtil.deleteRecursive(zk, ZK_DIR + "/master_status")
test("sanity-basic") {
......@@ -168,26 +182,34 @@ private[spark] object FaultToleranceTest extends App with Logging {
try {
numPassed += 1
logInfo("Passed: " + name)
} catch {
case e: Exception =>
numFailed += 1
logError("FAILED: " + name, e)
def addMasters(num: Int) {
logInfo(s">>>>> ADD MASTERS $num <<<<<")
(1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
def addWorkers(num: Int) {
logInfo(s">>>>> ADD WORKERS $num <<<<<")
val masterUrls = getMasterUrls(masters)
(1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
/** Creates a SparkContext, which constructs a Client to interact with our cluster. */
def createClient() = {
logInfo(">>>>> CREATE CLIENT <<<<<")
if (sc != null) { sc.stop() }
// Counter-hack: Because of a hack in SparkEnv#create() that changes this
// property, we need to reset it.
......@@ -206,6 +228,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
def killLeader(): Unit = {
logInfo(">>>>> KILL LEADER <<<<<")
val leader = getLeader
masters -= leader
......@@ -215,6 +238,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)
def terminateCluster() {
logInfo(">>>>> TERMINATE CLUSTER <<<<<")
......@@ -245,6 +269,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
* are all alive in a proper configuration (e.g., only one leader).
def assertValidClusterState() = {
logInfo(">>>>> ASSERT VALID CLUSTER STATE <<<<<")
var numAlive = 0
var numStandby = 0
......@@ -326,7 +351,11 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val
val workers = json \ "workers"
val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE")
liveWorkerIPs = => (w \ "host").extract[String])
// Extract the worker IP from "webuiaddress" (rather than "host") because the host name
// on containers is a weird hash instead of the actual IP address.
liveWorkerIPs = {
w => (w \ "webuiaddress").extract[String].stripPrefix("http://").stripSuffix(":8081")
numLiveApps = (json \ "activeapps").children.size
......@@ -403,7 +432,7 @@ private[spark] object Docker extends Logging {
def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = {
val mountCmd = if (mountDir != "") { " -v " + mountDir } else ""
val cmd = "docker run %s %s %s".format(mountCmd, imageTag, args)
val cmd = "docker run -privileged %s %s %s".format(mountCmd, imageTag, args)
logDebug("Run command: " + cmd)
......@@ -531,8 +531,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int,
val workerAddress =
if (addressToWorker.contains(workerAddress)) {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
val oldWorker = addressToWorker(workerAddress)
if (oldWorker.state == WorkerState.UNKNOWN) {
// A worker registering from UNKNOWN implies that the worker was restarted during recovery.
// The old worker must thus be dead, so we will remove it and accept the new worker.
} else {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
workers += worker
......@@ -17,11 +17,13 @@
package org.apache.spark.deploy.master
import org.apache.spark.{SparkConf, Logging}
import scala.collection.JavaConversions._
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.KeeperException
import org.apache.spark.{Logging, SparkConf}
object SparkCuratorUtil extends Logging {
......@@ -50,4 +52,13 @@ object SparkCuratorUtil extends Logging {
def deleteRecursive(zk: CuratorFramework, path: String) {
if (zk.checkExists().forPath(path) != null) {
for (child <- zk.getChildren.forPath(path)) {
zk.delete().forPath(path + "/" + child)
......@@ -2,4 +2,6 @@ Spark docker files
Drawn from Matt Massie's docker files (,
as well as some updates from Andre Schumacher (
\ No newline at end of file
as well as some updates from Andre Schumacher (
Tested with Docker version 0.8.1.
......@@ -19,4 +19,10 @@
IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
/opt/spark/spark-class org.apache.spark.deploy.master.Master -i $IP
# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
umount /etc/hosts
/opt/spark/bin/spark-class org.apache.spark.deploy.master.Master -i $IP
......@@ -19,4 +19,10 @@
IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
/opt/spark/spark-class org.apache.spark.deploy.worker.Worker $1
# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
umount /etc/hosts
/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker $1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment