From d58502a1562bbfb1bb4e517ebcc8239efd639297 Mon Sep 17 00:00:00 2001
From: Andrew xia <junluan.xia@intel.com>
Date: Thu, 1 Aug 2013 23:21:41 +0800
Subject: [PATCH] Fix ordering of Spark "StageSubmitted" listener notification that caused a unit test failure

---
 core/src/main/scala/spark/scheduler/DAGScheduler.scala | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 88c1b7baff..89c51a44c9 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -510,6 +510,12 @@ class DAGScheduler(
         tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
       }
     }
+    // The listener must be notified before the possible NotSerializableException,
+    // so that "StageSubmitted" is posted before "JobEnded".
+    val properties = idToActiveJob(stage.priority).properties
+    sparkListeners.foreach(_.onStageSubmitted(
+      SparkListenerStageSubmitted(stage, tasks.size, properties)))
+    
     if (tasks.size > 0) {
       // Preemptively serialize a task to make sure it can be serialized. We are catching this
       // exception here because it would be fairly hard to catch the non-serializable exception
@@ -523,9 +529,7 @@ class DAGScheduler(
           running -= stage
           return
       }
-      val properties = idToActiveJob(stage.priority).properties
-      sparkListeners.foreach(_.onStageSubmitted(
-        SparkListenerStageSubmitted(stage, tasks.size, properties)))
+
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       myPending ++= tasks
       logDebug("New pending tasks: " + myPending)
-- 
GitLab