diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 42c3b4a6cf633773b8278ddcd1a1f8b23ac01b2b..8bdc1469e62c04166308a8f5091a3a456a1aaef0 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -49,7 +49,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val sparkHome = sc.getSparkHome().getOrElse(
       throw new IllegalArgumentException("must supply spark home for spark standalone"))
     val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
-        sc.ui.appUIAddress)
+        sc.ui.appHttpUIAddress)
 
     client = new Client(sc.env.actorSystem, master, appDesc, this)
     client.start()
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 23ded44ba383de802aabbecaf136bfd53cbc09d4..e078c4a6b2486ad7946fdab26af29a4f185a4ffe 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -1,5 +1,4 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
+/* * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
@@ -78,7 +77,9 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
     server.foreach(_.stop())
   }
 
-  private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1")
+  private[spark] def appHttpUIAddress = "http://" + appUIAddress
+  private[spark] def appUIAddress = host + ":" + boundPort.getOrElse("-1")
+
 }
 
 private[spark] object SparkUI {
diff --git a/core/src/main/scala/spark/ui/UIUtils.scala b/core/src/main/scala/spark/ui/UIUtils.scala
index fe2afc11299e47d8210566762491bf682c36ef7b..6b45679f9d53c762a654d9720817356db96334b8 100644
--- a/core/src/main/scala/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/spark/ui/UIUtils.scala
@@ -25,33 +25,53 @@ import spark.SparkContext
 private[spark] object UIUtils {
   import Page._
 
+  // Yarn has to go through a proxy so the base uri is provided and has to be on all links
+  private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).
+                                         getOrElse("")
+
+  def addBaseUri(resource: String = ""): String = {
+    return uiRoot + resource
+  }
+
+  private[spark] val storageStr = addBaseUri("/storage")
+  private[spark] val stagesStr = addBaseUri("/stages")
+  private[spark] val envStr = addBaseUri("/environment")
+  private[spark] val executorsStr = addBaseUri("/executors")
+  private[spark] val bootstrapMinCssStr = addBaseUri("/static/bootstrap.min.css")
+  private[spark] val webuiCssStr = addBaseUri("/static/webui.css")
+  private[spark] val bootstrapResponsiveCssStr = addBaseUri("/static/bootstrap-responsive.min.css")
+  private[spark] val sortTableStr = addBaseUri("/static/sorttable.js")
+  private[spark] val sparkLogoHdStr = addBaseUri("/static/spark-logo-77x50px-hd.png")
+  private[spark] val sparkLogoStr = addBaseUri("/static/spark_logo.png")
+
+
   /** Returns a spark page with correctly formatted headers */
   def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value)
   : Seq[Node] = {
-    val jobs = page match {
-      case Jobs => <li class="active"><a href="/stages">Jobs</a></li>
-      case _ => <li><a href="/stages">Jobs</a></li>
-    }
     val storage = page match {
-      case Storage => <li class="active"><a href="/storage">Storage</a></li>
-      case _ => <li><a href="/storage">Storage</a></li>
+      case Storage => <li class="active"><a href={storageStr}>Storage</a></li>
+      case _ => <li><a href={storageStr}>Storage</a></li>
+    }
+    val jobs = page match {
+      case Jobs => <li class="active"><a href={stagesStr}>Jobs</a></li>
+      case _ => <li><a href={stagesStr}>Jobs</a></li>
     }
     val environment = page match {
-      case Environment => <li class="active"><a href="/environment">Environment</a></li>
-      case _ => <li><a href="/environment">Environment</a></li>
+      case Environment => <li class="active"><a href={envStr}>Environment</a></li>
+      case _ => <li><a href={envStr}>Environment</a></li>
     }
     val executors = page match {
-      case Executors => <li class="active"><a href="/executors">Executors</a></li>
-      case _ => <li><a href="/executors">Executors</a></li>
+      case Executors => <li class="active"><a href={executorsStr}>Executors</a></li>
+      case _ => <li><a href={executorsStr}>Executors</a></li>
     }
 
     <html>
       <head>
         <meta http-equiv="Content-type" content="text/html; charset=utf-8" />
-        <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" />
-        <link rel="stylesheet" href="/static/webui.css" type="text/css" />
-        <link rel="stylesheet" href="/static/bootstrap-responsive.min.css" type="text/css" />
-        <script src="/static/sorttable.js"></script>
+        <link rel="stylesheet" href={bootstrapMinCssStr} type="text/css" />
+        <link rel="stylesheet" href={webuiCssStr} type="text/css" />
+        <link rel="stylesheet" href={bootstrapResponsiveCssStr} type="text/css" />
+        <script src={sortTableStr}></script>
         <title>{sc.appName} - {title}</title>
         <style type="text/css">
           table.sortable thead {{ cursor: pointer; }}
@@ -65,7 +85,7 @@ private[spark] object UIUtils {
               <div class="navbar">
                 <div class="navbar-inner">
                   <div class="container">
-                    <a href="/" class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></a>
+                    <a href="/" class="brand"><img src={sparkLogoHdStr} /></a>
                     <ul class="nav nav-pills">
                       {jobs}
                       {storage}
@@ -98,9 +118,9 @@ private[spark] object UIUtils {
     <html>
       <head>
         <meta http-equiv="Content-type" content="text/html; charset=utf-8" />
-        <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" />
-        <link rel="stylesheet" href="/static/bootstrap-responsive.min.css" type="text/css" />
-        <script src="/static/sorttable.js"></script>
+        <link rel="stylesheet" href={bootstrapMinCssStr} type="text/css" />
+        <link rel="stylesheet" href={bootstrapResponsiveCssStr} type="text/css" />
+        <script src={sortTableStr}></script>
         <title>{title}</title>
         <style type="text/css">
           table.sortable thead {{ cursor: pointer; }}
@@ -110,7 +130,7 @@ private[spark] object UIUtils {
         <div class="container">
           <div class="row">
             <div class="span2">
-              <img src="/static/spark_logo.png" />
+              <img src={sparkLogoStr} />
             </div>
             <div class="span10">
               <h3 style="vertical-align: bottom; margin-top: 40px; display: inline-block;">
diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
index 621828f9c3ea44899b0fee7b6bc6ceba9cc1837c..a6bd734e294620229b523f681504c3e7785cd418 100644
--- a/core/src/main/scala/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
@@ -6,6 +6,7 @@ import scala.xml.Node
 
 import spark.scheduler.Stage
 import spark.scheduler.cluster.Schedulable
+import spark.ui.UIUtils
 
 /** Table showing list of pools */
 private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
@@ -43,7 +44,7 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
       case None => 0
     }
     <tr>
-      <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
+      <td><a href={"%s/stages/pool?poolname=%s".format(UIUtils.addBaseUri(),p.name)}>{p.name}</a></td>
       <td>{p.minShare}</td>
       <td>{p.weight}</td>
       <td>{activeStages}</td>
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
index b31f4abc26d65d39e8c30f22ff19286c3f5ad3b9..f3d870c8137f7850c77e906157be9cd948a26479 100644
--- a/core/src/main/scala/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -8,6 +8,7 @@ import scala.collection.mutable.HashSet
 import spark.Utils
 import spark.scheduler.cluster.{SchedulingMode, TaskInfo}
 import spark.scheduler.Stage
+import spark.ui.UIUtils
 
 
 /** Page showing list of all ongoing and recently finished stages */
@@ -81,7 +82,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
 
     val poolName = listener.stageToPool.get(s)
 
-    val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
+    val nameLink = <a href={"%s/stages/stage?id=%s".format(UIUtils.addBaseUri(),s.id)}>{s.name}</a>
     val description = listener.stageToDescription.get(s)
       .map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
     val finishTime = s.completionTime.getOrElse(System.currentTimeMillis())
@@ -90,7 +91,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
     <tr>
       <td>{s.id}</td>
       {if (isFairScheduler) {
-        <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
+        <td><a href={"%s/stages/pool?poolname=%s".format(UIUtils.addBaseUri(),poolName.get)}>{poolName.get}</a></td>}
       }
       <td>{description}</td>
       <td valign="middle">{submissionTime}</td>
diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala
index 0751f9e8f9102ad2abbd6d463140544b480e8157..9675f80aff4eaaff7fdcb1c5f6501d7d4c952b89 100644
--- a/core/src/main/scala/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/storage/IndexPage.scala
@@ -50,7 +50,7 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
   def rddRow(rdd: RDDInfo): Seq[Node] = {
     <tr>
       <td>
-        <a href={"/storage/rdd?id=%s".format(rdd.id)}>
+        <a href={"%s/storage/rdd?id=%s".format(addBaseUri(),rdd.id)}>
           {rdd.name}
         </a>
       </td>
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 678cd57abac4c4deefef6000397316b58fe8ee4b..3f0d077f7128a5e4a001a712775bf9490ba84aba 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -46,8 +46,12 @@ If you want to test out the YARN deployment mode, you can use the current Spark
 
 Most of the configs are the same for Spark on YARN as other deploys. See the Configuration page for more information on those.  These are configs that are specific to SPARK on YARN.
 
+Environment variables:
 * `SPARK_YARN_USER_ENV`, to add environment variables to the Spark processes launched on YARN. This can be a comma separated list of environment variables. ie SPARK_YARN_USER_ENV="JAVA_HOME=/jdk64,FOO=bar"
 
+Properties:
+* 'spark.yarn.applicationMaster.waitTries', property to set the number of times the ApplicationMaster waits for the the spark master and then also the number of tries it waits for the Spark Context to be intialized. Default is 10.
+
 # Launching Spark on YARN
 
 Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the hadoop cluster.
diff --git a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
index 0f3b6bc1a65d0c77e6d84d330529705ff5fd1238..d6acb080cc4bd6f9da05fed30a38eb4f7fbd2ab5 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -45,6 +45,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
   private var yarnAllocator: YarnAllocationHandler = null
   private var isFinished:Boolean = false
+  private var uiAddress: String = ""
+
 
   def run() {
     // setup the directories so things go to yarn approved directories rather
@@ -53,21 +55,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
 
+    // TODO: Uncomment when hadoop is on a version which has this fixed.
     // Compute number of threads for akka
-    val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+    //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
 
-    if (minimumMemory > 0) {
-      val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-      val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+    //if (minimumMemory > 0) {
+    //  val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    //  val numCore = (mem  / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
 
-      if (numCore > 0) {
+    //  if (numCore > 0) {
         // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
         // TODO: Uncomment when hadoop is on a version which has this fixed.
         // args.workerCores = numCore
-      }
-    }
+    //  }
+    //}
 
     // Workaround until hadoop moves to something which has
     // https://issues.apache.org/jira/browse/HADOOP-8406
@@ -83,6 +85,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // This a bit hacky, but we need to wait until the spark.driver.port property has
     // been set by the Thread executing the user class.
     waitForSparkMaster()
+
+    waitForSparkContextInitialized()
+
+    // do this after spark master is up and SparkContext is created so that we can register UI Url
+    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
     
     // Allocate all containers
     allocateWorkers()
@@ -134,8 +141,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // Users can then monitor stderr/stdout on that node if required.
     appMasterRequest.setHost(Utils.localHostName())
     appMasterRequest.setRpcPort(0)
-    // What do we provide here ? Might make sense to expose something sensible later ?
-    appMasterRequest.setTrackingUrl("")
+    appMasterRequest.setTrackingUrl(uiAddress)
     return resourceManager.registerApplicationMaster(appMasterRequest)
   }
   
@@ -143,7 +149,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     logInfo("Waiting for spark driver to be reachable.")
     var driverUp = false
     var tries = 0
-    while(!driverUp && tries < 10) {
+    val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+    while(!driverUp && tries < numTries) {
       val driverHost = System.getProperty("spark.driver.host")
       val driverPort = System.getProperty("spark.driver.port")
       try {
@@ -189,24 +196,44 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     return t
   }
 
-  private def allocateWorkers() {
+  // this need to happen before allocateWorkers
+  private def waitForSparkContextInitialized() {
     logInfo("Waiting for spark context initialization")
-
     try {
       var sparkContext: SparkContext = null
       ApplicationMaster.sparkContextRef.synchronized {
         var count = 0
-        while (ApplicationMaster.sparkContextRef.get() == null && count < 10) {
+        val waitTime = 10000L
+        val numTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+        while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
           logInfo("Waiting for spark context initialization ... " + count)
           count = count + 1
-          ApplicationMaster.sparkContextRef.wait(10000L)
+          ApplicationMaster.sparkContextRef.wait(waitTime)
         }
         sparkContext = ApplicationMaster.sparkContextRef.get()
-        assert(sparkContext != null)
-        this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, sparkContext.preferredNodeLocationData)
+        assert(sparkContext != null || count >= numTries)
+
+        if (null != sparkContext) {
+          uiAddress = sparkContext.ui.appUIAddress
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, 
+                                               sparkContext.preferredNodeLocationData) 
+        } else {
+          logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime + 
+                  ", numTries = " + numTries)
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
+        }
       }
+    } finally {
+      // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
+      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
+      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+    }
+  }
+
 
 
+  private def allocateWorkers() {
+    try {
       logInfo("Allocating " + args.numWorkers + " workers.")
       // Wait until all containers have finished
       // TODO: This is a bit ugly. Can we make it nicer?
@@ -298,6 +325,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       .asInstanceOf[FinishApplicationMasterRequest]
     finishReq.setAppAttemptId(appAttemptId)
     finishReq.setFinishApplicationStatus(status)
+    // set tracking url to empty since we don't have a history server
+    finishReq.setTrackingUrl("")
     resourceManager.finishApplicationMaster(finishReq)
 
   }
diff --git a/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
index b0af8baf08c152293e96cd87aef2cda27e39bba9..1f235cef88950bb798533a5d353b03149b45a7c7 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -479,6 +479,15 @@ object YarnAllocationHandler {
   private val hostToRack = new ConcurrentHashMap[String, String]()
   private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
 
+
+  def newAllocator(conf: Configuration,
+                   resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
+                   args: ApplicationMasterArguments): YarnAllocationHandler = {
+
+    new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers, 
+      args.workerMemory, args.workerCores, Map[String, Int](), Map[String, Int]())
+  }
+
   def newAllocator(conf: Configuration,
                    resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
                    args: ApplicationMasterArguments,
@@ -486,7 +495,6 @@ object YarnAllocationHandler {
 
     val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
 
-
     new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers, 
       args.workerMemory, args.workerCores, hostToCount, rackToCount)
   }
diff --git a/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
index bb58353e0cc68e657b20c5e06478d12545fb08ee..58a3f4043ade745271cd4aec563405de285707a9 100644
--- a/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration
  */
 private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
 
+  logInfo("Created YarnClusterScheduler")
+
   def this(sc: SparkContext) = this(sc, new Configuration())
 
   // Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate