diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 88c1b7bafffe9d354b227e8f895b24eedcabfd5d..89c51a44c98790c479dbf0260b563189770295bf 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -510,6 +510,12 @@ class DAGScheduler(
         tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
       }
     }
+    // must run the listeners before a possible NotSerializableException below
+    // so that listeners receive "StageSubmitted" first and then "JobEnded"
+    val properties = idToActiveJob(stage.priority).properties
+    sparkListeners.foreach(_.onStageSubmitted(
+      SparkListenerStageSubmitted(stage, tasks.size, properties)))
+    
     if (tasks.size > 0) {
       // Preemptively serialize a task to make sure it can be serialized. We are catching this
       // exception here because it would be fairly hard to catch the non-serializable exception
@@ -523,9 +529,7 @@ class DAGScheduler(
           running -= stage
           return
       }
-      val properties = idToActiveJob(stage.priority).properties
-      sparkListeners.foreach(_.onStageSubmitted(
-        SparkListenerStageSubmitted(stage, tasks.size, properties)))
+
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       myPending ++= tasks
       logDebug("New pending tasks: " + myPending)