diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index b9dd8557ee904a0e7e3108c0f2350d7e8d3dfd35..c46f84de8444a14854ab381fa8d021d5dffd7afd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -92,6 +92,8 @@ private[deploy] object DeployMessages {

   case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders

+  case object ReregisterWithMaster // used when a worker attempts to reconnect to a master
+
   // AppClient to Master

   case class RegisterApplication(appDescription: ApplicationDescription)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index ca262de832e25d77a49a58b3b78666b5ae089c45..eb11163538b20a81a9a5602029f026dc55e7fe36 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -21,7 +21,6 @@ import java.io.File
 import java.io.IOException
 import java.text.SimpleDateFormat
 import java.util.{UUID, Date}
-import java.util.concurrent.TimeUnit

 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
@@ -177,6 +176,9 @@ private[spark] class Worker(
         throw new SparkException("Invalid spark URL: " + x)
     }
     connected = true
+    // Cancel any outstanding re-registration attempts because we found a new master
+    registrationRetryTimer.foreach(_.cancel())
+    registrationRetryTimer = None
   }

   private def tryRegisterAllMasters() {
@@ -187,7 +189,12 @@ private[spark] class Worker(
     }
   }

-  private def retryConnectToMaster() {
+  /**
+   * Re-register with the master because a network failure or a master failure has occurred.
+   * If the re-registration attempt threshold is exceeded, the worker exits with error.
+   * Note that for thread-safety this should only be called from the actor.
+   */
+  private def reregisterWithMaster(): Unit = {
     Utils.tryOrExit {
       connectionAttemptCount += 1
       if (registered) {
@@ -195,12 +202,40 @@ private[spark] class Worker(
         registrationRetryTimer = None
       } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
         logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
-        tryRegisterAllMasters()
+        /**
+         * Re-register with the active master this worker has been communicating with. If there
+         * is none, then it means this worker is still bootstrapping and hasn't established a
+         * connection with a master yet, in which case we should re-register with all masters.
+         *
+         * It is important to re-register only with the active master during failures. Otherwise,
+         * if the worker unconditionally attempts to re-register with all masters, the following
+         * race condition may arise and cause a "duplicate worker" error detailed in SPARK-4592:
+         *
+         *   (1) Master A fails and Worker attempts to reconnect to all masters
+         *   (2) Master B takes over and notifies Worker
+         *   (3) Worker responds by registering with Master B
+         *   (4) Meanwhile, Worker's previous reconnection attempt reaches Master B,
+         *       causing the same Worker to register with Master B twice
+         *
+         * Instead, if we only register with the known active master, we can assume that the
+         * old master must have died because another master has taken over. Note that this is
+         * still not safe if the old master recovers within this interval, but this is a much
+         * less likely scenario.
+         */
+        if (master != null) {
+          master ! RegisterWorker(
+            workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
+        } else {
+          // We are retrying the initial registration
+          tryRegisterAllMasters()
+        }
+        // We have exceeded the initial registration retry threshold
+        // All retries from now on should use a higher interval
         if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
           registrationRetryTimer.foreach(_.cancel())
           registrationRetryTimer = Some {
             context.system.scheduler.schedule(PROLONGED_REGISTRATION_RETRY_INTERVAL,
-              PROLONGED_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
+              PROLONGED_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
           }
         }
       } else {
@@ -220,7 +255,7 @@ private[spark] class Worker(
         connectionAttemptCount = 0
         registrationRetryTimer = Some {
           context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
-            INITIAL_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
+            INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
         }
       case Some(_) =>
         logInfo("Not spawning another attempt to register with the master, since there is an" +
@@ -400,12 +435,15 @@ private[spark] class Worker(
       logInfo(s"$x Disassociated !")
       masterDisconnected()

-    case RequestWorkerState => {
+    case RequestWorkerState =>
       sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
         finishedExecutors.values.toList, drivers.values.toList,
         finishedDrivers.values.toList, activeMasterUrl, cores, memory,
         coresUsed, memoryUsed, activeMasterWebUiUrl)
-    }
+
+    case ReregisterWithMaster =>
+      reregisterWithMaster()
+
   }

   private def masterDisconnected() {
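Note on the scheduling change above: the patch replaces Akka's closure-based `scheduler.schedule(delay, interval)(block)` with the message-based overload `schedule(delay, interval, receiver, message)`, so the retry logic now runs inside the worker actor's `receive` loop instead of on a scheduler thread, which is why `reregisterWithMaster()` can safely mutate actor state. The following is a minimal, standalone sketch of that classic (untyped) Akka pattern, not Spark code; `Retry`, `RetryingActor`, and the 10-second interval are made-up illustration names and values.

```scala
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSystem, Cancellable, Props}

// Stands in for a self-addressed message like ReregisterWithMaster
case object Retry

class RetryingActor extends Actor {
  import context.dispatcher // ExecutionContext required by the scheduler

  private var attempts = 0
  private var timer: Option[Cancellable] = None

  override def preStart(): Unit = {
    // Deliver Retry to ourselves periodically; the handler runs inside receive(),
    // on the actor's single thread, so mutating `attempts` needs no locking.
    timer = Some(context.system.scheduler.schedule(10.seconds, 10.seconds, self, Retry))
  }

  override def postStop(): Unit = timer.foreach(_.cancel())

  def receive = {
    case Retry =>
      attempts += 1
      println(s"retry attempt $attempts")
  }
}

object Demo extends App {
  val system = ActorSystem("demo")
  system.actorOf(Props[RetryingActor], "retrier")
}
```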