diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 878725c705db71807c567c909b1b6ab0f48f7f94..8ed5dfb99b73c2aba7216e0f6a6b9c6a90275ec2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -450,6 +450,21 @@ class StreamingContext private (
     inputStream
   }
 
+/**
+   * Create an input stream that receives messages pushed by a mqtt publisher.
+   * @param brokerUrl Url of remote mqtt publisher
+   * @param topic topic name to subscribe to
+   * @param storageLevel RDD storage level. Defaults to memory-only.
+   */
+
+  def mqttStream(
+    brokerUrl: String,
+    topic: String,
+    storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = {
+    val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel)
+    registerInputStream(inputStream)
+    inputStream
+  }
   /**
    * Create a unified DStream from multiple DStreams of the same type and same interval
    */