From 82197ed3bd4b8c29b0c4b183994753f0e02b6903 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Date: Wed, 18 Feb 2015 12:20:11 +0000 Subject: [PATCH] [SPARK-4949]shutdownCallback in SparkDeploySchedulerBackend should be enclosed by synchronized block. A variable `shutdownCallback` in SparkDeploySchedulerBackend can be accessed from multiple threads so it should be enclosed by synchronized block. Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #3781 from sarutak/SPARK-4949 and squashes the following commits: c146c93 [Kousuke Saruta] Removed "setShutdownCallback" method c7265dc [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-4949 42ca528 [Kousuke Saruta] Changed the declaration of the variable "shutdownCallback" as a volatile reference instead of AtomicReference 552df7c [Kousuke Saruta] Changed the declaration of the variable "shutdownCallback" as a volatile reference instead of AtomicReference f556819 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-4949 1b60fd1 [Kousuke Saruta] Improved the locking logics 5942765 [Kousuke Saruta] Enclosed shutdownCallback in SparkDeploySchedulerBackend by synchronized block --- .../cluster/SparkDeploySchedulerBackend.scala | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 40fc6b59cd..a0aa555f62 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler.cluster +import java.util.concurrent.Semaphore + import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv} import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.client.{AppClient, AppClientListener} @@ -31,16 +33,16 @@ private[spark] class SparkDeploySchedulerBackend( with AppClientListener with Logging { - var client: AppClient = null - var stopping = false - var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ - @volatile var appId: String = _ + private var client: AppClient = null + private var stopping = false + + @volatile var shutdownCallback: SparkDeploySchedulerBackend => Unit = _ + @volatile private var appId: String = _ - val registrationLock = new Object() - var registrationDone = false + private val registrationBarrier = new Semaphore(0) - val maxCores = conf.getOption("spark.cores.max").map(_.toInt) - val totalExpectedCores = maxCores.getOrElse(0) + private val maxCores = conf.getOption("spark.cores.max").map(_.toInt) + private val totalExpectedCores = maxCores.getOrElse(0) override def start() { super.start() @@ -95,8 +97,10 @@ private[spark] class SparkDeploySchedulerBackend( stopping = true super.stop() client.stop() - if (shutdownCallback != null) { - shutdownCallback(this) + + val callback = shutdownCallback + if (callback != null) { + callback(this) } } @@ -149,18 +153,11 @@ private[spark] class SparkDeploySchedulerBackend( } private def waitForRegistration() = { - registrationLock.synchronized { - while (!registrationDone) { - registrationLock.wait() - } - } + registrationBarrier.acquire() } private def notifyContext() = { - registrationLock.synchronized { - registrationDone = true - registrationLock.notifyAll() - } + registrationBarrier.release() } } -- GitLab