diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ea4ddcc2e265d76b3454eb4a1e969d19c1900e27..65b903a55d5bd80db942f47f532a78443457f9c4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -223,6 +223,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private var _listenerBusStarted: Boolean = false
   private var _jars: Seq[String] = _
   private var _files: Seq[String] = _
+  private var _shutdownHookRef: AnyRef = _
 
   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -517,6 +518,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _taskScheduler.postStartHook()
     _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+
+    // Make sure the context is stopped if the user forgets about it. This avoids leaving
+    // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
+    // is killed, though.
+    _shutdownHookRef = Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
+      logInfo("Invoking stop() from shutdown hook")
+      stop()
+    }
   } catch {
     case NonFatal(e) =>
       logError("Error initializing SparkContext.", e)
@@ -1481,6 +1490,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       logInfo("SparkContext already stopped.")
       return
     }
+    if (_shutdownHookRef != null) {
+      Utils.removeShutdownHook(_shutdownHookRef)
+    }
 
     postApplicationEnd()
     _ui.foreach(_.stop())
@@ -1891,7 +1903,7 @@ object SparkContext extends Logging {
    *
    * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
    */
-  private val activeContext: AtomicReference[SparkContext] = 
+  private val activeContext: AtomicReference[SparkContext] =
     new AtomicReference[SparkContext](null)
 
   /**
@@ -1944,11 +1956,11 @@ object SparkContext extends Logging {
   }
 
   /**
-   * This function may be used to get or instantiate a SparkContext and register it as a 
-   * singleton object. Because we can only have one active SparkContext per JVM, 
-   * this is useful when applications may wish to share a SparkContext. 
+   * This function may be used to get or instantiate a SparkContext and register it as a
+   * singleton object. Because we can only have one active SparkContext per JVM,
+   * this is useful when applications may wish to share a SparkContext.
    *
-   * Note: This function cannot be used to create multiple SparkContext instances 
+   * Note: This function cannot be used to create multiple SparkContext instances
    * even if multiple contexts are allowed.
    */
   def getOrCreate(config: SparkConf): SparkContext = {
@@ -1961,17 +1973,17 @@ object SparkContext extends Logging {
       activeContext.get()
     }
   }
-  
+
   /**
-   * This function may be used to get or instantiate a SparkContext and register it as a 
-   * singleton object. Because we can only have one active SparkContext per JVM, 
+   * This function may be used to get or instantiate a SparkContext and register it as a
+   * singleton object. Because we can only have one active SparkContext per JVM,
    * this is useful when applications may wish to share a SparkContext.
-   * 
+   *
    * This method allows not passing a SparkConf (useful if just retrieving).
-   * 
-   * Note: This function cannot be used to create multiple SparkContext instances 
-   * even if multiple contexts are allowed. 
-   */ 
+   *
+   * Note: This function cannot be used to create multiple SparkContext instances
+   * even if multiple contexts are allowed.
+   */
   def getOrCreate(): SparkContext = {
     getOrCreate(new SparkConf())
   }
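
The SparkContext change above registers the hook as the last step of initialization and removes it again at the top of stop(), so an explicit stop() does not leave a stale hook behind, while a forgotten context still gets cleaned up at JVM exit. Below is a minimal standalone sketch of the same register-on-start / deregister-on-stop pattern, built on plain java.lang.Runtime shutdown hooks rather than Spark's Utils helpers; the LifecycleExample class and its internals are illustrative only, not part of this patch.

    import java.util.concurrent.atomic.AtomicBoolean

    // Illustrative sketch of the pattern in the SparkContext hunk above,
    // using raw JVM shutdown hooks instead of Utils.addShutdownHook.
    class LifecycleExample {
      private val stopped = new AtomicBoolean(false)

      // Register the hook as the final step of construction, mirroring the
      // patch, which registers it after the metrics sources are set up.
      private val shutdownHook: Thread =
        new Thread(new Runnable { override def run(): Unit = stop() })
      Runtime.getRuntime.addShutdownHook(shutdownHook)

      def stop(): Unit = {
        // Idempotent, like SparkContext.stop(): a second call is a no-op.
        if (!stopped.compareAndSet(false, true)) return
        // Deregister on an explicit stop(). removeShutdownHook throws
        // IllegalStateException when the JVM is already shutting down
        // (i.e. when stop() was invoked from the hook itself), so swallow it.
        try Runtime.getRuntime.removeShutdownHook(shutdownHook)
        catch { case _: IllegalStateException => () }
        // ... release resources here ...
      }
    }
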
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c6c6df7cfa56e8d94029f880ec9d62b407249e04..342bc9a06db47e54479ab6a5d10b059bbd222710 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -67,6 +67,12 @@ private[spark] object Utils extends Logging {
 
   val DEFAULT_SHUTDOWN_PRIORITY = 100
 
+  /**
+   * The shutdown priority of the SparkContext instance. This is lower than the default
+   * priority, so that default-priority hooks run before the context is shut down.
+   */
+  val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
+
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
@@ -2116,7 +2122,7 @@ private[spark] object Utils extends Logging {
    * @return A handle that can be used to unregister the shutdown hook.
    */
   def addShutdownHook(hook: () => Unit): AnyRef = {
-    addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook)
+    addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
   }
 
   /**
@@ -2126,7 +2132,7 @@ private[spark] object Utils extends Logging {
    * @param hook The code to run during shutdown.
    * @return A handle that can be used to unregister the shutdown hook.
    */
-  def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = {
+  def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
     shutdownHooks.add(priority, hook)
   }
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 93ae45133ce241cd7d3abb4ede931d95d5ba5690..70cb57ffd8c69f5b525b73af42f4203a4c7f55fe 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -95,14 +95,8 @@ private[spark] class ApplicationMaster(
 
       val fs = FileSystem.get(yarnConf)
 
-      Utils.addShutdownHook { () =>
-        // If the SparkContext is still registered, shut it down as a best case effort in case
-        // users do not call sc.stop or do System.exit().
-        val sc = sparkContextRef.get()
-        if (sc != null) {
-          logInfo("Invoking sc stop from shutdown hook")
-          sc.stop()
-        }
+      // This shutdown hook should run *after* the SparkContext is shut down.
+      Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1) { () =>
         val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
         val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
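
With the context now stopping itself at priority 50, the ApplicationMaster hook registered at SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1 no longer needs to call sc.stop() itself: by the time it fires, the context hook has already run, and the AM only handles its own attempt and unregistration bookkeeping. A self-contained toy check of that ordering (hypothetical names, not the Spark API):

    // Toy demonstration of the intended ordering: with hooks run in descending
    // priority order, the SparkContext hook (50) fires before the YARN
    // ApplicationMaster hook (49), so the AM hook can assume the context is
    // already stopped.
    object AmOrderingSketch extends App {
      val order = scala.collection.mutable.ArrayBuffer.empty[String]
      val hooks = List(
        50 -> (() => order += "stop SparkContext"),             // context's own hook
        49 -> (() => order += "finish YARN ApplicationMaster")  // AM hook
      )
      hooks.sortBy(_._1)(Ordering[Int].reverse).foreach { case (_, hook) => hook() }
      println(order.mkString(" -> "))  // stop SparkContext -> finish YARN ApplicationMaster
    }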