Skip to content
Snippets Groups Projects
Commit d55e3aa4 authored by Tathagata Das's avatar Tathagata Das
Browse files

Updated JavaStreamingContext with updated kafkaStream API.

parent c6b2f765
No related branches found
No related tags found
No related merge requests found
...@@ -45,27 +45,24 @@ class JavaStreamingContext(val ssc: StreamingContext) { ...@@ -45,27 +45,24 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/** /**
* Create an input stream that pulls messages form a Kafka Broker. * Create an input stream that pulls messages form a Kafka Broker.
* @param hostname Zookeper hostname. * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param port Zookeper port.
* @param groupId The group id for this consumer. * @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread. * in its own thread.
*/ */
def kafkaStream[T]( def kafkaStream[T](
hostname: String, zkQuorum: String,
port: Int,
groupId: String, groupId: String,
topics: JMap[String, JInt]) topics: JMap[String, JInt])
: JavaDStream[T] = { : JavaDStream[T] = {
implicit val cmt: ClassManifest[T] = implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
ssc.kafkaStream[T](hostname, port, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) ssc.kafkaStream[T](zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
} }
/** /**
* Create an input stream that pulls messages form a Kafka Broker. * Create an input stream that pulls messages form a Kafka Broker.
* @param hostname Zookeper hostname. * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param port Zookeper port.
* @param groupId The group id for this consumer. * @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread. * in its own thread.
...@@ -73,8 +70,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { ...@@ -73,8 +70,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* By default the value is pulled from zookeper. * By default the value is pulled from zookeper.
*/ */
def kafkaStream[T]( def kafkaStream[T](
hostname: String, zkQuorum: String,
port: Int,
groupId: String, groupId: String,
topics: JMap[String, JInt], topics: JMap[String, JInt],
initialOffsets: JMap[KafkaPartitionKey, JLong]) initialOffsets: JMap[KafkaPartitionKey, JLong])
...@@ -82,8 +78,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { ...@@ -82,8 +78,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
implicit val cmt: ClassManifest[T] = implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
ssc.kafkaStream[T]( ssc.kafkaStream[T](
hostname, zkQuorum,
port,
groupId, groupId,
Map(topics.mapValues(_.intValue()).toSeq: _*), Map(topics.mapValues(_.intValue()).toSeq: _*),
Map(initialOffsets.mapValues(_.longValue()).toSeq: _*)) Map(initialOffsets.mapValues(_.longValue()).toSeq: _*))
...@@ -91,8 +86,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { ...@@ -91,8 +86,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/** /**
* Create an input stream that pulls messages form a Kafka Broker. * Create an input stream that pulls messages form a Kafka Broker.
* @param hostname Zookeper hostname. * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param port Zookeper port.
* @param groupId The group id for this consumer. * @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread. * in its own thread.
...@@ -101,8 +95,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { ...@@ -101,8 +95,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param storageLevel RDD storage level. Defaults to memory-only * @param storageLevel RDD storage level. Defaults to memory-only
*/ */
def kafkaStream[T]( def kafkaStream[T](
hostname: String, zkQuorum: String,
port: Int,
groupId: String, groupId: String,
topics: JMap[String, JInt], topics: JMap[String, JInt],
initialOffsets: JMap[KafkaPartitionKey, JLong], initialOffsets: JMap[KafkaPartitionKey, JLong],
...@@ -111,8 +104,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { ...@@ -111,8 +104,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
implicit val cmt: ClassManifest[T] = implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
ssc.kafkaStream[T]( ssc.kafkaStream[T](
hostname, zkQuorum,
port,
groupId, groupId,
Map(topics.mapValues(_.intValue()).toSeq: _*), Map(topics.mapValues(_.intValue()).toSeq: _*),
Map(initialOffsets.mapValues(_.longValue()).toSeq: _*), Map(initialOffsets.mapValues(_.longValue()).toSeq: _*),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment