diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index e8a1e35c3fc4834f97f48818ad6e64b54f38ee50..7fc96e4f764b7b969cdfbc37a87127868631ce1e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -28,7 +28,7 @@ import org.apache.spark.network.sasl.SaslServerBootstrap import org.apache.spark.network.server.{TransportServerBootstrap, TransportServer} import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler import org.apache.spark.network.util.TransportConf -import org.apache.spark.util.Utils +import org.apache.spark.util.{ShutdownHookManager, Utils} /** * Provides a server from which Executors can read shuffle files (rather than reading directly from @@ -118,19 +118,13 @@ object ExternalShuffleService extends Logging { server = newShuffleService(sparkConf, securityManager) server.start() - installShutdownHook() + ShutdownHookManager.addShutdownHook { () => + logInfo("Shutting down shuffle service.") + server.stop() + barrier.countDown() + } // keep running until the process is terminated barrier.await() } - - private def installShutdownHook(): Unit = { - Runtime.getRuntime.addShutdownHook(new Thread("External Shuffle Service shutdown thread") { - override def run() { - logInfo("Shutting down shuffle service.") - server.stop() - barrier.countDown() - } - }) - } } diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala index 5d4e5b899dfdcbbb78804d18e3c4480069ce7519..389eff5e0645b051c37fcb5e1d0f5f6624f0aab6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala @@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch import org.apache.spark.deploy.mesos.ui.MesosClusterUI import org.apache.spark.deploy.rest.mesos.MesosRestServer import org.apache.spark.scheduler.cluster.mesos._ -import org.apache.spark.util.SignalLogger +import org.apache.spark.util.{ShutdownHookManager, SignalLogger} import org.apache.spark.{Logging, SecurityManager, SparkConf} /* @@ -103,14 +103,11 @@ private[mesos] object MesosClusterDispatcher extends Logging { } val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf) dispatcher.start() - val shutdownHook = new Thread() { - override def run() { - logInfo("Shutdown hook is shutting down dispatcher") - dispatcher.stop() - dispatcher.awaitShutdown() - } + ShutdownHookManager.addShutdownHook { () => + logInfo("Shutdown hook is shutting down dispatcher") + dispatcher.stop() + dispatcher.awaitShutdown() } - Runtime.getRuntime.addShutdownHook(shutdownHook) dispatcher.awaitShutdown() } } diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala index 620f226a23e154add62756b48db1689233709a43..1a0f3b477ba3f01cf41fe3d14e48a508755ccf86 100644 --- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala +++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala @@ -162,7 +162,9 @@ private[spark] object ShutdownHookManager extends Logging { val hook = new Thread { override def run() {} } + // scalastyle:off runtimeaddshutdownhook Runtime.getRuntime.addShutdownHook(hook) + // scalastyle:on runtimeaddshutdownhook Runtime.getRuntime.removeShutdownHook(hook) } catch { case ise: IllegalStateException => return true @@ -228,7 +230,9 @@ private [util] class SparkShutdownHookManager { .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30)) case Failure(_) => + // scalastyle:off runtimeaddshutdownhook Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook")); + // scalastyle:on runtimeaddshutdownhook } } diff --git a/scalastyle-config.xml b/scalastyle-config.xml index dab1ebddc666e396d23a24ac905e8107ca8b648f..6925e18737b752260e94cb841710a086dd2e2f75 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -157,6 +157,18 @@ This file is divided into 3 sections: ]]></customMessage> </check> + <check customId="runtimeaddshutdownhook" level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> + <parameters><parameter name="regex">Runtime\.getRuntime\.addShutdownHook</parameter></parameters> + <customMessage><![CDATA[ + Are you sure that you want to use Runtime.getRuntime.addShutdownHook? In most cases, you should use + ShutdownHookManager.addShutdownHook instead. + If you must use Runtime.getRuntime.addShutdownHook, wrap the code block with + // scalastyle:off runtimeaddshutdownhook + Runtime.getRuntime.addShutdownHook(...) + // scalastyle:on runtimeaddshutdownhook + ]]></customMessage> + </check> + <check customId="classforname" level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> <parameters><parameter name="regex">Class\.forName</parameter></parameters> <customMessage><![CDATA[ diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 03bb2c222503f817b15247a461f38bb9cbfc7d52..8e7aa75bc3b2cc1fb14b626e6d4539156c95bad3 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -195,20 +195,18 @@ private[hive] object SparkSQLCLIDriver extends Logging { } // add shutdown hook to flush the history to history file - Runtime.getRuntime.addShutdownHook(new Thread(new Runnable() { - override def run() = { - reader.getHistory match { - case h: FileHistory => - try { - h.flush() - } catch { - case e: IOException => - logWarning("WARNING: Failed to write command history file: " + e.getMessage) - } - case _ => - } + ShutdownHookManager.addShutdownHook { () => + reader.getHistory match { + case h: FileHistory => + try { + h.flush() + } catch { + case e: IOException => + logWarning("WARNING: Failed to write command history file: " + e.getMessage) + } + case _ => } - })) + } // TODO: missing /*