diff --git a/python/pyspark/streaming/__init__.py b/python/pyspark/streaming/__init__.py
index d2644a1d4ffab6be726c74d7e8a761b24c574363..66e8f8ef001e32137a3473d82dfd27a565959066 100644
--- a/python/pyspark/streaming/__init__.py
+++ b/python/pyspark/streaming/__init__.py
@@ -17,5 +17,6 @@
 
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.dstream import DStream
+from pyspark.streaming.listener import StreamingListener
 
-__all__ = ['StreamingContext', 'DStream']
+__all__ = ['StreamingContext', 'DStream', 'StreamingListener']
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 8be56c99152654b72202af8b153e2dde25946631..1388b6d044e046064773de0ef80534a0bccda4b2 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -363,3 +363,11 @@ class StreamingContext(object):
         first = dstreams[0]
         jrest = [d._jdstream for d in dstreams[1:]]
         return DStream(self._jssc.union(first._jdstream, jrest), self, first._jrdd_deserializer)
+
+    def addStreamingListener(self, streamingListener):
+        """
+        Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
+        receiving system events related to streaming.
+        """
+        self._jssc.addStreamingListener(self._jvm.JavaStreamingListenerWrapper(
+            self._jvm.PythonStreamingListenerWrapper(streamingListener)))
diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py
new file mode 100644
index 0000000000000000000000000000000000000000..b830797f5c0a033be7877e7ec1fbda69dd35cb1b
--- /dev/null
+++ b/python/pyspark/streaming/listener.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ["StreamingListener"]
+
+
+class StreamingListener(object):
+
+    def __init__(self):
+        pass
+
+    def onReceiverStarted(self, receiverStarted):
+        """
+        Called when a receiver has been started
+        """
+        pass
+
+    def onReceiverError(self, receiverError):
+        """
+        Called when a receiver has reported an error
+        """
+        pass
+
+    def onReceiverStopped(self, receiverStopped):
+        """
+        Called when a receiver has been stopped
+        """
+        pass
+
+    def onBatchSubmitted(self, batchSubmitted):
+        """
+        Called when a batch of jobs has been submitted for processing.
+        """
+        pass
+
+    def onBatchStarted(self, batchStarted):
+        """
+        Called when processing of a batch of jobs has started.
+        """
+        pass
+
+    def onBatchCompleted(self, batchCompleted):
+        """
+        Called when processing of a batch of jobs has completed.
+        """
+        pass
+
+    def onOutputOperationStarted(self, outputOperationStarted):
+        """
+        Called when processing of a job of a batch has started.
+        """
+        pass
+
+    def onOutputOperationCompleted(self, outputOperationCompleted):
+        """
+        Called when processing of a job of a batch has completed
+        """
+        pass
+
+    class Java:
+        implements = ["org.apache.spark.streaming.api.java.PythonStreamingListener"]
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 6ee864d8d3da64f5f34c74d79b41d82ee964bc97..2983028413bb80337d90b6699cf08f40753fe9c9 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -48,6 +48,7 @@ from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPar
 from pyspark.streaming.flume import FlumeUtils
 from pyspark.streaming.mqtt import MQTTUtils
 from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+from pyspark.streaming.listener import StreamingListener
 
 
 class PySparkStreamingTestCase(unittest.TestCase):
@@ -403,6 +404,128 @@ class BasicOperationTests(PySparkStreamingTestCase):
         self._test_func(input, func, expected)
 
 
+class StreamingListenerTests(PySparkStreamingTestCase):
+
+    duration = .5
+
+    class BatchInfoCollector(StreamingListener):
+
+        def __init__(self):
+            super(StreamingListener, self).__init__()
+            self.batchInfosCompleted = []
+            self.batchInfosStarted = []
+            self.batchInfosSubmitted = []
+
+        def onBatchSubmitted(self, batchSubmitted):
+            self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
+
+        def onBatchStarted(self, batchStarted):
+            self.batchInfosStarted.append(batchStarted.batchInfo())
+
+        def onBatchCompleted(self, batchCompleted):
+            self.batchInfosCompleted.append(batchCompleted.batchInfo())
+
+    def test_batch_info_reports(self):
+        batch_collector = self.BatchInfoCollector()
+        self.ssc.addStreamingListener(batch_collector)
+        input = [[1], [2], [3], [4]]
+
+        def func(dstream):
+            return dstream.map(int)
+        expected = [[1], [2], [3], [4]]
+        self._test_func(input, func, expected)
+
+        batchInfosSubmitted = batch_collector.batchInfosSubmitted
+        batchInfosStarted = batch_collector.batchInfosStarted
+        batchInfosCompleted = batch_collector.batchInfosCompleted
+
+        self.wait_for(batchInfosCompleted, 4)
+
+        self.assertGreaterEqual(len(batchInfosSubmitted), 4)
+        for info in batchInfosSubmitted:
+            self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
+            self.assertGreaterEqual(info.submissionTime(), 0)
+
+            for streamId in info.streamIdToInputInfo():
+                streamInputInfo = info.streamIdToInputInfo()[streamId]
+                self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+                self.assertGreaterEqual(streamInputInfo.numRecords, 0)
+                for key in streamInputInfo.metadata():
+                    self.assertIsNotNone(streamInputInfo.metadata()[key])
+                self.assertIsNotNone(streamInputInfo.metadataDescription())
+
+            for outputOpId in info.outputOperationInfos():
+                outputInfo = info.outputOperationInfos()[outputOpId]
+                self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
+                self.assertGreaterEqual(outputInfo.id(), 0)
+                self.assertIsNotNone(outputInfo.name())
+                self.assertIsNotNone(outputInfo.description())
+                self.assertGreaterEqual(outputInfo.startTime(), -1)
+                self.assertGreaterEqual(outputInfo.endTime(), -1)
+                self.assertIsNone(outputInfo.failureReason())
+
+            self.assertEqual(info.schedulingDelay(), -1)
+            self.assertEqual(info.processingDelay(), -1)
+            self.assertEqual(info.totalDelay(), -1)
+            self.assertEqual(info.numRecords(), 0)
+
+        self.assertGreaterEqual(len(batchInfosStarted), 4)
+        for info in batchInfosStarted:
+            self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
+            self.assertGreaterEqual(info.submissionTime(), 0)
+
+            for streamId in info.streamIdToInputInfo():
+                streamInputInfo = info.streamIdToInputInfo()[streamId]
+                self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+                self.assertGreaterEqual(streamInputInfo.numRecords, 0)
+                for key in streamInputInfo.metadata():
+                    self.assertIsNotNone(streamInputInfo.metadata()[key])
+                self.assertIsNotNone(streamInputInfo.metadataDescription())
+
+            for outputOpId in info.outputOperationInfos():
+                outputInfo = info.outputOperationInfos()[outputOpId]
+                self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
+                self.assertGreaterEqual(outputInfo.id(), 0)
+                self.assertIsNotNone(outputInfo.name())
+                self.assertIsNotNone(outputInfo.description())
+                self.assertGreaterEqual(outputInfo.startTime(), -1)
+                self.assertGreaterEqual(outputInfo.endTime(), -1)
+                self.assertIsNone(outputInfo.failureReason())
+
+            self.assertGreaterEqual(info.schedulingDelay(), 0)
+            self.assertEqual(info.processingDelay(), -1)
+            self.assertEqual(info.totalDelay(), -1)
+            self.assertEqual(info.numRecords(), 0)
+
+        self.assertGreaterEqual(len(batchInfosCompleted), 4)
+        for info in batchInfosCompleted:
+            self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
+            self.assertGreaterEqual(info.submissionTime(), 0)
+
+            for streamId in info.streamIdToInputInfo():
+                streamInputInfo = info.streamIdToInputInfo()[streamId]
+                self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+                self.assertGreaterEqual(streamInputInfo.numRecords, 0)
+                for key in streamInputInfo.metadata():
+                    self.assertIsNotNone(streamInputInfo.metadata()[key])
+                self.assertIsNotNone(streamInputInfo.metadataDescription())
+
+            for outputOpId in info.outputOperationInfos():
+                outputInfo = info.outputOperationInfos()[outputOpId]
+                self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
+                self.assertGreaterEqual(outputInfo.id(), 0)
+                self.assertIsNotNone(outputInfo.name())
+                self.assertIsNotNone(outputInfo.description())
+                self.assertGreaterEqual(outputInfo.startTime(), 0)
+                self.assertGreaterEqual(outputInfo.endTime(), 0)
+                self.assertIsNone(outputInfo.failureReason())
+
+            self.assertGreaterEqual(info.schedulingDelay(), 0)
+            self.assertGreaterEqual(info.processingDelay(), 0)
+            self.assertGreaterEqual(info.totalDelay(), 0)
+            self.assertEqual(info.numRecords(), 0)
+
+
 class WindowFunctionTests(PySparkStreamingTestCase):
 
     timeout = 15
@@ -1308,7 +1431,8 @@ if __name__ == "__main__":
 
     os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
     testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
-                 KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests]
+                 KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests,
+                 StreamingListenerTests]
 
     if kinesis_jar_present is True:
         testcases.append(KinesisStreamTests)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
index 34429074fe804118573098ce6b25a6b22996c2cb..7bfd6bd5af759af4a9988fc299d92542bd189d61 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -18,6 +18,82 @@
 package org.apache.spark.streaming.api.java
 
 import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.scheduler.StreamingListener
+
+private[streaming] trait PythonStreamingListener{
+
+  /** Called when a receiver has been started */
+  def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted) { }
+
+  /** Called when a receiver has reported an error */
+  def onReceiverError(receiverError: JavaStreamingListenerReceiverError) { }
+
+  /** Called when a receiver has been stopped */
+  def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped) { }
+
+  /** Called when a batch of jobs has been submitted for processing. */
+  def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted) { }
+
+  /** Called when processing of a batch of jobs has started.  */
+  def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted) { }
+
+  /** Called when processing of a batch of jobs has completed. */
+  def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted) { }
+
+  /** Called when processing of a job of a batch has started. */
+  def onOutputOperationStarted(
+      outputOperationStarted: JavaStreamingListenerOutputOperationStarted) { }
+
+  /** Called when processing of a job of a batch has completed. */
+  def onOutputOperationCompleted(
+      outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted) { }
+}
+
+private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamingListener)
+  extends JavaStreamingListener {
+
+  /** Called when a receiver has been started */
+  override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
+    listener.onReceiverStarted(receiverStarted)
+  }
+
+  /** Called when a receiver has reported an error */
+  override def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = {
+    listener.onReceiverError(receiverError)
+  }
+
+  /** Called when a receiver has been stopped */
+  override def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = {
+    listener.onReceiverStopped(receiverStopped)
+  }
+
+  /** Called when a batch of jobs has been submitted for processing. */
+  override def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = {
+    listener.onBatchSubmitted(batchSubmitted)
+  }
+
+  /** Called when processing of a batch of jobs has started.  */
+  override def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = {
+    listener.onBatchStarted(batchStarted)
+  }
+
+  /** Called when processing of a batch of jobs has completed. */
+  override def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = {
+    listener.onBatchCompleted(batchCompleted)
+  }
+
+  /** Called when processing of a job of a batch has started. */
+  override def onOutputOperationStarted(
+    outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = {
+      listener.onOutputOperationStarted(outputOperationStarted)
+  }
+
+  /** Called when processing of a job of a batch has completed. */
+  override def onOutputOperationCompleted(
+    outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = {
+      listener.onOutputOperationCompleted(outputOperationCompleted)
+  }
+}
 
 /**
  * A listener interface for receiving information about an ongoing streaming  computation.