diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
index 1b65926f5c749a33b8d5b63f921de02368e7f662..1eb6c1614fc0bfc157219b272c892b4c2d8bf171 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
@@ -31,7 +31,9 @@ private[spark] sealed trait TaskLocation {
  */
 private [spark]
 case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
-  extends TaskLocation
+  extends TaskLocation {
+  override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
+}
 
 /**
  * A location on a host.
@@ -53,6 +55,9 @@ private[spark] object TaskLocation {
   // confusion.  See  RFC 952 and RFC 1123 for information about the format of hostnames.
   val inMemoryLocationTag = "hdfs_cache_"
 
+  // Identify locations of executors with this prefix.
+  val executorLocationTag = "executor_"
+
   def apply(host: String, executorId: String): TaskLocation = {
     new ExecutorCacheTaskLocation(host, executorId)
   }
@@ -65,7 +70,15 @@ private[spark] object TaskLocation {
   def apply(str: String): TaskLocation = {
     val hstr = str.stripPrefix(inMemoryLocationTag)
     if (hstr.equals(str)) {
-      new HostTaskLocation(str)
+      if (str.startsWith(executorLocationTag)) {
+        val splits = str.split("_")
+        if (splits.length != 3) {
+          throw new IllegalArgumentException("Illegal executor location format: " + str)
+        }
+        new ExecutorCacheTaskLocation(splits(1), splits(2))
+      } else {
+        new HostTaskLocation(str)
+      }
     } else {
       new HDFSCacheTaskLocation(hstr)
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 695523cc8aa3a4d6a6396a54c29e269059b9d5e1..cd6bf723e70cbd8429c0138d9e92a59907c281ad 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -779,6 +779,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("Test TaskLocation for different host type.") {
     assert(TaskLocation("host1") === HostTaskLocation("host1"))
     assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1"))
+    assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3"))
   }
 
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 59ef58d232ee7cd0678f8ab088c76c6bad9f6d25..167f56aa422819b8d012f4661039519dbd65e161 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -47,7 +47,8 @@ private[streaming] class ReceiverSupervisorImpl(
     checkpointDirOption: Option[String]
   ) extends ReceiverSupervisor(receiver, env.conf) with Logging {
 
-  private val hostPort = SparkEnv.get.blockManager.blockManagerId.hostPort
+  private val host = SparkEnv.get.blockManager.blockManagerId.host
+  private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
 
   private val receivedBlockHandler: ReceivedBlockHandler = {
     if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
@@ -179,7 +180,7 @@ private[streaming] class ReceiverSupervisorImpl(
 
   override protected def onReceiverStart(): Boolean = {
     val msg = RegisterReceiver(
-      streamId, receiver.getClass.getSimpleName, hostPort, endpoint)
+      streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
     trackerEndpoint.askWithRetry[Boolean](msg)
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index d2b0be7f4a9c5cfee2b06b5ef56a36d442b74582..234bc8660da8ac3002a0c45b5708f34ee65eddd3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -20,8 +20,8 @@ package org.apache.spark.streaming.scheduler
 import scala.collection.Map
 import scala.collection.mutable
 
+import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation}
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.Utils
 
 /**
  * A class that tries to schedule receivers with evenly distributed. There are two phases for
@@ -29,23 +29,23 @@ import org.apache.spark.util.Utils
  *
  * - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule
  *   all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase.
- *   It will try to schedule receivers with evenly distributed. ReceiverTracker should update its
- *   receiverTrackingInfoMap according to the results of `scheduleReceivers`.
- *   `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that
- *   contains the scheduled locations. Then when a receiver is starting, it will send a register
- *   request and `ReceiverTracker.registerReceiver` will be called. In
- *   `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check
- *   if the location of this receiver is one of the scheduled executors, if not, the register will
+ *   It will try to schedule receivers such that they are evenly distributed. ReceiverTracker should
+ *   update its `receiverTrackingInfoMap` according to the results of `scheduleReceivers`.
+ *   `ReceiverTrackingInfo.scheduledLocations` for each receiver should be set to an location list
+ *   that contains the scheduled locations. Then when a receiver is starting, it will send a
+ *   register request and `ReceiverTracker.registerReceiver` will be called. In
+ *   `ReceiverTracker.registerReceiver`, if a receiver's scheduled locations is set, it should check
+ *   if the location of this receiver is one of the scheduled locations, if not, the register will
  *   be rejected.
  * - The second phase is local scheduling when a receiver is restarting. There are two cases of
  *   receiver restarting:
  *   - If a receiver is restarting because it's rejected due to the real location and the scheduled
- *     executors mismatching, in other words, it fails to start in one of the locations that
+ *     locations mismatching, in other words, it fails to start in one of the locations that
  *     `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are
- *     still alive in the list of scheduled executors, then use them to launch the receiver job.
- *   - If a receiver is restarting without a scheduled executors list, or the executors in the list
+ *     still alive in the list of scheduled locations, then use them to launch the receiver job.
+ *   - If a receiver is restarting without a scheduled locations list, or the executors in the list
  *     are dead, `ReceiverTracker` should call `rescheduleReceiver`. If so, `ReceiverTracker` should
- *     not set `ReceiverTrackingInfo.scheduledExecutors` for this executor, instead, it should clear
+ *     not set `ReceiverTrackingInfo.scheduledLocations` for this receiver, instead, it should clear
  *     it. Then when this receiver is registering, we can know this is a local scheduling, and
  *     `ReceiverTrackingInfo` should call `rescheduleReceiver` again to check if the launching
  *     location is matching.
@@ -69,9 +69,12 @@ private[streaming] class ReceiverSchedulingPolicy {
    * </ol>
    *
    * This method is called when we start to launch receivers at the first time.
+   *
+   * @return a map for receivers and their scheduled locations
    */
   def scheduleReceivers(
-      receivers: Seq[Receiver[_]], executors: Seq[String]): Map[Int, Seq[String]] = {
+      receivers: Seq[Receiver[_]],
+      executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = {
     if (receivers.isEmpty) {
       return Map.empty
     }
@@ -80,16 +83,16 @@ private[streaming] class ReceiverSchedulingPolicy {
       return receivers.map(_.streamId -> Seq.empty).toMap
     }
 
-    val hostToExecutors = executors.groupBy(executor => Utils.parseHostPort(executor)._1)
-    val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String])
-    val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+    val hostToExecutors = executors.groupBy(_.host)
+    val scheduledLocations = Array.fill(receivers.length)(new mutable.ArrayBuffer[TaskLocation])
+    val numReceiversOnExecutor = mutable.HashMap[ExecutorCacheTaskLocation, Int]()
     // Set the initial value to 0
     executors.foreach(e => numReceiversOnExecutor(e) = 0)
 
     // Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
     // we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
     for (i <- 0 until receivers.length) {
-      // Note: preferredLocation is host but executors are host:port
+      // Note: preferredLocation is host but executors are host_executorId
       receivers(i).preferredLocation.foreach { host =>
         hostToExecutors.get(host) match {
           case Some(executorsOnHost) =>
@@ -97,7 +100,7 @@ private[streaming] class ReceiverSchedulingPolicy {
             // this host
             val leastScheduledExecutor =
               executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
-            scheduledExecutors(i) += leastScheduledExecutor
+            scheduledLocations(i) += leastScheduledExecutor
             numReceiversOnExecutor(leastScheduledExecutor) =
               numReceiversOnExecutor(leastScheduledExecutor) + 1
           case None =>
@@ -106,17 +109,20 @@ private[streaming] class ReceiverSchedulingPolicy {
             // 1. This executor is not up. But it may be up later.
             // 2. This executor is dead, or it's not a host in the cluster.
             // Currently, simply add host to the scheduled executors.
-            scheduledExecutors(i) += host
+
+            // Note: host could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to handle
+            // this case
+            scheduledLocations(i) += TaskLocation(host)
         }
       }
     }
 
     // For those receivers that don't have preferredLocation, make sure we assign at least one
     // executor to them.
-    for (scheduledExecutorsForOneReceiver <- scheduledExecutors.filter(_.isEmpty)) {
+    for (scheduledLocationsForOneReceiver <- scheduledLocations.filter(_.isEmpty)) {
       // Select the executor that has the least receivers
       val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
-      scheduledExecutorsForOneReceiver += leastScheduledExecutor
+      scheduledLocationsForOneReceiver += leastScheduledExecutor
       numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
     }
 
@@ -124,22 +130,22 @@ private[streaming] class ReceiverSchedulingPolicy {
     val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
     for (executor <- idleExecutors) {
       // Assign an idle executor to the receiver that has least candidate executors.
-      val leastScheduledExecutors = scheduledExecutors.minBy(_.size)
+      val leastScheduledExecutors = scheduledLocations.minBy(_.size)
       leastScheduledExecutors += executor
     }
 
-    receivers.map(_.streamId).zip(scheduledExecutors).toMap
+    receivers.map(_.streamId).zip(scheduledLocations).toMap
   }
 
   /**
-   * Return a list of candidate executors to run the receiver. If the list is empty, the caller can
+   * Return a list of candidate locations to run the receiver. If the list is empty, the caller can
    * run this receiver in arbitrary executor.
    *
    * This method tries to balance executors' load. Here is the approach to schedule executors
    * for a receiver.
    * <ol>
    *   <li>
-   *     If preferredLocation is set, preferredLocation should be one of the candidate executors.
+   *     If preferredLocation is set, preferredLocation should be one of the candidate locations.
    *   </li>
    *   <li>
    *     Every executor will be assigned to a weight according to the receivers running or
@@ -163,40 +169,58 @@ private[streaming] class ReceiverSchedulingPolicy {
       receiverId: Int,
       preferredLocation: Option[String],
       receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
-      executors: Seq[String]): Seq[String] = {
+      executors: Seq[ExecutorCacheTaskLocation]): Seq[TaskLocation] = {
     if (executors.isEmpty) {
       return Seq.empty
     }
 
     // Always try to schedule to the preferred locations
-    val scheduledExecutors = mutable.Set[String]()
-    scheduledExecutors ++= preferredLocation
-
-    val executorWeights = receiverTrackingInfoMap.values.flatMap { receiverTrackingInfo =>
-      receiverTrackingInfo.state match {
-        case ReceiverState.INACTIVE => Nil
-        case ReceiverState.SCHEDULED =>
-          val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get
-          // The probability that a scheduled receiver will run in an executor is
-          // 1.0 / scheduledLocations.size
-          scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size))
-        case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
-      }
-    }.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
+    val scheduledLocations = mutable.Set[TaskLocation]()
+    // Note: preferredLocation could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to
+    // handle this case
+    scheduledLocations ++= preferredLocation.map(TaskLocation(_))
+
+    val executorWeights: Map[ExecutorCacheTaskLocation, Double] = {
+      receiverTrackingInfoMap.values.flatMap(convertReceiverTrackingInfoToExecutorWeights)
+        .groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
+    }
 
     val idleExecutors = executors.toSet -- executorWeights.keys
     if (idleExecutors.nonEmpty) {
-      scheduledExecutors ++= idleExecutors
+      scheduledLocations ++= idleExecutors
     } else {
       // There is no idle executor. So select all executors that have the minimum weight.
       val sortedExecutors = executorWeights.toSeq.sortBy(_._2)
       if (sortedExecutors.nonEmpty) {
         val minWeight = sortedExecutors(0)._2
-        scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
+        scheduledLocations ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
       } else {
         // This should not happen since "executors" is not empty
       }
     }
-    scheduledExecutors.toSeq
+    scheduledLocations.toSeq
+  }
+
+  /**
+   * This method tries to convert a receiver tracking info to executor weights. Every executor will
+   * be assigned to a weight according to the receivers running or scheduling on it:
+   *
+   * - If a receiver is running on an executor, it contributes 1.0 to the executor's weight.
+   * - If a receiver is scheduled to an executor but has not yet run, it contributes
+   * `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.
+   */
+  private def convertReceiverTrackingInfoToExecutorWeights(
+      receiverTrackingInfo: ReceiverTrackingInfo): Seq[(ExecutorCacheTaskLocation, Double)] = {
+    receiverTrackingInfo.state match {
+      case ReceiverState.INACTIVE => Nil
+      case ReceiverState.SCHEDULED =>
+        val scheduledLocations = receiverTrackingInfo.scheduledLocations.get
+        // The probability that a scheduled receiver will run in an executor is
+        // 1.0 / scheduledLocations.size
+        scheduledLocations.filter(_.isInstanceOf[ExecutorCacheTaskLocation]).map { location =>
+          location.asInstanceOf[ExecutorCacheTaskLocation] -> (1.0 / scheduledLocations.size)
+        }
+      case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
+    }
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 2ce80d618b0a3a9e6d2a882181a0d64c6f586dbd..b183d856f50c39827b2e047e2e59eea9784ef4ac 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -17,20 +17,21 @@
 
 package org.apache.spark.streaming.scheduler
 
-import java.util.concurrent.{TimeUnit, CountDownLatch}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import scala.collection.mutable.HashMap
 import scala.concurrent.ExecutionContext
 import scala.language.existentials
 import scala.util.{Failure, Success}
 
-import org.apache.spark.streaming.util.WriteAheadLogUtils
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rpc._
+import org.apache.spark.scheduler.{TaskLocation, ExecutorCacheTaskLocation}
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.receiver._
-import org.apache.spark.util.{Utils, ThreadUtils, SerializableConfiguration}
+import org.apache.spark.streaming.util.WriteAheadLogUtils
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils, Utils}
 
 
 /** Enumeration to identify current state of a Receiver */
@@ -47,7 +48,8 @@ private[streaming] sealed trait ReceiverTrackerMessage
 private[streaming] case class RegisterReceiver(
     streamId: Int,
     typ: String,
-    hostPort: String,
+    host: String,
+    executorId: String,
     receiverEndpoint: RpcEndpointRef
   ) extends ReceiverTrackerMessage
 private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
@@ -235,7 +237,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   private def registerReceiver(
       streamId: Int,
       typ: String,
-      hostPort: String,
+      host: String,
+      executorId: String,
       receiverEndpoint: RpcEndpointRef,
       senderAddress: RpcAddress
     ): Boolean = {
@@ -247,18 +250,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       return false
     }
 
-    val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors
-    val accetableExecutors = if (scheduledExecutors.nonEmpty) {
+    val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
+    val acceptableExecutors = if (scheduledLocations.nonEmpty) {
         // This receiver is registering and it's scheduled by
-        // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledExecutors" to check it.
-        scheduledExecutors.get
+        // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
+        scheduledLocations.get
       } else {
         // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
         // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
         scheduleReceiver(streamId)
       }
 
-    if (!accetableExecutors.contains(hostPort)) {
+    def isAcceptable: Boolean = acceptableExecutors.exists {
+      case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
+      case loc: TaskLocation => loc.host == host
+    }
+
+    if (!isAcceptable) {
       // Refuse it since it's scheduled to a wrong executor
       false
     } else {
@@ -266,8 +274,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       val receiverTrackingInfo = ReceiverTrackingInfo(
         streamId,
         ReceiverState.ACTIVE,
-        scheduledExecutors = None,
-        runningExecutor = Some(hostPort),
+        scheduledLocations = None,
+        runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
         name = Some(name),
         endpoint = Some(receiverEndpoint))
       receiverTrackingInfos.put(streamId, receiverTrackingInfo)
@@ -338,25 +346,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     logWarning(s"Error reported by receiver for stream $streamId: $messageWithError")
   }
 
-  private def scheduleReceiver(receiverId: Int): Seq[String] = {
+  private def scheduleReceiver(receiverId: Int): Seq[TaskLocation] = {
     val preferredLocation = receiverPreferredLocations.getOrElse(receiverId, None)
-    val scheduledExecutors = schedulingPolicy.rescheduleReceiver(
+    val scheduledLocations = schedulingPolicy.rescheduleReceiver(
       receiverId, preferredLocation, receiverTrackingInfos, getExecutors)
-    updateReceiverScheduledExecutors(receiverId, scheduledExecutors)
-    scheduledExecutors
+    updateReceiverScheduledExecutors(receiverId, scheduledLocations)
+    scheduledLocations
   }
 
   private def updateReceiverScheduledExecutors(
-      receiverId: Int, scheduledExecutors: Seq[String]): Unit = {
+      receiverId: Int, scheduledLocations: Seq[TaskLocation]): Unit = {
     val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match {
       case Some(oldInfo) =>
         oldInfo.copy(state = ReceiverState.SCHEDULED,
-          scheduledExecutors = Some(scheduledExecutors))
+          scheduledLocations = Some(scheduledLocations))
       case None =>
         ReceiverTrackingInfo(
           receiverId,
           ReceiverState.SCHEDULED,
-          Some(scheduledExecutors),
+          Some(scheduledLocations),
           runningExecutor = None)
     }
     receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
@@ -370,13 +378,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   /**
    * Get the list of executors excluding driver
    */
-  private def getExecutors: Seq[String] = {
+  private def getExecutors: Seq[ExecutorCacheTaskLocation] = {
     if (ssc.sc.isLocal) {
-      Seq(ssc.sparkContext.env.blockManager.blockManagerId.hostPort)
+      val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId
+      Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId))
     } else {
       ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) =>
         blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location
-      }.map { case (blockManagerId, _) => blockManagerId.hostPort }.toSeq
+      }.map { case (blockManagerId, _) =>
+        ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)
+      }.toSeq
     }
   }
 
@@ -431,9 +442,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     override def receive: PartialFunction[Any, Unit] = {
       // Local messages
       case StartAllReceivers(receivers) =>
-        val scheduledExecutors = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
+        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
         for (receiver <- receivers) {
-          val executors = scheduledExecutors(receiver.streamId)
+          val executors = scheduledLocations(receiver.streamId)
           updateReceiverScheduledExecutors(receiver.streamId, executors)
           receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
           startReceiver(receiver, executors)
@@ -441,14 +452,14 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       case RestartReceiver(receiver) =>
         // Old scheduled executors minus the ones that are not active any more
         val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
-        val scheduledExecutors = if (oldScheduledExecutors.nonEmpty) {
+        val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
             // Try global scheduling again
             oldScheduledExecutors
           } else {
             val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
-            // Clear "scheduledExecutors" to indicate we are going to do local scheduling
+            // Clear "scheduledLocations" to indicate we are going to do local scheduling
             val newReceiverInfo = oldReceiverInfo.copy(
-              state = ReceiverState.INACTIVE, scheduledExecutors = None)
+              state = ReceiverState.INACTIVE, scheduledLocations = None)
             receiverTrackingInfos(receiver.streamId) = newReceiverInfo
             schedulingPolicy.rescheduleReceiver(
               receiver.streamId,
@@ -458,7 +469,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
           }
         // Assume there is one receiver restarting at one time, so we don't need to update
         // receiverTrackingInfos
-        startReceiver(receiver, scheduledExecutors)
+        startReceiver(receiver, scheduledLocations)
       case c: CleanupOldBlocks =>
         receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
       case UpdateReceiverRateLimit(streamUID, newRate) =>
@@ -472,9 +483,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       // Remote messages
-      case RegisterReceiver(streamId, typ, hostPort, receiverEndpoint) =>
+      case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
         val successful =
-          registerReceiver(streamId, typ, hostPort, receiverEndpoint, context.senderAddress)
+          registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
         context.reply(successful)
       case AddBlock(receivedBlockInfo) =>
         context.reply(addBlock(receivedBlockInfo))
@@ -493,13 +504,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     /**
      * Return the stored scheduled executors that are still alive.
      */
-    private def getStoredScheduledExecutors(receiverId: Int): Seq[String] = {
+    private def getStoredScheduledExecutors(receiverId: Int): Seq[TaskLocation] = {
       if (receiverTrackingInfos.contains(receiverId)) {
-        val scheduledExecutors = receiverTrackingInfos(receiverId).scheduledExecutors
-        if (scheduledExecutors.nonEmpty) {
+        val scheduledLocations = receiverTrackingInfos(receiverId).scheduledLocations
+        if (scheduledLocations.nonEmpty) {
           val executors = getExecutors.toSet
           // Only return the alive executors
-          scheduledExecutors.get.filter(executors)
+          scheduledLocations.get.filter {
+            case loc: ExecutorCacheTaskLocation => executors(loc)
+            case loc: TaskLocation => true
+          }
         } else {
           Nil
         }
@@ -511,7 +525,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     /**
      * Start a receiver along with its scheduled executors
      */
-    private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = {
+    private def startReceiver(
+        receiver: Receiver[_],
+        scheduledLocations: Seq[TaskLocation]): Unit = {
       def shouldStartReceiver: Boolean = {
         // It's okay to start when trackerState is Initialized or Started
         !(isTrackerStopping || isTrackerStopped)
@@ -546,13 +562,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
           }
         }
 
-      // Create the RDD using the scheduledExecutors to run the receiver in a Spark job
+      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
       val receiverRDD: RDD[Receiver[_]] =
-        if (scheduledExecutors.isEmpty) {
+        if (scheduledLocations.isEmpty) {
           ssc.sc.makeRDD(Seq(receiver), 1)
         } else {
-          val preferredLocations =
-            scheduledExecutors.map(hostPort => Utils.parseHostPort(hostPort)._1).distinct
+          val preferredLocations = scheduledLocations.map(_.toString).distinct
           ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
         }
       receiverRDD.setName(s"Receiver $receiverId")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
index 043ff4d0ff054b62cde69b4ad0a475260ca53e94..ab0a84f05214df5245bf6e0965d69064d597196d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.scheduler
 
 import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation}
 import org.apache.spark.streaming.scheduler.ReceiverState._
 
 private[streaming] case class ReceiverErrorInfo(
@@ -28,7 +29,7 @@ private[streaming] case class ReceiverErrorInfo(
  *
  * @param receiverId the unique receiver id
  * @param state the current Receiver state
- * @param scheduledExecutors the scheduled executors provided by ReceiverSchedulingPolicy
+ * @param scheduledLocations the scheduled locations provided by ReceiverSchedulingPolicy
  * @param runningExecutor the running executor if the receiver is active
  * @param name the receiver name
  * @param endpoint the receiver endpoint. It can be used to send messages to the receiver
@@ -37,8 +38,8 @@ private[streaming] case class ReceiverErrorInfo(
 private[streaming] case class ReceiverTrackingInfo(
     receiverId: Int,
     state: ReceiverState,
-    scheduledExecutors: Option[Seq[String]],
-    runningExecutor: Option[String],
+    scheduledLocations: Option[Seq[TaskLocation]],
+    runningExecutor: Option[ExecutorCacheTaskLocation],
     name: Option[String] = None,
     endpoint: Option[RpcEndpointRef] = None,
     errorInfo: Option[ReceiverErrorInfo] = None) {
@@ -47,7 +48,7 @@ private[streaming] case class ReceiverTrackingInfo(
     receiverId,
     name.getOrElse(""),
     state == ReceiverState.ACTIVE,
-    location = runningExecutor.getOrElse(""),
+    location = runningExecutor.map(_.host).getOrElse(""),
     lastErrorMessage = errorInfo.map(_.lastErrorMessage).getOrElse(""),
     lastError = errorInfo.map(_.lastError).getOrElse(""),
     lastErrorTime = errorInfo.map(_.lastErrorTime).getOrElse(-1L)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
index b2a51d72bac2b1339f94a7ac96e6f238636567d7..05b4e66c63ac614974b2662611c4c53656a9eb3e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
@@ -20,73 +20,96 @@ package org.apache.spark.streaming.scheduler
 import scala.collection.mutable
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, HostTaskLocation, TaskLocation}
 
 class ReceiverSchedulingPolicySuite extends SparkFunSuite {
 
   val receiverSchedulingPolicy = new ReceiverSchedulingPolicy
 
   test("rescheduleReceiver: empty executors") {
-    val scheduledExecutors =
+    val scheduledLocations =
       receiverSchedulingPolicy.rescheduleReceiver(0, None, Map.empty, executors = Seq.empty)
-    assert(scheduledExecutors === Seq.empty)
+    assert(scheduledLocations === Seq.empty)
   }
 
   test("rescheduleReceiver: receiver preferredLocation") {
+    val executors = Seq(ExecutorCacheTaskLocation("host2", "2"))
     val receiverTrackingInfoMap = Map(
       0 -> ReceiverTrackingInfo(0, ReceiverState.INACTIVE, None, None))
-    val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
-      0, Some("host1"), receiverTrackingInfoMap, executors = Seq("host2"))
-    assert(scheduledExecutors.toSet === Set("host1", "host2"))
+    val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
+      0, Some("host1"), receiverTrackingInfoMap, executors)
+    assert(scheduledLocations.toSet === Set(HostTaskLocation("host1"), executors(0)))
   }
 
   test("rescheduleReceiver: return all idle executors if there are any idle executors") {
-    val executors = Seq("host1", "host2", "host3", "host4", "host5")
-    // host3 is idle
+    val executors = (1 to 5).map(i => ExecutorCacheTaskLocation(s"host$i", s"$i"))
+    // executor 1 is busy, others are idle.
     val receiverTrackingInfoMap = Map(
-      0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")))
-    val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
+      0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some(executors(0))))
+    val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
       1, None, receiverTrackingInfoMap, executors)
-    assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5"))
+    assert(scheduledLocations.toSet === executors.tail.toSet)
   }
 
   test("rescheduleReceiver: return all executors that have minimum weight if no idle executors") {
-    val executors = Seq("host1", "host2", "host3", "host4", "host5")
+    val executors = Seq(
+      ExecutorCacheTaskLocation("host1", "1"),
+      ExecutorCacheTaskLocation("host2", "2"),
+      ExecutorCacheTaskLocation("host3", "3"),
+      ExecutorCacheTaskLocation("host4", "4"),
+      ExecutorCacheTaskLocation("host5", "5")
+    )
     // Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5
     val receiverTrackingInfoMap = Map(
-      0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
-      1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None),
-      2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None),
-      3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED, Some(Seq("host4", "host5")), None))
-    val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
+      0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None,
+        Some(ExecutorCacheTaskLocation("host1", "1"))),
+      1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED,
+        Some(Seq(ExecutorCacheTaskLocation("host2", "2"), ExecutorCacheTaskLocation("host3", "3"))),
+        None),
+      2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED,
+        Some(Seq(ExecutorCacheTaskLocation("host1", "1"), ExecutorCacheTaskLocation("host3", "3"))),
+        None),
+      3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED,
+        Some(Seq(ExecutorCacheTaskLocation("host4", "4"),
+          ExecutorCacheTaskLocation("host5", "5"))), None))
+    val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
       4, None, receiverTrackingInfoMap, executors)
-    assert(scheduledExecutors.toSet === Set("host2", "host4", "host5"))
+    val expectedScheduledLocations = Set(
+      ExecutorCacheTaskLocation("host2", "2"),
+      ExecutorCacheTaskLocation("host4", "4"),
+      ExecutorCacheTaskLocation("host5", "5")
+    )
+    assert(scheduledLocations.toSet === expectedScheduledLocations)
   }
 
   test("scheduleReceivers: " +
     "schedule receivers evenly when there are more receivers than executors") {
     val receivers = (0 until 6).map(new RateTestReceiver(_))
-    val executors = (10000 until 10003).map(port => s"localhost:${port}")
-    val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
-    val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+    val executors = (0 until 3).map(executorId =>
+      ExecutorCacheTaskLocation("localhost", executorId.toString))
+    val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+    val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
     // There should be 2 receivers running on each executor and each receiver has one executor
-    scheduledExecutors.foreach { case (receiverId, executors) =>
-      assert(executors.size == 1)
-      numReceiversOnExecutor(executors(0)) = numReceiversOnExecutor.getOrElse(executors(0), 0) + 1
+    scheduledLocations.foreach { case (receiverId, locations) =>
+      assert(locations.size == 1)
+      assert(locations(0).isInstanceOf[ExecutorCacheTaskLocation])
+      numReceiversOnExecutor(locations(0)) = numReceiversOnExecutor.getOrElse(locations(0), 0) + 1
     }
     assert(numReceiversOnExecutor === executors.map(_ -> 2).toMap)
   }
 
-
   test("scheduleReceivers: " +
     "schedule receivers evenly when there are more executors than receivers") {
     val receivers = (0 until 3).map(new RateTestReceiver(_))
-    val executors = (10000 until 10006).map(port => s"localhost:${port}")
-    val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
-    val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+    val executors = (0 until 6).map(executorId =>
+      ExecutorCacheTaskLocation("localhost", executorId.toString))
+    val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+    val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
     // There should be 1 receiver running on each executor and each receiver has two executors
-    scheduledExecutors.foreach { case (receiverId, executors) =>
-      assert(executors.size == 2)
-      executors.foreach { l =>
+    scheduledLocations.foreach { case (receiverId, locations) =>
+      assert(locations.size == 2)
+      locations.foreach { l =>
+        assert(l.isInstanceOf[ExecutorCacheTaskLocation])
         numReceiversOnExecutor(l) = numReceiversOnExecutor.getOrElse(l, 0) + 1
       }
     }
@@ -96,34 +119,41 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
   test("scheduleReceivers: schedule receivers evenly when the preferredLocations are even") {
     val receivers = (0 until 3).map(new RateTestReceiver(_)) ++
       (3 until 6).map(new RateTestReceiver(_, Some("localhost")))
-    val executors = (10000 until 10003).map(port => s"localhost:${port}") ++
-      (10003 until 10006).map(port => s"localhost2:${port}")
-    val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
-    val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+    val executors = (0 until 3).map(executorId =>
+      ExecutorCacheTaskLocation("localhost", executorId.toString)) ++
+      (3 until 6).map(executorId =>
+        ExecutorCacheTaskLocation("localhost2", executorId.toString))
+    val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+    val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
     // There should be 1 receiver running on each executor and each receiver has 1 executor
-    scheduledExecutors.foreach { case (receiverId, executors) =>
+    scheduledLocations.foreach { case (receiverId, executors) =>
       assert(executors.size == 1)
       executors.foreach { l =>
+        assert(l.isInstanceOf[ExecutorCacheTaskLocation])
         numReceiversOnExecutor(l) = numReceiversOnExecutor.getOrElse(l, 0) + 1
       }
     }
     assert(numReceiversOnExecutor === executors.map(_ -> 1).toMap)
     // Make sure we schedule the receivers to their preferredLocations
     val executorsForReceiversWithPreferredLocation =
-      scheduledExecutors.filter { case (receiverId, executors) => receiverId >= 3 }.flatMap(_._2)
+      scheduledLocations.filter { case (receiverId, executors) => receiverId >= 3 }.flatMap(_._2)
     // We can simply check the executor set because we only know each receiver only has 1 executor
     assert(executorsForReceiversWithPreferredLocation.toSet ===
-      (10000 until 10003).map(port => s"localhost:${port}").toSet)
+      (0 until 3).map(executorId =>
+        ExecutorCacheTaskLocation("localhost", executorId.toString)
+      ).toSet)
   }
 
   test("scheduleReceivers: return empty if no receiver") {
-    assert(receiverSchedulingPolicy.scheduleReceivers(Seq.empty, Seq("localhost:10000")).isEmpty)
+    val scheduledLocations = receiverSchedulingPolicy.
+      scheduleReceivers(Seq.empty, Seq(ExecutorCacheTaskLocation("localhost", "1")))
+    assert(scheduledLocations.isEmpty)
   }
 
   test("scheduleReceivers: return empty scheduled executors if no executors") {
     val receivers = (0 until 3).map(new RateTestReceiver(_))
-    val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
-    scheduledExecutors.foreach { case (receiverId, executors) =>
+    val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
+    scheduledLocations.foreach { case (receiverId, executors) =>
       assert(executors.isEmpty)
     }
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index fda86aef457d4a0ab58c0849db04da3bde223732..3bd8d086abf7f61d7845de2436611d05c1364324 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -99,8 +99,8 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       output.register()
       ssc.start()
       eventually(timeout(10 seconds), interval(10 millis)) {
-        // If preferredLocations is set correctly, receiverTaskLocality should be NODE_LOCAL
-        assert(receiverTaskLocality === TaskLocality.NODE_LOCAL)
+        // If preferredLocations is set correctly, receiverTaskLocality should be PROCESS_LOCAL
+        assert(receiverTaskLocality === TaskLocality.PROCESS_LOCAL)
       }
     }
   }