diff --git a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
index 7b954a477570f5c42e0b8ee4529b7234f751667e..9c37fadb78d2f260839061deee0f0dd89eb748f6 100644
--- a/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
@@ -38,7 +38,6 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
     }
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
   }
 
   test("halting by voting") {
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 80a611b18079b1932124e567613186e6542e03e1..30d182b008930af670b7e9120e78ed69189febd3 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -32,15 +32,16 @@ import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
 
 private[spark] sealed trait MapOutputTrackerMessage
-private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
+private[spark] case class GetMapOutputStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
 private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 
 private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster)
   extends Actor with Logging {
   def receive = {
-    case GetMapOutputStatuses(shuffleId: Int, requester: String) =>
-      logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + requester)
+    case GetMapOutputStatuses(shuffleId: Int) =>
+      val hostPort = sender.path.address.hostPort
+      logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
       sender ! tracker.getSerializedMapOutputStatuses(shuffleId)
 
     case StopMapOutputTracker =>
@@ -119,11 +120,10 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
       if (fetchedStatuses == null) {
         // We won the race to fetch the output locs; do so
         logInfo("Doing the fetch; tracker actor = " + trackerActor)
-        val hostPort = Utils.localHostPort(conf)
         // This try-finally prevents hangs due to timeouts:
         try {
           val fetchedBytes =
-            askTracker(GetMapOutputStatuses(shuffleId, hostPort)).asInstanceOf[Array[Byte]]
+            askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]
           fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
           logInfo("Got the output locations")
           mapStatuses.put(shuffleId, fetchedStatuses)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 08b592df715b32fab640e013220013dc91bf0cb6..ed788560e79f17c2e22c6a9527f4b9b29513124a 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -132,16 +132,6 @@ object SparkEnv extends Logging {
       conf.set("spark.driver.port",  boundPort.toString)
     }
 
-    // set only if unset until now.
-    if (!conf.contains("spark.hostPort")) {
-      if (!isDriver){
-        // unexpected
-        Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
-      }
-      Utils.checkHost(hostname)
-      conf.set("spark.hostPort",  hostname + ":" + boundPort)
-    }
-
     val classLoader = Thread.currentThread.getContextClassLoader
 
     // Create an instance of the class named by the given Java system property, or by
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index f9e43e0e94404b0039532bb922b1a569ed5c9bb9..45b43b403dd8c087fa730300b53a29e4352b4027 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -103,7 +103,6 @@ private[spark] object CoarseGrainedExecutorBackend {
       indestructible = true, conf = new SparkConf)
     // set it
     val sparkHostPort = hostname + ":" + boundPort
-//    conf.set("spark.hostPort",  sparkHostPort)
     actorSystem.actorOf(
       Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
       name = "Executor")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 8d596a76c224c079ab7a5729c9689686beb5c329..0208388e86680754ca01e9a6a3db224a35f73e39 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -165,7 +165,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   override def start() {
     val properties = new ArrayBuffer[(String, String)]
     for ((key, value) <- scheduler.sc.conf.getAll) {
-      if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
+      if (key.startsWith("spark.")) {
         properties += ((key, value))
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e1fc0c707f722c5d5159dff3824c7b4fe51c7a2c..6f1345c57a295609ef53eebeaaecfd2c505c4efa 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -83,8 +83,6 @@ private[spark] class BlockManager(
 
   val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
 
-  val hostPort = Utils.localHostPort(conf)
-
   val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
     name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a98adf98358a4aa44baca9c109b4b1b156be8e43..caa9bf4c9280eff3e6ed396b38c0e2d896218498 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -420,15 +420,6 @@ private[spark] object Utils extends Logging {
     InetAddress.getByName(address).getHostName
   }
 
-  def localHostPort(conf: SparkConf): String = {
-    val retval = conf.get("spark.hostPort", null)
-    if (retval == null) {
-      logErrorWithStack("spark.hostPort not set but invoking localHostPort")
-      return localHostName()
-    }
-    retval
-  }
-
   def checkHost(host: String, message: String = "") {
     assert(host.indexOf(':') == -1, message)
   }
@@ -437,14 +428,6 @@ private[spark] object Utils extends Logging {
     assert(hostPort.indexOf(':') != -1, message)
   }
 
-  def logErrorWithStack(msg: String) {
-    try {
-      throw new Exception
-    } catch {
-      case ex: Exception => logError(msg, ex)
-    }
-  }
-
   // Typically, this will be of order of number of nodes in cluster
   // If not, we should change it to LRUCache or something.
   private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index 8dd5786da6ff5c93f3bee3b3287baf89c7551220..3ac706110e287dfb6bd15978af9c53b6013b7ac2 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -53,7 +53,6 @@ object LocalSparkContext {
     }
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
   }
 
   /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index afc1beff989c4d47a5181e8b22b6fe4385f14d3d..930c2523caf8c0adc2ced32532b5e54bff090fbf 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -99,7 +99,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf)
     System.setProperty("spark.driver.port", boundPort.toString)    // Will be cleared by LocalSparkContext
-    System.setProperty("spark.hostPort", hostname + ":" + boundPort)
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f60ce270c7387f06b08afcd1e68168ca53037ce8..18aa587662d245b9444ddbb9d68c2c6bc1c1046f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -53,7 +53,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf)
     this.actorSystem = actorSystem
     conf.set("spark.driver.port", boundPort.toString)
-    conf.set("spark.hostPort", "localhost:" + boundPort)
 
     master = new BlockManagerMaster(
       actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
@@ -65,13 +64,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
-    // Set some value ...
-    conf.set("spark.hostPort", Utils.localHostName() + ":" + 1111)
   }
 
   after {
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
 
     if (store != null) {
       store.stop()
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index daaa2a0305113527d66d1d69cb54c791933c91ae..8aad27366524afa9820a8a518dd5c7831b27e7a2 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -35,7 +35,6 @@ class ReplSuite extends FunSuite {
     }
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
     return out.toString
   }
 
@@ -75,7 +74,6 @@ class ReplSuite extends FunSuite {
 
     interp.sparkContext.stop()
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
   }
 
   test("simple foreach with accumulator") {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 44e396e1cdb2be662fc01806c373874521c651ba..5046a1d53fa41a992a43c4605b7be84f1c66cc6d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -46,7 +46,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
 
   // These should be unset when a checkpoint is deserialized,
   // otherwise the SparkContext won't initialize correctly.
-  sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port")
+  sparkConf.remove("spark.driver.host").remove("spark.driver.port")
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 162b19d7f0e9eeee56a808762531e85b05c064a0..592e84791bf131252e07c30e68b4709db7339891 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -186,7 +186,6 @@ object MasterFailureTest extends Logging {
 
     // Setup the streaming computation with the given operation
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
     val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map())
     ssc.checkpoint(checkpointDir.toString)
     val inputStream = ssc.textFileStream(testDir.toString)
@@ -233,7 +232,6 @@ object MasterFailureTest extends Logging {
         // (iii) Its not timed out yet
         System.clearProperty("spark.streaming.clock")
         System.clearProperty("spark.driver.port")
-        System.clearProperty("spark.hostPort")
         ssc.start()
         val startTime = System.currentTimeMillis()
         while (!killed && !isLastOutputGenerated && !isTimedOut) {
diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 34bee568859f9736f66e2428305f3b07e1fa5457..849bbf1299182cdb38979c4fd60ba2432f2129af 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -28,7 +28,6 @@ public abstract class LocalJavaStreamingContext {
     @Before
     public void setUp() {
         System.clearProperty("spark.driver.port");
-        System.clearProperty("spark.hostPort");
         System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
         ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
         ssc.checkpoint("checkpoint");
@@ -41,6 +40,5 @@ public abstract class LocalJavaStreamingContext {
 
         // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
         System.clearProperty("spark.driver.port");
-        System.clearProperty("spark.hostPort");
     }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 9590bca9892fe6899be0171450915b4de9394f68..67ce5bc566b1540124a985095c9a2345f6eba701 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -358,7 +358,6 @@ class CheckpointSuite extends TestSuiteBase {
     )
     ssc = new StreamingContext(checkpointDir)
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
     ssc.start()
     val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
     // the first element will be re-processed data of the last batch before restart
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 63a07cfbdfa5aaf66f75ee84bbbb9808e380f500..9b2bb57e7798b56710f0b329c167c087dde558be 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -156,7 +156,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
   def afterFunction() {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
-    System.clearProperty("spark.hostPort")
   }
 
   before(beforeFunction)