diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 1c895430582110baa70bb3ba82d53b7178be79e3..e0677b795cb944e21db32e8f9b3a541229500b58 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -154,6 +154,10 @@ class StreamingContext private[streaming] (
 
   private[streaming] val uiTab = new StreamingTab(this)
 
+  /** Register streaming source to metrics system */
+  private val streamingSource = new StreamingSource(this)
+  SparkEnv.get.metricsSystem.registerSource(streamingSource)
+
   /** Enumeration to identify current state of the StreamingContext */
   private[streaming] object StreamingContextState extends Enumeration {
     type CheckpointState = Value
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
new file mode 100644
index 0000000000000000000000000000000000000000..774adc3c23c212351737112f8a5473544b5f6d41
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
+  val metricRegistry = new MetricRegistry
+  val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName)
+
+  val streamingListener = ssc.uiTab.listener
+
+  private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
+      defaultValue: T) {
+    metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
+      override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
+    })
+  }
+
+  // Gauge for number of network receivers
+  registerGauge("receivers", _.numReceivers, 0)
+
+  // Gauge for number of total completed batches
+  registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)
+
+  // Gauge for number of unprocessed batches
+  registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)
+
+  // Gauge for number of waiting batches
+  registerGauge("waitingBatches", _.waitingBatches.size, 0L)
+
+  // Gauge for number of running batches
+  registerGauge("runningBatches", _.runningBatches.size, 0L)
+
+  // Gauge for number of retained completed batches
+  registerGauge("retainedCompletedBatches", _.retainedCompletedBatches.size, 0L)
+
+  // Gauge for last completed batch, useful for monitoring the streaming job's running status,
+  // displayed data -1 for any abnormal condition.
+  registerGauge("lastCompletedBatch_submissionTime",
+    _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
+  registerGauge("lastCompletedBatch_processStartTime",
+    _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
+  registerGauge("lastCompletedBatch_processEndTime",
+    _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+
+  // Gauge for last received batch, useful for monitoring the streaming job's running status,
+  // displayed data -1 for any abnormal condition.
+  registerGauge("lastReceivedBatch_submissionTime",
+    _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
+  registerGauge("lastReceivedBatch_processStartTime",
+    _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
+  registerGauge("lastReceivedBatch_processEndTime",
+    _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index bf637c144631488a92c5bd7221e656c067155644..14c33c728bfe1a8e93aedd65f965001e08f34deb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -28,7 +28,8 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
 import org.apache.spark.util.Distribution
 
 
-private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends StreamingListener {
+private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
+  extends StreamingListener {
 
   private val waitingBatchInfos = new HashMap[Time, BatchInfo]
   private val runningBatchInfos = new HashMap[Time, BatchInfo]