diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ab374cb71286a6845704f18400b069f4eb888497..af4456c05b0a1ed0d3819ff32d20fec7cdcf5755 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -581,6 +581,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     // Post init
     _taskScheduler.postStartHook()
+    _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4a9518fff4e7bbb4c7bde125467377efbd237c35..ae725b467d8c4360f420da54b3ed67bb52d74c66 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -130,7 +130,7 @@ class DAGScheduler(
 
   def this(sc: SparkContext) = this(sc, sc.taskScheduler)
 
-  private[scheduler] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
+  private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
 
   private[scheduler] val nextJobId = new AtomicInteger(0)
   private[scheduler] def numTotalJobs: Int = nextJobId.get()
@@ -1580,8 +1580,6 @@ class DAGScheduler(
     taskScheduler.stop()
   }
 
-  // Start the event thread and register the metrics source at the end of the constructor
-  env.metricsSystem.registerSource(metricsSource)
   eventProcessLoop.start()
 }