Skip to content
Snippets Groups Projects
Commit d223d389 authored by Prabeesh K's avatar Prabeesh K
Browse files

Update MQTTInputDStream.scala

parent 890f8fe4
No related branches found
No related tags found
No related merge requests found
......@@ -46,24 +46,31 @@ import scala.collection.JavaConversions._
* @param storageLevel RDD storage level.
*/
private[streaming] class MQTTInputDStream[T: ClassManifest](
private[streaming]
class MQTTInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc_) with Logging {
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_) with Logging {
def getReceiver(): NetworkReceiver[T] = {
new MQTTReceiver(brokerUrl, topic, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
}
private[streaming] class MQTTReceiver(brokerUrl: String,
private[streaming]
class MQTTReceiver(brokerUrl: String,
topic: String,
storageLevel: StorageLevel) extends NetworkReceiver[Any] {
storageLevel: StorageLevel
) extends NetworkReceiver[Any] {
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
def onStop() {
blockGenerator.stop()
}
def onStart() {
blockGenerator.start()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment