diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 21c6e6ffa6666809e717733c94c71e18612c792f..9385f557c46144d074ea5350701f8348fed27470 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark
 
+import java.util.concurrent.{Executors, TimeUnit}
+
 import scala.collection.mutable
 
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{SystemClock, Clock}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -129,6 +131,10 @@ private[spark] class ExecutorAllocationManager(
   // Listener for Spark events that impact the allocation policy
   private val listener = new ExecutorAllocationListener
 
+  // Executor that handles the scheduling task.
+  private val executor = Executors.newSingleThreadScheduledExecutor(
+    Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -173,32 +179,24 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Register for scheduler callbacks to decide when to add and remove executors.
+   * Register for scheduler callbacks to decide when to add and remove executors, and start
+   * the scheduling task.
    */
   def start(): Unit = {
     listenerBus.addListener(listener)
-    startPolling()
+
+    val scheduleTask = new Runnable() {
+      override def run(): Unit = Utils.logUncaughtExceptions(schedule())
+    }
+    executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
   }
 
   /**
-   * Start the main polling thread that keeps track of when to add and remove executors.
+   * Stop the allocation manager.
    */
-  private def startPolling(): Unit = {
-    val t = new Thread {
-      override def run(): Unit = {
-        while (true) {
-          try {
-            schedule()
-          } catch {
-            case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
-          }
-          Thread.sleep(intervalMillis)
-        }
-      }
-    }
-    t.setName("spark-dynamic-executor-allocation")
-    t.setDaemon(true)
-    t.start()
+  def stop(): Unit = {
+    executor.shutdown()
+    executor.awaitTermination(10, TimeUnit.SECONDS)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3904f7d1060c5c65845de48bc446158ffff62f49..5b3778ead6994bed9fe722b2a5b5257487df57ff 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1136,7 +1136,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Return whether dynamically adjusting the amount of resources allocated to
    * this application is supported. This is currently only available for YARN.
    */
-  private[spark] def supportDynamicAllocation = 
+  private[spark] def supportDynamicAllocation =
     master.contains("yarn") || dynamicAllocationTesting
 
   /**
@@ -1400,6 +1400,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         env.metricsSystem.report()
         metadataCleaner.cancel()
         cleaner.foreach(_.stop())
+        executorAllocationManager.foreach(_.stop())
         dagScheduler.stop()
         dagScheduler = null
         listenerBus.stop()
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index abfcee75728dcf4c6eb1cf5c9d23001ac8fb779d..3ded1e4af87422e73e3ed81eb3d39add4b7f6bf8 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import scala.collection.mutable
 
-import org.scalatest.{FunSuite, PrivateMethodTester}
+import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -28,10 +28,20 @@ import org.apache.spark.util.ManualClock
 /**
  * Test add and remove behavior of ExecutorAllocationManager.
  */
-class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
+class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._
 
+  private val contexts = new mutable.ListBuffer[SparkContext]()
+
+  before {
+    contexts.clear()
+  }
+
+  after {
+    contexts.foreach(_.stop())
+  }
+
   test("verify min/max executors") {
     val conf = new SparkConf()
       .setMaster("local")
@@ -39,18 +49,19 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
       .set("spark.dynamicAllocation.enabled", "true")
       .set("spark.dynamicAllocation.testing", "true")
     val sc0 = new SparkContext(conf)
+    contexts += sc0
     assert(sc0.executorAllocationManager.isDefined)
     sc0.stop()
 
     // Min < 0
     val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
-    intercept[SparkException] { new SparkContext(conf1) }
+    intercept[SparkException] { contexts += new SparkContext(conf1) }
     SparkEnv.get.stop()
     SparkContext.clearActiveContext()
 
     // Max < 0
     val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
-    intercept[SparkException] { new SparkContext(conf2) }
+    intercept[SparkException] { contexts += new SparkContext(conf2) }
     SparkEnv.get.stop()
     SparkContext.clearActiveContext()
 
@@ -665,16 +676,6 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(removeTimes(manager).contains("executor-2"))
     assert(!removeTimes(manager).contains("executor-1"))
   }
-}
-
-/**
- * Helper methods for testing ExecutorAllocationManager.
- * This includes methods to access private methods and fields in ExecutorAllocationManager.
- */
-private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
-  private val schedulerBacklogTimeout = 1L
-  private val sustainedSchedulerBacklogTimeout = 2L
-  private val executorIdleTimeout = 3L
 
   private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
     val conf = new SparkConf()
@@ -688,9 +689,22 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
         sustainedSchedulerBacklogTimeout.toString)
       .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
       .set("spark.dynamicAllocation.testing", "true")
-    new SparkContext(conf)
+    val sc = new SparkContext(conf)
+    contexts += sc
+    sc
   }
 
+}
+
+/**
+ * Helper methods for testing ExecutorAllocationManager.
+ * This includes methods to access private methods and fields in ExecutorAllocationManager.
+ */
+private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
+  private val schedulerBacklogTimeout = 1L
+  private val sustainedSchedulerBacklogTimeout = 2L
+  private val executorIdleTimeout = 3L
+
   private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
     new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details")
   }