diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 3d3b9ea01181ce1004c388b7f6b6e693887dae66..30a648f50b4d2ef8e7c77dcb32fc6df5de388829 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -150,7 +150,13 @@ class DAGScheduler(
    * as a result stage for the final RDD used directly in an action. The stage will also be given
    * the provided priority.
    */
-  private def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], priority: Int): Stage = {
+  private def newStage(
+      rdd: RDD[_],
+      shuffleDep: Option[ShuffleDependency[_,_]],
+      priority: Int,
+      callSite: Option[String] = None)
+    : Stage =
+  {
     if (shuffleDep != None) {
       // Kind of ugly: need to register RDDs with the cache and map output tracker here
       // since we can't do it in the RDD constructor because # of partitions is unknown
@@ -158,7 +164,7 @@ class DAGScheduler(
       mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size)
     }
     val id = nextStageId.getAndIncrement()
-    val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority)
+    val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority, callSite)
     idToStage(id) = stage
     stageToInfos(stage) = StageInfo(stage)
     stage
@@ -286,12 +292,12 @@ class DAGScheduler(
     event match {
       case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) =>
         val runId = nextRunId.getAndIncrement()
-        val finalStage = newStage(finalRDD, None, runId)
+        val finalStage = newStage(finalRDD, None, runId, Some(callSite))
         val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties)
         clearCacheLocs()
         logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
                 " output partitions (allowLocal=" + allowLocal + ")")
-        logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")")
+        logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
         logInfo("Parents of final stage: " + finalStage.parents)
         logInfo("Missing parents: " + getMissingParentStages(finalStage))
         if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
@@ -502,7 +508,7 @@ class DAGScheduler(
         case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
         case _ => "Unkown"
       }
-      logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime))
+      logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
       stage.completionTime = Some(System.currentTimeMillis)
       val stageComp = StageCompleted(stageToInfos(stage))
       sparkListeners.foreach{_.onStageCompleted(stageComp)}
@@ -568,7 +574,7 @@ class DAGScheduler(
               if (stage.outputLocs.count(_ == Nil) != 0) {
                 // Some tasks had failed; let's resubmit this stage
                 // TODO: Lower-level scheduler should also deal with this
-                logInfo("Resubmitting " + stage + " (" + stage.origin +
+                logInfo("Resubmitting " + stage + " (" + stage.name +
                   ") because some of its tasks had failed: " +
                   stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
                 submitStage(stage)
@@ -600,7 +606,7 @@ class DAGScheduler(
         running -= failedStage
         failed += failedStage
         // TODO: Cancel running tasks in the stage
-        logInfo("Marking " + failedStage + " (" + failedStage.origin +
+        logInfo("Marking " + failedStage + " (" + failedStage.name +
           ") for resubmision due to a fetch failure")
         // Mark the map whose fetch failed as broken in the map stage
         val mapStage = shuffleToMapStage(shuffleId)
@@ -608,7 +614,7 @@ class DAGScheduler(
           mapStage.removeOutputLoc(mapId, bmAddress)
           mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
         }
-        logInfo("The failed fetch was from " + mapStage + " (" + mapStage.origin +
+        logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
           "); marking it for resubmission")
         failed += mapStage
         // Remember that a fetch failed now; this is used to resubmit the broken
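Because callSite defaults to None in the new newStage signature, only the JobSubmitted path above needs to pass one; the shuffle-map-stage call sites elsewhere in DAGScheduler compile unchanged. A minimal, self-contained sketch of that default-parameter pattern (the names below are hypothetical stand-ins, not the Spark API):

    object DefaultParamSketch {
      // Hypothetical stand-in for newStage: the trailing Option parameter
      // defaults to None, so existing three-argument calls compile as before.
      def newStage(id: Int, priority: Int, callSite: Option[String] = None): String =
        "Stage " + id + callSite.map(cs => " (" + cs + ")").getOrElse("")

      def main(args: Array[String]) {
        println(newStage(1, 0))                                // Stage 1
        println(newStage(2, 0, Some("count at Job.scala:42"))) // Stage 2 (count at Job.scala:42)
      }
    }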
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index 539cf8233bfe12aed9c2781b41d0df2684c7fa13..4937eb3b8820d7b466691d2b94dfa9aaf643cdaa 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -24,7 +24,8 @@ private[spark] class Stage(
     val rdd: RDD[_],
     val shuffleDep: Option[ShuffleDependency[_,_]],  // Output shuffle if stage is a map stage
     val parents: List[Stage],
-    val priority: Int)
+    val priority: Int,
+    callSite: Option[String])
   extends Logging {
 
   val isShuffleMap = shuffleDep != None
@@ -85,7 +86,7 @@ private[spark] class Stage(
     return id
   }
 
-  def origin: String = rdd.origin
+  val name: String = callSite.getOrElse(rdd.origin)
 
   override def toString = "Stage " + id
 
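The new name field prefers the call site recorded for the job and falls back to rdd.origin for stages created without one (shuffle map stages, for example). A small standalone sketch of that Option.getOrElse fallback, with hypothetical call-site strings:

    object NameFallbackSketch {
      // Stand-in for rdd.origin; in Spark this is where the RDD was created.
      val rddOrigin = "map at MyJob.scala:10"

      // Mirrors Stage.name: use the job's call site if present, else the RDD origin.
      def stageName(callSite: Option[String]): String = callSite.getOrElse(rddOrigin)

      def main(args: Array[String]) {
        println(stageName(Some("count at MyJob.scala:12"))) // count at MyJob.scala:12
        println(stageName(None))                            // map at MyJob.scala:10
      }
    }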
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index 7907ab3bc705a5987938973e6b6cc9f2fc3ccf4c..2df5f0192bf30b4d62604ea5fe537f293eb66a30 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -89,7 +89,7 @@ private[spark] class IndexPage(parent: JobProgressUI) {
 
     <tr>
       <td>{s.id}</td>
-      <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.origin}</a></td>
+      <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td>
       <td>{submissionTime}</td>
       <td>{getElapsedTime(s.submissionTime,
              s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
index 699901f1a1369f09d82af9ada043b1cdd2343e9e..2474d744b80fdcf9803a4ddea6fffeef2d0a8a82 100644
--- a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
@@ -37,8 +37,8 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
     val parentRdd = makeRdd(4, Nil)
     val shuffleDep = new ShuffleDependency(parentRdd, null)
     val rootRdd = makeRdd(4, List(shuffleDep))
-    val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID) 
-    val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID)
+    val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID, None)
+    val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID, None)
     
     joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4))
     joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
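The suite above only adapts to the new constructor arity; a hypothetical extra assertion (not part of this patch) could also pin down the name semantics, reusing the fixtures already defined in the test method:

    // Hypothetical addition: a stage built with an explicit call site reports it
    // as its name, while one built with None falls back to the RDD's origin.
    val namedStage = new Stage(2, rootRdd, None, List(shuffleMapStage), jobID,
      Some("count at JobLoggerSuite.scala:41"))
    namedStage.name should be ("count at JobLoggerSuite.scala:41")
    shuffleMapStage.name should be (parentRdd.origin)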