Skip to content
Snippets Groups Projects
Commit d51ed263 authored by prabs's avatar prabs Committed by Sean Owen
Browse files

[SPARK-5666][streaming][MQTT streaming] some trivial fixes

modified to adhere to accepted coding standards as pointed out by tdas in PR #3844

Author: prabs <prabsmails@gmail.com>
Author: Prabeesh K <prabsmails@gmail.com>

Closes #4178 from prabeesh/master and squashes the following commits:

bd2cb49 [Prabeesh K] address the comment
ccc0765 [prabs] address the comment
46f9619 [prabs] address the comment
c035bdc [prabs] address the comment
22dd7f7 [prabs] address the comments
0cc67bd [prabs] address the comment
838c38e [prabs] address the comment
cd57029 [prabs] address the comments
66919a3 [Prabeesh K] changed MqttDefaultFilePersistence to MemoryPersistence
5857989 [prabs] modified to adhere to accepted coding standards
parent d641fbb3
No related branches found
No related tags found
No related merge requests found
......@@ -17,8 +17,8 @@
package org.apache.spark.examples.streaming
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
......@@ -31,8 +31,6 @@ import org.apache.spark.SparkConf
*/
object MQTTPublisher {
var client: MqttClient = _
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
......@@ -42,25 +40,36 @@ object MQTTPublisher {
StreamingExamples.setStreamingLogLevels()
val Seq(brokerUrl, topic) = args.toSeq
var client: MqttClient = null
try {
var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp")
client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
val persistence = new MemoryPersistence()
client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
client.connect()
val msgtopic = client.getTopic(topic)
val msgContent = "hello mqtt demo for spark streaming"
val message = new MqttMessage(msgContent.getBytes("utf-8"))
while (true) {
try {
msgtopic.publish(message)
println(s"Published data. topic: {msgtopic.getName()}; Message: {message}")
} catch {
case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
Thread.sleep(10)
println("Queue is full, wait for to consume data from the message queue")
}
}
} catch {
case e: MqttException => println("Exception Caught: " + e)
} finally {
if (client != null) {
client.disconnect()
}
}
client.connect()
val msgtopic: MqttTopic = client.getTopic(topic)
val msg: String = "hello mqtt demo for spark streaming"
while (true) {
val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes("utf-8"))
msgtopic.publish(message)
println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
}
client.disconnect()
}
}
......@@ -96,9 +105,9 @@ object MQTTWordCount {
val sparkConf = new SparkConf().setAppName("MQTTWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
val words = lines.flatMap(x => x.toString.split(" "))
val words = lines.flatMap(x => x.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
......
......@@ -17,23 +17,23 @@
package org.apache.spark.streaming.mqtt
import java.io.IOException
import java.util.concurrent.Executors
import java.util.Properties
import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import java.util.Properties
import java.util.concurrent.Executors
import java.io.IOException
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttCallback
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
......@@ -82,18 +82,18 @@ class MQTTReceiver(
val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
// Callback automatically triggers as and when new message arrives on specified topic
val callback: MqttCallback = new MqttCallback() {
val callback = new MqttCallback() {
// Handles Mqtt message
override def messageArrived(arg0: String, arg1: MqttMessage) {
store(new String(arg1.getPayload(),"utf-8"))
override def messageArrived(topic: String, message: MqttMessage) {
store(new String(message.getPayload(),"utf-8"))
}
override def deliveryComplete(arg0: IMqttDeliveryToken) {
override def deliveryComplete(token: IMqttDeliveryToken) {
}
override def connectionLost(arg0: Throwable) {
restart("Connection lost ", arg0)
override def connectionLost(cause: Throwable) {
restart("Connection lost ", cause)
}
}
......
......@@ -17,10 +17,11 @@
package org.apache.spark.streaming.mqtt
import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
import scala.reflect.ClassTag
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object MQTTUtils {
......
......@@ -42,8 +42,8 @@ import org.apache.spark.util.Utils
class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
private val batchDuration = Milliseconds(500)
private val master: String = "local[2]"
private val framework: String = this.getClass.getSimpleName
private val master = "local[2]"
private val framework = this.getClass.getSimpleName
private val freePort = findFreePort()
private val brokerUri = "//localhost:" + freePort
private val topic = "def"
......@@ -69,7 +69,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
test("mqtt input stream") {
val sendMessage = "MQTT demo for spark streaming"
val receiveStream: ReceiverInputDStream[String] =
val receiveStream =
MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
@volatile var receiveMessage: List[String] = List()
receiveStream.foreachRDD { rdd =>
......@@ -123,12 +123,12 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
def publishData(data: String): Unit = {
var client: MqttClient = null
try {
val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
client.connect()
if (client.isConnected) {
val msgTopic: MqttTopic = client.getTopic(topic)
val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
val msgTopic = client.getTopic(topic)
val message = new MqttMessage(data.getBytes("utf-8"))
message.setQos(1)
message.setRetained(true)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment