From 2e48b23eae6cfc5e7c825573e2739e54c4569045 Mon Sep 17 00:00:00 2001
From: prabeesh <prabsmails@gmail.com>
Date: Wed, 16 Oct 2013 13:36:25 +0530
Subject: [PATCH] added mqtt adapter

---
 .../apache/spark/streaming/StreamingContext.scala | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 878725c705..8ed5dfb99b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -450,6 +450,21 @@ class StreamingContext private (
     inputStream
   }
 
+/**
+   * Create an input stream that receives messages pushed by a mqtt publisher.
+   * @param brokerUrl Url of remote mqtt publisher
+   * @param topic topic name to subscribe to
+   * @param storageLevel RDD storage level. Defaults to memory-only.
+   */
+
+  def mqttStream(
+    brokerUrl: String,
+    topic: String,
+    storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = {
+    val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel)
+    registerInputStream(inputStream)
+    inputStream
+  }
   /**
    * Create a unified DStream from multiple DStreams of the same type and same interval
    */
-- 
GitLab