From 890f8fe4393a20749e0a6cfd57ff07f60cfad2a1 Mon Sep 17 00:00:00 2001 From: prabeesh <prabsmails@gmail.com> Date: Thu, 17 Oct 2013 10:00:40 +0530 Subject: [PATCH] modify code, use Spark Logging Class --- .../streaming/dstream/MQTTInputDStream.scala | 61 ++++++++----------- 1 file changed, 26 insertions(+), 35 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala index 3416989c02..9b3fe67e6a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala @@ -23,16 +23,16 @@ import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContex import java.util.Properties import java.util.concurrent.Executors -import java.io.IOException; +import java.io.IOException -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttClientPersistence; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.MqttTopic; +import org.eclipse.paho.client.mqttv3.MqttCallback +import org.eclipse.paho.client.mqttv3.MqttClient +import org.eclipse.paho.client.mqttv3.MqttClientPersistence +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken +import org.eclipse.paho.client.mqttv3.MqttException +import org.eclipse.paho.client.mqttv3.MqttMessage +import org.eclipse.paho.client.mqttv3.MqttTopic import scala.collection.Map import scala.collection.mutable.HashMap @@ -50,9 +50,7 @@ private[streaming] class MQTTInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, brokerUrl: String, topic: String, - storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging { - def getReceiver(): NetworkReceiver[T] = { new MQTTReceiver(brokerUrl, topic, storageLevel) .asInstanceOf[NetworkReceiver[T]] @@ -62,50 +60,43 @@ private[streaming] class MQTTInputDStream[T: ClassManifest]( private[streaming] class MQTTReceiver(brokerUrl: String, topic: String, storageLevel: StorageLevel) extends NetworkReceiver[Any] { - lazy protected val blockGenerator = new BlockGenerator(storageLevel) - def onStop() { blockGenerator.stop() } - def onStart() { blockGenerator.start() - //Set up persistence for messages - var peristance:MqttClientPersistence =new MemoryPersistence(); + // Set up persistence for messages + var peristance: MqttClientPersistence = new MemoryPersistence() + + // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance + var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance) - //Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance - var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance); + // Connect to MqttBroker + client.connect() - //Connect to MqttBroker - client.connect(); - - //Subscribe to Mqtt topic - client.subscribe(topic); - - //Callback automatically triggers as and when new message arrives on specified topic + // Subscribe to Mqtt topic + client.subscribe(topic) + + // Callback automatically triggers as and when new message arrives on specified topic var callback: MqttCallback = new MqttCallback() { - //Handles Mqtt message + // Handles Mqtt message override def messageArrived(arg0: String, arg1: MqttMessage) { blockGenerator += new String(arg1.getPayload()) } - + override def deliveryComplete(arg0: IMqttDeliveryToken) { } override def connectionLost(arg0: Throwable) { - System.err.println("Connection lost " + arg0) - + logInfo("Connection lost " + arg0) } + } - }; - - //Set up callback for MqttClient - client.setCallback(callback); - + // Set up callback for MqttClient + client.setCallback(callback) } - } -- GitLab