Commit f7d3e309 authored by Prashant Sharma

ZeroMQ stream as receiver

parent 8b9c673f
---
layout: global
title: ZeroMQ Stream setup guide
---
## Install ZeroMQ (using JNA)
ZeroMQ support requires some native libraries to be installed.
* Install the ZeroMQ (release 2.1) core libraries. [ZeroMQ Install guide](http://www.zeromq.org/intro:get-the-software)
On Ubuntu 12.04, for example, you can run:
`$ sudo apt-get install libzmq1`
__akka-zeromq supports ZeroMQ 2.1 via [JNA](https://github.com/twall/jna). If you want to switch to ZeroMQ 3.0, install [JZMQ](http://www.zeromq.org/bindings:java), which uses [JNI](http://docs.oracle.com/javase/6/docs/technotes/guides/jni/), and drop in the jzmq jar.__
## Sample Scala code
A publisher is an entity assumed to be outside the Spark ecosystem. A sample ZeroMQ publisher is provided so that you can try out the sample Spark ZeroMQ application.
1. Start the sample publisher.
{% highlight scala %}
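import akka.actor.ActorSystem
import akka.zeromq._
// `url` is the address the publisher binds to, e.g. "tcp://<ip>:<port>"
val acs: ActorSystem = ActorSystem()
val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
pubSocket ! ZMQMessage(Seq(Frame("topic"), Frame("My message".getBytes)))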
{% endhighlight %}
A typical ZeroMQ URL looks like `tcp://<ip>:<port>`.
The publisher does nothing more than publish the message on the specified topic and URL.
2. Start the Spark application by plugging in the ZeroMQ stream receiver.
{% highlight scala %}
val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToObjectsIterator)
{% endhighlight %}
`bytesToObjectsIterator` is a function that decodes the frame data.
_For example, to decode the frames into strings using the default charset:_
{% highlight scala %}
def bytesToStringIterator(frames: Seq[Seq[Byte]]) = frames.map(frame => new String(frame.toArray)).iterator
{% endhighlight %}
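The default-charset decoder above depends on the JVM's platform encoding. As a minimal sketch, assuming the publisher sends UTF-8 text, a decoder can name the charset explicitly. The name `utf8FramesToStrings` and the sample frames are hypothetical and not part of this commit; the function only has the `Seq[Seq[Byte]] => Iterator[String]` shape that `zeroMQStream` takes as its converter.

```scala
// Hypothetical decoder (not part of this commit): one UTF-8 string per payload frame.
def utf8FramesToStrings(frames: Seq[Seq[Byte]]): Iterator[String] =
  frames.iterator.map(frame => new String(frame.toArray, "UTF-8"))

// Stand-in for the payload frames of a single message; real frames would
// arrive from the ZeroMQ publisher rather than being built by hand.
val sampleFrames: Seq[Seq[Byte]] = Seq(
  "hello".getBytes("UTF-8").toSeq,
  "world".getBytes("UTF-8").toSeq
)

val decoded: List[String] = utf8FramesToStrings(sampleFrames).toList
```

Such a function could be passed in place of `bytesToObjectsIterator` in the `zeroMQStream` call, assuming both sides agree on the encoding.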
......@@ -134,6 +134,7 @@ object SparkBuild extends Build {
"com.typesafe.akka" % "akka-actor" % "2.0.3",
"com.typesafe.akka" % "akka-remote" % "2.0.3",
"com.typesafe.akka" % "akka-slf4j" % "2.0.3",
"com.typesafe.akka" % "akka-zeromq" % "2.0.3",
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
"cc.spray" % "spray-can" % "1.0-M2.1",
......
......@@ -2,12 +2,14 @@ package spark.streaming
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.zeromq.Subscribe
import spark.streaming.dstream._
import spark.{RDD, Logging, SparkEnv, SparkContext}
import spark.streaming.receivers.ActorReceiver
import spark.streaming.receivers.ReceiverSupervisorStrategy
import spark.streaming.receivers.ZeroMQReceiver
import spark.storage.StorageLevel
import spark.util.MetadataCleaner
import spark.streaming.receivers.ActorReceiver
......@@ -174,6 +176,26 @@ class StreamingContext private (
networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
}
/**
* ZeroMQ stream receiver
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes a sequence of frames for each topic, and each frame
* is a sequence of bytes, so a converter (which might be a deserializer of bytes) is needed
* to translate from a sequence of sequences of bytes, where the outer sequence holds the frames
* and each inner sequence holds one frame's payload.
* @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_ONLY_SER_2.
*/
def zeroMQStream[T: ClassManifest](publisherUrl:String,
subscribe: Subscribe,
bytesToObjects: Seq[Seq[Byte]] => Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
/**
* Create an input stream that pulls messages from a Kafka Broker.
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
......
package spark.streaming.receivers
import akka.actor.Actor
import akka.zeromq._
import spark.Logging
/**
* A receiver to subscribe to ZeroMQ stream.
*/
private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
  subscribe: Subscribe,
  bytesToObjects: Seq[Seq[Byte]] => Iterator[T])
  extends Actor with Receiver with Logging {
  override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self),
    Connect(publisherUrl), subscribe)
  def receive: Receive = {
    case Connecting => logInfo("connecting ...")
    case m: ZMQMessage =>
      logDebug("Received message for:" + m.firstFrameAsString)
      // We ignore the first frame for processing, as it is the topic
      val bytes = m.frames.tail.map(_.payload)
      pushBlock(bytesToObjects(bytes))
    case Closed => logInfo("received closed ")
  }
}
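The frame handling in `receive` can be tried out without Akka or Spark. The following is only a sketch under stated assumptions: `SketchFrame` is a hypothetical stand-in for `akka.zeromq.Frame`, and `payloadFrames` is an illustrative helper that mirrors the `m.frames.tail.map(_.payload)` step; neither name exists in this commit.

```scala
// Hypothetical stand-in for akka.zeromq.Frame, so the sketch runs without Akka.
final case class SketchFrame(payload: Seq[Byte])

// Mirrors the receiver's step: skip the first frame (the topic), keep the payloads.
def payloadFrames(frames: Seq[SketchFrame]): Seq[Seq[Byte]] =
  frames.tail.map(_.payload)

// A message shaped like the one the sample publisher sends: topic frame, then data.
val message: Seq[SketchFrame] = Seq(
  SketchFrame("topic".getBytes("UTF-8").toSeq),
  SketchFrame("My message".getBytes("UTF-8").toSeq)
)

// Decode what would be handed to bytesToObjects.
val decodedPayloads: Seq[String] =
  payloadFrames(message).map(bytes => new String(bytes.toArray, "UTF-8"))
```

Note that `tail` throws on an empty sequence, so this step assumes every message carries at least the topic frame; `drop(1)` would be a more tolerant alternative.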