diff --git a/core/src/main/scala/org/apache/spark/util/SignalLogger.scala b/core/src/main/scala/org/apache/spark/util/SignalLogger.scala deleted file mode 100644 index a793c9135eea952696c9eff7b289dfc9a14cb9c5..0000000000000000000000000000000000000000 --- a/core/src/main/scala/org/apache/spark/util/SignalLogger.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import org.slf4j.Logger - -/** - * Used to log signals received. This can be very useful in debugging crashes or kills. - */ -private[spark] object SignalLogger { - - private var registered = false - - /** Register a signal handler to log signals on UNIX-like systems. */ - def register(log: Logger): Unit = Seq("TERM", "HUP", "INT").foreach{ sig => - Signaling.register(sig) { - log.error("RECEIVED SIGNAL " + sig) - false - } - } -} diff --git a/core/src/main/scala/org/apache/spark/util/Signaling.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala similarity index 59% rename from core/src/main/scala/org/apache/spark/util/Signaling.scala rename to core/src/main/scala/org/apache/spark/util/SignalUtils.scala index 2075cc45a9b8de396cbaae4a21f5d845a36de3ad..9479d8f74dd2530f345328225d7e8c771f6837a8 100644 --- a/core/src/main/scala/org/apache/spark/util/Signaling.scala +++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala @@ -17,28 +17,69 @@ package org.apache.spark.util -import java.util.{Collections, LinkedList} +import java.util.Collections import scala.collection.JavaConverters._ -import scala.collection.mutable.HashMap import org.apache.commons.lang3.SystemUtils +import org.slf4j.Logger import sun.misc.{Signal, SignalHandler} import org.apache.spark.internal.Logging - /** * Contains utilities for working with posix signals. */ -private[spark] object Signaling extends Logging { +private[spark] object SignalUtils extends Logging { + + /** A flag to make sure we only register the logger once. */ + private var loggerRegistered = false + + /** Register a signal handler to log signals on UNIX-like systems. */ + def registerLogger(log: Logger): Unit = synchronized { + if (!loggerRegistered) { + Seq("TERM", "HUP", "INT").foreach { sig => + SignalUtils.register(sig) { + log.error("RECEIVED SIGNAL " + sig) + false + } + } + loggerRegistered = true + } + } + + /** + * Adds an action to be run when a given signal is received by this process. + * + * Note that signals are only supported on unix-like operating systems and work on a best-effort + * basis: if a signal is not available or cannot be intercepted, only a warning is emitted. + * + * All actions for a given signal are run in a separate thread. + */ + def register(signal: String)(action: => Boolean): Unit = synchronized { + if (SystemUtils.IS_OS_UNIX) { + try { + val handler = handlers.getOrElseUpdate(signal, { + logInfo("Registered signal handler for " + signal) + new ActionHandler(new Signal(signal)) + }) + handler.register(action) + } catch { + case ex: Exception => logWarning(s"Failed to register signal handler for " + signal, ex) + } + } + } /** * A handler for the given signal that runs a collection of actions. */ private class ActionHandler(signal: Signal) extends SignalHandler { - private val actions = Collections.synchronizedList(new LinkedList[() => Boolean]) + /** + * List of actions upon the signal; the callbacks should return true if the signal is "handled", + * i.e. should not escalate to the next callback. + */ + private val actions = Collections.synchronizedList(new java.util.LinkedList[() => Boolean]) // original signal handler, before this handler was attached private val prevHandler: SignalHandler = Signal.handle(signal, this) @@ -51,11 +92,10 @@ private[spark] object Signaling extends Logging { // register old handler, will receive incoming signals while this handler is running Signal.handle(signal, prevHandler) - val escalate = actions.asScala forall { action => - !action() - } - - if(escalate) { + // run all actions, escalate to parent handler if no action catches the signal + // (i.e. all actions return false) + val escalate = actions.asScala.forall { action => !action() } + if (escalate) { prevHandler.handle(sig) } @@ -64,36 +104,13 @@ private[spark] object Signaling extends Logging { } /** - * Add an action to be run by this handler. + * Adds an action to be run by this handler. * @param action An action to be run when a signal is received. Return true if the signal - * should be stopped with this handler, false if it should be escalated. + * should be stopped with this handler, false if it should be escalated. */ def register(action: => Boolean): Unit = actions.add(() => action) - - } - - // contains association of signals to their respective handlers - private val handlers = new HashMap[String, ActionHandler] - - /** - * Adds an action to be run when a given signal is received by this process. - * - * Note that signals are only supported on unix-like operating systems and work on a best-effort - * basis: if a signal is not available or cannot be intercepted, only a warning is emitted. - * - * All actions for a given signal are run in a separate thread. - */ - def register(signal: String)(action: => Boolean): Unit = synchronized { - if (SystemUtils.IS_OS_UNIX) try { - val handler = handlers.getOrElseUpdate(signal, { - val h = new ActionHandler(new Signal(signal)) - logInfo("Registered signal handler for " + signal) - h - }) - handler.register(action) - } catch { - case ex: Exception => logWarning(s"Failed to register signal handler for " + signal, ex) - } } + /** Mapping from signal to their respective handlers. */ + private val handlers = new scala.collection.mutable.HashMap[String, ActionHandler] } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 848f7d7adbc7ef1cccf6c5c418e4ffe64a566ae0..ea49991493fd789e7bbf4e4980717489e439f19f 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2284,7 +2284,7 @@ private[spark] object Utils extends Logging { */ def initDaemon(log: Logger): Unit = { log.info(s"Started daemon with process name: ${Utils.getProcessName()}") - SignalLogger.register(log) + SignalUtils.registerLogger(log) } } diff --git a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala index c305ed545c4c95944a758e6018fb04ecefb53a19..202febf144626359ef149a1ca8585c11724b1570 100644 --- a/repl/src/main/scala/org/apache/spark/repl/Signaling.scala +++ b/repl/src/main/scala/org/apache/spark/repl/Signaling.scala @@ -19,7 +19,7 @@ package org.apache.spark.repl import org.apache.spark.SparkContext import org.apache.spark.internal.Logging -import org.apache.spark.util.{Signaling => USignaling} +import org.apache.spark.util.SignalUtils private[repl] object Signaling extends Logging { @@ -28,7 +28,7 @@ private[repl] object Signaling extends Logging { * when no jobs are currently running. * This makes it possible to interrupt a running shell job by pressing Ctrl+C. */ - def cancelOnInterrupt(ctx: SparkContext): Unit = USignaling.register("INT") { + def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") { if (!ctx.statusTracker.getActiveJobIds().isEmpty) { logWarning("Cancelling all active jobs, this can take a while. " + "Press Ctrl+C again to exit now.") diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 5bb63500c8f88e42a8b9680ec486c622bd402267..4df90d7b6b0b8cf06c747e79671458a63799c7ed 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -716,7 +716,7 @@ object ApplicationMaster extends Logging { private var master: ApplicationMaster = _ def main(args: Array[String]): Unit = { - SignalLogger.register(log) + SignalUtils.registerLogger(log) val amArgs = new ApplicationMasterArguments(args) SparkHadoopUtil.get.runAsSparkUser { () => master = new ApplicationMaster(amArgs, new YarnRMClient)