Skip to content
Snippets Groups Projects
Commit e767d7dd authored by bilna's avatar bilna Committed by Tathagata Das
Browse files

[SPARK-4631] unit test for MQTT

Please review the unit test for MQTT

Author: bilna <bilnap@am.amrita.edu>
Author: Bilna P <bilna.p@gmail.com>

Closes #3844 from Bilna/master and squashes the following commits:

acea3a3 [bilna] Adding dependency with scope test
28681fa [bilna] Merge remote-tracking branch 'upstream/master'
fac3904 [bilna] Correction in Indentation and coding style
ed9db4c [bilna] Merge remote-tracking branch 'upstream/master'
4b34ee7 [Bilna P] Update MQTTStreamSuite.scala
04503cf [bilna] Added embedded broker service for mqtt test
89d804e [bilna] Merge remote-tracking branch 'upstream/master'
fc8eb28 [bilna] Merge remote-tracking branch 'upstream/master'
4b58094 [Bilna P] Update MQTTStreamSuite.scala
b1ac4ad [bilna] Added BeforeAndAfter
5f6bfd2 [bilna] Added BeforeAndAfter
e8b6623 [Bilna P] Update MQTTStreamSuite.scala
5ca6691 [Bilna P] Update MQTTStreamSuite.scala
8616495 [bilna] [SPARK-4631] unit test for MQTT
parent 3fddc946
No related branches found
No related tags found
No related merge requests found
......@@ -66,6 +66,12 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
......
......@@ -17,31 +17,111 @@
package org.apache.spark.streaming.mqtt
import org.scalatest.FunSuite
import java.net.{URI, ServerSocket}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.activemq.broker.{TransportConnector, BrokerService}
import org.apache.spark.util.Utils
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Eventually
import scala.concurrent.duration._
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
class MQTTStreamSuite extends FunSuite {
val batchDuration = Seconds(1)
class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
private val batchDuration = Milliseconds(500)
private val master: String = "local[2]"
private val framework: String = this.getClass.getSimpleName
private val freePort = findFreePort()
private val brokerUri = "//localhost:" + freePort
private val topic = "def"
private var ssc: StreamingContext = _
private val persistenceDir = Utils.createTempDir()
private var broker: BrokerService = _
private var connector: TransportConnector = _
test("mqtt input stream") {
val ssc = new StreamingContext(master, framework, batchDuration)
val brokerUrl = "abc"
val topic = "def"
before {
ssc = new StreamingContext(master, framework, batchDuration)
setupMQTT()
}
// tests the API, does not actually test data receiving
val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
val test2: ReceiverInputDStream[String] =
MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
after {
if (ssc != null) {
ssc.stop()
ssc = null
}
Utils.deleteRecursively(persistenceDir)
tearDownMQTT()
}
// TODO: Actually test receiving data
test("mqtt input stream") {
val sendMessage = "MQTT demo for spark streaming"
val receiveStream: ReceiverInputDStream[String] =
MQTTUtils.createStream(ssc, "tcp:" + brokerUri, topic, StorageLevel.MEMORY_ONLY)
var receiveMessage: List[String] = List()
receiveStream.foreachRDD { rdd =>
if (rdd.collect.length > 0) {
receiveMessage = receiveMessage ::: List(rdd.first)
receiveMessage
}
}
ssc.start()
publishData(sendMessage)
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
assert(sendMessage.equals(receiveMessage(0)))
}
ssc.stop()
}
private def setupMQTT() {
broker = new BrokerService()
connector = new TransportConnector()
connector.setName("mqtt")
connector.setUri(new URI("mqtt:" + brokerUri))
broker.addConnector(connector)
broker.start()
}
private def tearDownMQTT() {
if (broker != null) {
broker.stop()
broker = null
}
if (connector != null) {
connector.stop()
connector = null
}
}
private def findFreePort(): Int = {
Utils.startServiceOnPort(23456, (trialPort: Int) => {
val socket = new ServerSocket(trialPort)
socket.close()
(null, trialPort)
})._2
}
def publishData(data: String): Unit = {
var client: MqttClient = null
try {
val persistence: MqttClientPersistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
client = new MqttClient("tcp:" + brokerUri, MqttClient.generateClientId(), persistence)
client.connect()
if (client.isConnected) {
val msgTopic: MqttTopic = client.getTopic(topic)
val message: MqttMessage = new MqttMessage(data.getBytes("utf-8"))
message.setQos(1)
message.setRetained(true)
for (i <- 0 to 100)
msgTopic.publish(message)
}
} finally {
client.disconnect()
client.close()
client = null
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment