diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index ef5b687b5831a87f5cbedb0ebfc2283e2c9f5a4f..10b5a7f57a802537b717742deda4a9655e345132 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -22,6 +22,36 @@ import scala.collection.mutable
 
 import org.apache.spark.streaming.receiver.Receiver
 
+/**
+ * A class that tries to schedule receivers evenly across executors. There are two phases of
+ * receiver scheduling.
+ *
+ * - The first phase is global scheduling, which happens when ReceiverTracker is starting and we
+ *   need to schedule all receivers at the same time. ReceiverTracker calls `scheduleReceivers`
+ *   at this phase to try to distribute the receivers evenly. ReceiverTracker should update its
+ *   receiverTrackingInfoMap according to the results of `scheduleReceivers`.
+ *   `ReceiverTrackingInfo.scheduledExecutors` for each receiver will be set to an executor list
+ *   containing the scheduled locations. Then, when a receiver starts, it sends a register
+ *   request and `ReceiverTracker.registerReceiver` is called. There, if a receiver's scheduled
+ *   executors are set, ReceiverTracker should check whether the location of this receiver is
+ *   one of the scheduled executors; if not, the registration will be rejected.
+ * - The second phase is local scheduling, which happens when a receiver is restarting. There are
+ *   two cases of receiver restarting:
+ *   - If a receiver is restarting because its registration was rejected due to a mismatch between
+ *     its real location and its scheduled executors, in other words, it failed to start in one of
+ *     the locations that `scheduleReceivers` suggested, `ReceiverTracker` should first choose the
+ *     executors that are still alive in the list of scheduled executors, then use them to launch
+ *     the receiver job.
+ *   - If a receiver is restarting without a scheduled executors list, or none of the executors in
+ *     the list is alive, `ReceiverTracker` should call `rescheduleReceiver`. In this case,
+ *     `ReceiverTracker` should not set `ReceiverTrackingInfo.scheduledExecutors` for this
+ *     receiver; instead, it should clear it. Then, when this receiver registers, we know this is
+ *     a local scheduling, and `ReceiverTracker` should call `rescheduleReceiver` again to check
+ *     whether the launching location matches.
+ *
+ * In conclusion, we should make a global schedule, try to follow it as closely as possible, and
+ * fall back to local scheduling otherwise.
+ */
 private[streaming] class ReceiverSchedulingPolicy {
 
   /**
@@ -102,8 +132,7 @@ private[streaming] class ReceiverSchedulingPolicy {
 
   /**
    * Return a list of candidate executors to run the receiver. If the list is empty, the caller can
-   * run this receiver in arbitrary executor. The caller can use `preferredNumExecutors` to require
-   * returning `preferredNumExecutors` executors if possible.
+   * run this receiver on an arbitrary executor.
    *
    * This method tries to balance executors' load. Here is the approach to schedule executors
    * for a receiver.
@@ -122,9 +151,8 @@ private[streaming] class ReceiverSchedulingPolicy {
    *         If a receiver is scheduled to an executor but has not yet run, it contributes
    *         `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.</li>
    *     </ul>
-   *     At last, if there are more than `preferredNumExecutors` idle executors (weight = 0),
-   *     returns all idle executors. Otherwise, we only return `preferredNumExecutors` best options
-   *     according to the weights.
+   *     Finally, if there are any idle executors (weight = 0), returns all idle executors.
+   *     Otherwise, returns the executors that have the minimum weight.
    *   </li>
    * </ol>
    *
@@ -134,8 +162,7 @@ private[streaming] class ReceiverSchedulingPolicy {
       receiverId: Int,
       preferredLocation: Option[String],
       receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
-      executors: Seq[String],
-      preferredNumExecutors: Int = 3): Seq[String] = {
+      executors: Seq[String]): Seq[String] = {
     if (executors.isEmpty) {
       return Seq.empty
     }
@@ -156,15 +183,18 @@ private[streaming] class ReceiverSchedulingPolicy {
       }
     }.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
 
-    val idleExecutors = (executors.toSet -- executorWeights.keys).toSeq
-    if (idleExecutors.size >= preferredNumExecutors) {
-      // If there are more than `preferredNumExecutors` idle executors, return all of them
+    val idleExecutors = executors.toSet -- executorWeights.keys
+    if (idleExecutors.nonEmpty) {
       scheduledExecutors ++= idleExecutors
     } else {
-      // If there are less than `preferredNumExecutors` idle executors, return 3 best options
-      scheduledExecutors ++= idleExecutors
-      val sortedExecutors = executorWeights.toSeq.sortBy(_._2).map(_._1)
-      scheduledExecutors ++= (idleExecutors ++ sortedExecutors).take(preferredNumExecutors)
+      // There is no idle executor. So select all executors that have the minimum weight.
+      val sortedExecutors = executorWeights.toSeq.sortBy(_._2)
+      if (sortedExecutors.nonEmpty) {
+        val minWeight = sortedExecutors(0)._2
+        scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
+      } else {
+        // This should not happen since "executors" is not empty
+      }
     }
     scheduledExecutors.toSeq
   }
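A note on the scheduling rule above (not part of the patch): the weight computation documented in
`rescheduleReceiver` can be reproduced with a small standalone sketch. The object and helper names
below are made up for illustration.

```scala
object WeightRuleSketch {
  // A running receiver contributes 1.0 to the executor it runs on; a receiver that is
  // scheduled but not yet running contributes 1.0 / #candidates to each candidate executor.
  def weights(runningOn: Seq[String], scheduledOn: Seq[Seq[String]]): Map[String, Double] = {
    val pairs = runningOn.map(_ -> 1.0) ++
      scheduledOn.flatMap(cs => cs.map(_ -> 1.0 / cs.size))
    pairs.groupBy(_._1).map { case (host, ws) => host -> ws.map(_._2).sum }
  }

  def main(args: Array[String]): Unit = {
    val w = weights(
      runningOn = Seq("host1"),
      scheduledOn = Seq(Seq("host2", "host3"), Seq("host1", "host3")))
    println(w) // host1 -> 1.5, host2 -> 0.5, host3 -> 1.0
    // With no idle executor, the new behavior returns every minimum-weight executor:
    val minWeight = w.values.min
    println(w.filter(_._2 == minWeight).keys.toSet) // Set(host2)
  }
}
```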
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 30d25a64e307a4bd5816386381adbcbfc133eb1f..3d532a675db02644907e2421c96acdba4aceac40 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -244,8 +244,21 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     }
 
     if (isTrackerStopping || isTrackerStopped) {
-      false
-    } else if (!scheduleReceiver(streamId).contains(hostPort)) {
+      return false
+    }
+
+    val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors
+    val acceptableExecutors = if (scheduledExecutors.nonEmpty) {
+        // This receiver is registering and it's scheduled by
+        // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledExecutors" to check it.
+        scheduledExecutors.get
+      } else {
+        // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so call
+        // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
+        scheduleReceiver(streamId)
+      }
+
+    if (!acceptableExecutors.contains(hostPort)) {
       // Refuse it since it's scheduled to a wrong executor
       false
     } else {
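The acceptance check above boils down to: validate against the stored global schedule when one
exists, otherwise consult the scheduling policy again. A minimal standalone sketch with
hypothetical names (not part of the patch); `reschedule` stands in for
`ReceiverSchedulingPolicy.rescheduleReceiver`.

```scala
object RegisterCheckSketch {
  def acceptRegistration(
      hostPort: String,
      storedScheduledExecutors: Option[Seq[String]],
      reschedule: () => Seq[String]): Boolean = {
    val acceptableExecutors = storedScheduledExecutors match {
      // Global scheduling: the receiver must run where scheduleReceivers placed it
      case Some(executors) => executors
      // Local scheduling: scheduledExecutors was cleared, so ask the policy again
      case None => reschedule()
    }
    acceptableExecutors.contains(hostPort)
  }

  def main(args: Array[String]): Unit = {
    val schedule = Some(Seq("host1:40000", "host2:40000"))
    println(acceptRegistration("host1:40000", schedule, () => Nil)) // true
    println(acceptRegistration("host3:40000", schedule, () => Nil)) // false: rejected
    println(acceptRegistration("host3:40000", None, () => Seq("host3:40000"))) // true
  }
}
```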
@@ -426,12 +439,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
           startReceiver(receiver, executors)
         }
       case RestartReceiver(receiver) =>
-        val scheduledExecutors = schedulingPolicy.rescheduleReceiver(
-          receiver.streamId,
-          receiver.preferredLocation,
-          receiverTrackingInfos,
-          getExecutors)
-        updateReceiverScheduledExecutors(receiver.streamId, scheduledExecutors)
+        // Old scheduled executors minus the ones that are not active any more
+        val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
+        val scheduledExecutors = if (oldScheduledExecutors.nonEmpty) {
+            // Reuse the still-alive executors from the original global schedule
+            oldScheduledExecutors
+          } else {
+            val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
+            // Clear "scheduledExecutors" to indicate we are going to do local scheduling
+            val newReceiverInfo = oldReceiverInfo.copy(
+              state = ReceiverState.INACTIVE, scheduledExecutors = None)
+            receiverTrackingInfos(receiver.streamId) = newReceiverInfo
+            schedulingPolicy.rescheduleReceiver(
+              receiver.streamId,
+              receiver.preferredLocation,
+              receiverTrackingInfos,
+              getExecutors)
+          }
+        // Assume only one receiver is restarting at a time, so we don't need to update
+        // receiverTrackingInfos
         startReceiver(receiver, scheduledExecutors)
       case c: CleanupOldBlocks =>
         receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
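The `RestartReceiver` branch above first tries to honor whatever survives of the original global
schedule and only falls back to local scheduling when nothing does. A standalone sketch of that
decision, with illustrative names (not the patch itself):

```scala
object RestartDecisionSketch {
  sealed trait Decision
  final case class ReuseGlobalSchedule(executors: Seq[String]) extends Decision
  case object FallBackToLocalScheduling extends Decision

  // Keep only the scheduled executors that are still alive; reuse them if any survive,
  // otherwise clear the schedule and reschedule locally.
  def decide(scheduledExecutors: Seq[String], aliveExecutors: Set[String]): Decision = {
    val stillAlive = scheduledExecutors.filter(aliveExecutors)
    if (stillAlive.nonEmpty) ReuseGlobalSchedule(stillAlive) else FallBackToLocalScheduling
  }

  def main(args: Array[String]): Unit = {
    println(decide(Seq("host1:1", "host2:1"), Set("host2:1", "host3:1")))
    // ReuseGlobalSchedule(List(host2:1))
    println(decide(Seq("host1:1"), Set("host3:1"))) // FallBackToLocalScheduling
  }
}
```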
@@ -464,6 +490,24 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         context.reply(true)
     }
 
+    /**
+     * Return the stored scheduled executors that are still alive.
+     */
+    private def getStoredScheduledExecutors(receiverId: Int): Seq[String] = {
+      if (receiverTrackingInfos.contains(receiverId)) {
+        val scheduledExecutors = receiverTrackingInfos(receiverId).scheduledExecutors
+        if (scheduledExecutors.nonEmpty) {
+          val executors = getExecutors.toSet
+          // Only return the alive executors
+          scheduledExecutors.get.filter(executors)
+        } else {
+          Nil
+        }
+      } else {
+        Nil
+      }
+    }
+
     /**
      * Start a receiver along with its scheduled executors
      */
@@ -484,7 +528,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
 
       // Function to start the receiver on the worker node
-      val startReceiverFunc = new StartReceiverFunc(checkpointDirOption, serializableHadoopConf)
+      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
+        (iterator: Iterator[Receiver[_]]) => {
+          if (!iterator.hasNext) {
+            throw new SparkException(
+              "Could not start receiver as object not found.")
+          }
+          if (TaskContext.get().attemptNumber() == 0) {
+            val receiver = iterator.next()
+            assert(iterator.hasNext == false)
+            val supervisor = new ReceiverSupervisorImpl(
+              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
+            supervisor.start()
+            supervisor.awaitTermination()
+          } else {
+            // Restarted by TaskScheduler; exit so ReceiverTracker can reschedule the receiver.
+          }
+        }
 
       // Create the RDD using the scheduledExecutors to run the receiver in a Spark job
       val receiverRDD: RDD[Receiver[_]] =
@@ -541,31 +601,3 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   }
 
 }
-
-/**
- * Function to start the receiver on the worker node. Use a class instead of closure to avoid
- * the serialization issue.
- */
-private[streaming] class StartReceiverFunc(
-    checkpointDirOption: Option[String],
-    serializableHadoopConf: SerializableConfiguration)
-  extends (Iterator[Receiver[_]] => Unit) with Serializable {
-
-  override def apply(iterator: Iterator[Receiver[_]]): Unit = {
-    if (!iterator.hasNext) {
-      throw new SparkException(
-        "Could not start receiver as object not found.")
-    }
-    if (TaskContext.get().attemptNumber() == 0) {
-      val receiver = iterator.next()
-      assert(iterator.hasNext == false)
-      val supervisor = new ReceiverSupervisorImpl(
-        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
-      supervisor.start()
-      supervisor.awaitTermination()
-    } else {
-      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
-    }
-  }
-
-}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
index 0418d776ecc9a8c4ccc71d490934865da9e059dd..b2a51d72bac2b1339f94a7ac96e6f238636567d7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
@@ -39,7 +39,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
     assert(scheduledExecutors.toSet === Set("host1", "host2"))
   }
 
-  test("rescheduleReceiver: return all idle executors if more than 3 idle executors") {
+  test("rescheduleReceiver: return all idle executors if there are any idle executors") {
     val executors = Seq("host1", "host2", "host3", "host4", "host5")
     // host3 is idle
     val receiverTrackingInfoMap = Map(
@@ -49,16 +49,16 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
     assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5"))
   }
 
-  test("rescheduleReceiver: return 3 best options if less than 3 idle executors") {
+  test("rescheduleReceiver: return all executors that have minimum weight if no idle executors") {
     val executors = Seq("host1", "host2", "host3", "host4", "host5")
-    // Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0
-    // host4 and host5 are idle
+    // Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5
     val receiverTrackingInfoMap = Map(
       0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
       1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None),
-      2 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None))
+      2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None),
+      3 -> ReceiverTrackingInfo(3, ReceiverState.SCHEDULED, Some(Seq("host4", "host5")), None))
     val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
-      3, None, receiverTrackingInfoMap, executors)
+      4, None, receiverTrackingInfoMap, executors)
     assert(scheduledExecutors.toSet === Set("host2", "host4", "host5"))
   }
 
@@ -127,4 +127,5 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
       assert(executors.isEmpty)
     }
   }
+
 }
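For reference, the weights asserted in the modified "minimum weight" test follow directly from the
rule: receiver 0 runs on host1 (1.0), and each scheduled receiver splits 1.0 across its two
candidate executors (0.5 each). A quick standalone check (illustrative, not part of the patch):

```scala
object MinWeightTestCheck {
  def main(args: Array[String]): Unit = {
    val weights = Map(
      "host1" -> (1.0 + 0.5), // running receiver 0 + candidate of receiver 2
      "host2" -> 0.5,         // candidate of receiver 1
      "host3" -> (0.5 + 0.5), // candidate of receivers 1 and 2
      "host4" -> 0.5,         // candidate of receiver 3
      "host5" -> 0.5)         // candidate of receiver 3
    val minWeight = weights.values.min
    // No executor is idle, so rescheduleReceiver returns all minimum-weight executors
    println(weights.filter(_._2 == minWeight).keys.toSet) // Set(host2, host4, host5)
  }
}
```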