Commit 1b2ab1cd authored by Andrew Or

[SPARK-4592] Avoid duplicate worker registrations in standalone mode

**Summary.** On failover, the Master may receive duplicate registrations from the same worker, causing the worker to exit. This was introduced by commit https://github.com/apache/spark/commit/4afe9a4852ebeb4cc77322a14225cd3dec165f3f, which added logic for the worker to re-register with the master in case of failures. However, the following race condition may occur:

(1) Master A fails and Worker attempts to reconnect to all masters
(2) Master B takes over and notifies Worker
(3) Worker responds by registering with Master B
(4) Meanwhile, Worker's previous reconnection attempt reaches Master B, causing the same Worker to register with Master B twice

**Fix.** Instead of attempting to register with all known masters, the worker should re-register with only the one that it has been communicating with. This is safe because the fact that a failover has occurred means the old master must have died. Then, when the worker is finally notified of a new master, it gives up on the old one in favor of the new one.

**Caveat.** Even this fix is subject to more obscure race conditions. For instance, if Master B fails and Master A recovers immediately, then Master A may still observe duplicate worker registrations. However, this and the other potential race conditions summarized in [SPARK-4592](https://issues.apache.org/jira/browse/SPARK-4592) are much, much less likely than the one described above, which is deterministically reproducible.
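
To make the fix concrete, here is a minimal, self-contained Scala sketch of the re-registration decision. The names `mastersToRegisterWith`, `activeMaster`, and `allMasters` are illustrative stand-ins and do not appear in the patch; the actual change lives in the `Worker` actor in the diff below.

```scala
// A minimal sketch of the decision introduced by this patch (not the actual Worker actor).
// `activeMaster` and `allMasters` are hypothetical stand-ins for the worker's `master`
// reference and its list of known master URLs.
object ReregistrationSketch {

  /**
   * Decide which master(s) the worker should (re-)register with.
   *
   * If the worker already knows its active master, it re-registers only with that one,
   * avoiding the duplicate-registration race described in SPARK-4592. If it has not yet
   * established a connection (still bootstrapping), it falls back to trying all masters.
   */
  def mastersToRegisterWith(activeMaster: Option[String], allMasters: Seq[String]): Seq[String] =
    activeMaster match {
      case Some(master) => Seq(master)   // failover: only the master we were talking to
      case None         => allMasters    // initial registration: try every known master
    }

  def main(args: Array[String]): Unit = {
    val masters = Seq("spark://masterA:7077", "spark://masterB:7077")
    // During failover, the worker targets only the active master it knows about:
    println(mastersToRegisterWith(Some("spark://masterB:7077"), masters))
    // While bootstrapping (no active master yet), it retries all of them:
    println(mastersToRegisterWith(None, masters))
  }
}
```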

Author: Andrew Or <andrew@databricks.com>

Closes #3447 from andrewor14/standalone-failover and squashes the following commits:

0d9716c [Andrew Or] Move re-registration logic to actor for thread-safety
79286dc [Andrew Or] Preserve old behavior for initial retries
83b321c [Andrew Or] Tweak wording
1fce6a9 [Andrew Or] Active master actor could be null in the beginning
b6f269e [Andrew Or] Avoid duplicate worker registrations
parent 8838ad7c
@@ -92,6 +92,8 @@ private[deploy] object DeployMessages {

   case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders

+  case object ReregisterWithMaster // used when a worker attempts to reconnect to a master
+
   // AppClient to Master

   case class RegisterApplication(appDescription: ApplicationDescription)
@@ -21,7 +21,6 @@ import java.io.File
 import java.io.IOException
 import java.text.SimpleDateFormat
 import java.util.{UUID, Date}
-import java.util.concurrent.TimeUnit

 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
@@ -177,6 +176,9 @@ private[spark] class Worker(
         throw new SparkException("Invalid spark URL: " + x)
     }
     connected = true
+    // Cancel any outstanding re-registration attempts because we found a new master
+    registrationRetryTimer.foreach(_.cancel())
+    registrationRetryTimer = None
   }

   private def tryRegisterAllMasters() {
@@ -187,7 +189,12 @@ private[spark] class Worker(
     }
   }

-  private def retryConnectToMaster() {
+  /**
+   * Re-register with the master because a network failure or a master failure has occurred.
+   * If the re-registration attempt threshold is exceeded, the worker exits with error.
+   * Note that for thread-safety this should only be called from the actor.
+   */
+  private def reregisterWithMaster(): Unit = {
     Utils.tryOrExit {
       connectionAttemptCount += 1
       if (registered) {
@@ -195,12 +202,40 @@ private[spark] class Worker(
         registrationRetryTimer = None
       } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
         logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
-        tryRegisterAllMasters()
+        /**
+         * Re-register with the active master this worker has been communicating with. If there
+         * is none, then it means this worker is still bootstrapping and hasn't established a
+         * connection with a master yet, in which case we should re-register with all masters.
+         *
+         * It is important to re-register only with the active master during failures. Otherwise,
+         * if the worker unconditionally attempts to re-register with all masters, the following
+         * race condition may arise and cause a "duplicate worker" error detailed in SPARK-4592:
+         *
+         *   (1) Master A fails and Worker attempts to reconnect to all masters
+         *   (2) Master B takes over and notifies Worker
+         *   (3) Worker responds by registering with Master B
+         *   (4) Meanwhile, Worker's previous reconnection attempt reaches Master B,
+         *       causing the same Worker to register with Master B twice
+         *
+         * Instead, if we only register with the known active master, we can assume that the
+         * old master must have died because another master has taken over. Note that this is
+         * still not safe if the old master recovers within this interval, but this is a much
+         * less likely scenario.
+         */
+        if (master != null) {
+          master ! RegisterWorker(
+            workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
+        } else {
+          // We are retrying the initial registration
+          tryRegisterAllMasters()
+        }
+        // We have exceeded the initial registration retry threshold
+        // All retries from now on should use a higher interval
         if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
           registrationRetryTimer.foreach(_.cancel())
           registrationRetryTimer = Some {
             context.system.scheduler.schedule(PROLONGED_REGISTRATION_RETRY_INTERVAL,
-              PROLONGED_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
+              PROLONGED_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
           }
         }
       } else {
@@ -220,7 +255,7 @@ private[spark] class Worker(
       connectionAttemptCount = 0
       registrationRetryTimer = Some {
         context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
-          INITIAL_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
+          INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
       }
     case Some(_) =>
       logInfo("Not spawning another attempt to register with the master, since there is an" +
@@ -400,12 +435,15 @@ private[spark] class Worker(
       logInfo(s"$x Disassociated !")
       masterDisconnected()

-    case RequestWorkerState => {
+    case RequestWorkerState =>
       sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
         finishedExecutors.values.toList, drivers.values.toList,
         finishedDrivers.values.toList, activeMasterUrl, cores, memory,
         coresUsed, memoryUsed, activeMasterWebUiUrl)
-    }
+
+    case ReregisterWithMaster =>
+      reregisterWithMaster()
+
   }

   private def masterDisconnected() {