diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java index 83900a18df327f0af93c36bd87c119f8170a7ca9..0a2b3def183fc7f97514ba4fb62040e17163077f 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java @@ -20,7 +20,7 @@ package org.apache.spark.streaming.examples; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.*; -import org.apache.spark.streaming.api.java.flume.FlumeFunctions; +import org.apache.spark.streaming.flume.FlumeUtils; import org.apache.spark.streaming.flume.SparkFlumeEvent; /** @@ -53,8 +53,7 @@ public class JavaFlumeEventCount { JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class)); - FlumeFunctions flumeFunc = new FlumeFunctions(ssc); - JavaDStream<SparkFlumeEvent> flumeStream = flumeFunc.flumeStream("localhost", port); + JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port); flumeStream.count(); diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java index 51de4054cc93e4897353424b96fcc6cb594ffd2a..3bd7a3a90ef2626e0347cbb7613b8ed449282a89 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -29,7 +29,7 @@ import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.api.java.kafka.KafkaFunctions; +import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; /** @@ -54,7 +54,7 @@ public class JavaKafkaWordCount { } // Create the context with a 1 second batch size - JavaStreamingContext ssc = new JavaStreamingContext(args[0], "KafkaWordCount", + JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount", new Duration(2000), System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class)); @@ -65,8 +65,7 @@ public class JavaKafkaWordCount { topicMap.put(topic, numThreads); } - KafkaFunctions kafkaFunc = new KafkaFunctions(ssc); - JavaPairDStream<String, String> messages = kafkaFunc.kafkaStream(args[1], args[2], topicMap); + JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, args[1], args[2], topicMap); JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { @Override @@ -96,6 +95,6 @@ public class JavaKafkaWordCount { }); wordCounts.print(); - ssc.start(); + jssc.start(); } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala index 149640e0d18cdeff9ef13e465134aceaf680df7f..ae3709b3d97f5561f857b7d6fab8ef0436888cbc 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala @@ 
-52,7 +52,7 @@ object FlumeEventCount { System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) // Create a flume stream - val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY) + val stream = FlumeUtils.createStream(ssc, host,port,StorageLevel.MEMORY_ONLY_SER_2) // Print out the count of events received from this server in each batch stream.count().map(cnt => "Received " + cnt + " flume events." ).print() diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala index 633712e816b2707a3a4b92bb548ab1914a9ca52e..022c8c5cb90a0625f194315eed37d60c6e133f62 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala @@ -53,7 +53,7 @@ object KafkaWordCount { ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2) + val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1l)) .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index f65c3f8b91dbae864203bcaa10a34bc8a16ee98d..325290b66f4decbe55ed025a865816d09ecbcc5c 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -97,7 +97,7 @@ object MQTTWordCount { val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) - val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY) + val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2) val words = lines.flatMap(x => x.toString.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala index a60570f884f8758f204793606866060bc2e3d9e4..3ccdc908e23c43d0c3424a5b6719950e7c658702 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -35,7 +35,7 @@ import org.apache.spark.streaming.twitter._ * <p> * <p> * <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> - * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure + * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure * for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc), * that uses space sub-linear in the number of elements in the stream. 
Once elements are added to the CMS, the * estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold @@ -63,7 +63,7 @@ object TwitterAlgebirdCMS { val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) - val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER) + val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala index 1382fa4d1d0b1787382933a0ca0f7b410ef33bec..c7e83e76b00570e721ee1fe965d119178b14528d 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -50,7 +50,7 @@ object TwitterAlgebirdHLL { val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) - val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER) + val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala index 84842b3d65bdeb7f504ca8772dcec8d28eb9cac3..e2b0418d55d2b14b08d50aa1500a6806306a7f5c 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala @@ -40,7 +40,7 @@ object TwitterPopularTags { val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) - val stream = ssc.twitterStream(None, filters) + val stream = TwitterUtils.createStream(ssc, None, filters) val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala index 789c5f2d0809a7713ac6b0e0bffaad1ed85c0b15..5a7673756e53b04a1aceb597eeecb454c1f319d9 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala @@ -85,11 +85,10 @@ object ZeroMQWordCount { def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator //For this stream, a zeroMQ publisher should be running. 
- val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator) + val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() } - } diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala deleted file mode 100644 index 3347d19796a689cdfbbd0c44ff500c8f90209b41..0000000000000000000000000000000000000000 --- a/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.api.java.flume - -import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} -import org.apache.spark.streaming.flume._ -import org.apache.spark.storage.StorageLevel - -/** - * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra - * functions for creating Flume input streams. - */ -class FlumeFunctions(javaStreamingContext: JavaStreamingContext) { - /** - * Creates a input stream from a Flume source. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - */ - def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { - javaStreamingContext.ssc.flumeStream(hostname, port) - } - - /** - * Creates a input stream from a Flume source. 
- * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - * @param storageLevel Storage level to use for storing the received objects - */ - def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel): - JavaDStream[SparkFlumeEvent] = { - javaStreamingContext.ssc.flumeStream(hostname, port, storageLevel) - } -} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala similarity index 55% rename from external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala rename to external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 35e7a01abc030fd05692e0502876ac5425c27c3e..834b775d4fd2b09ea1c2a24dce48af963c57871b 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeFunctions.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -18,20 +18,19 @@ package org.apache.spark.streaming.flume import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ +import org.apache.spark.streaming.{StreamingContext, DStream} +import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream} -/** - * Extra Flume input stream functions available on [[org.apache.spark.streaming.StreamingContext]] - * through implicit conversion. Import org.apache.spark.streaming.flume._ to use these functions. - */ -class FlumeFunctions(ssc: StreamingContext) { +object FlumeUtils { /** * Create a input stream from a Flume source. + * @param ssc StreamingContext object * @param hostname Hostname of the slave machine to which the flume data will be sent * @param port Port of the slave machine to which the flume data will be sent * @param storageLevel Storage level to use for storing the received objects */ - def flumeStream ( + def createStream ( + ssc: StreamingContext, hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 @@ -40,4 +39,32 @@ class FlumeFunctions(ssc: StreamingContext) { ssc.registerInputStream(inputStream) inputStream } + + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + */ + def createStream( + jssc: JavaStreamingContext, + hostname: String, + port: Int + ): JavaDStream[SparkFlumeEvent] = { + createStream(jssc.ssc, hostname, port) + } + + /** + * Creates a input stream from a Flume source. 
+ * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + * @param storageLevel Storage level to use for storing the received objects + */ + def createStream( + jssc: JavaStreamingContext, + hostname: String, + port: Int, + storageLevel: StorageLevel + ): JavaDStream[SparkFlumeEvent] = { + createStream(jssc.ssc, hostname, port, storageLevel) + } } diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala deleted file mode 100644 index c087a39d1cd78470b221d81d094de4e7d9597927..0000000000000000000000000000000000000000 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming - -package object flume { - implicit def sscToFlumeFunctions(ssc: StreamingContext) = new FlumeFunctions(ssc) -} - diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java index 5930fee925d2d7b33884b5e88212810e64cb20db..733389b98d22d60e5d60ea56d3eaa1aa8ffc407f 100644 --- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java +++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java @@ -1,4 +1,4 @@ -package org.apache.spark.streaming.flume;/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -15,21 +15,20 @@ package org.apache.spark.streaming.flume;/* * limitations under the License. 
*/ +package org.apache.spark.streaming.flume; + import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.flume.FlumeFunctions; -import org.apache.spark.streaming.flume.SparkFlumeEvent; + import org.junit.Test; public class JavaFlumeStreamSuite extends LocalJavaStreamingContext { @Test public void testFlumeStream() { - FlumeFunctions flumeFunc = new FlumeFunctions(ssc); - // tests the API, does not actually test data receiving - JavaDStream<SparkFlumeEvent> test1 = flumeFunc.flumeStream("localhost", 12345); - JavaDStream<SparkFlumeEvent> test2 = flumeFunc.flumeStream("localhost", 12345, + JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345); + JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2()); } } diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 74840f6499425b76b6465cb90cc0f4dc4b2f6548..2e8e9fac455535e32f4d770de0534334d3d0d1d4 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -39,7 +39,7 @@ class FlumeStreamSuite extends TestSuiteBase { test("flume input stream") { // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) - val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) + val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] with SynchronizedBuffer[Seq[SparkFlumeEvent]] val outputStream = new TestOutputStream(flumeStream, outputBuffer) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala deleted file mode 100644 index 491331bb3739065076acfb816bd1fb3f4c08ff8d..0000000000000000000000000000000000000000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.api.java.kafka - -import scala.reflect.ClassTag -import scala.collection.JavaConversions._ - -import java.lang.{Integer => JInt} -import java.util.{Map => JMap} - -import kafka.serializer.Decoder - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream} -import org.apache.spark.streaming.kafka._ - -/** - * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra - * functions for creating Kafka input streams. - */ -class KafkaFunctions(javaStreamingContext: JavaStreamingContext) { - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt] - ): JavaPairDStream[String, String] = { - implicit val cmt: ClassTag[String] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) - } - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. - * - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt], - storageLevel: StorageLevel - ): JavaPairDStream[String, String] = { - implicit val cmt: ClassTag[String] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) - } - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param keyTypeClass Key type of RDD - * @param valueTypeClass value type of RDD - * @param keyDecoderClass Type of kafka key decoder - * @param valueDecoderClass Type of kafka value decoder - * @param kafkaParams Map of kafka configuration paramaters. - * See: http://kafka.apache.org/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. 
Defaults to memory-only - */ - def kafkaStream[K, V, U <: Decoder[_], T <: Decoder[_]]( - keyTypeClass: Class[K], - valueTypeClass: Class[V], - keyDecoderClass: Class[U], - valueDecoderClass: Class[T], - kafkaParams: JMap[String, String], - topics: JMap[String, JInt], - storageLevel: StorageLevel - ): JavaPairDStream[K, V] = { - implicit val keyCmt: ClassTag[K] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] - implicit val valueCmt: ClassTag[V] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] - - implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] - implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] - - javaStreamingContext.ssc.kafkaStream[K, V, U, T]( - kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) - } -} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala deleted file mode 100644 index 2135634a69b8b7ea97a082000bee1e1d1972064c..0000000000000000000000000000000000000000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaFunctions.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import scala.reflect.ClassTag - -import kafka.serializer.{Decoder, StringDecoder} - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ - -/** - * Extra Kafka input stream functions available on [[org.apache.spark.streaming.StreamingContext]] - * through implicit conversion. Import org.apache.spark.streaming.kafka._ to use these functions. - */ -class KafkaFunctions(ssc: StreamingContext) { - /** - * Create an input stream that pulls messages from a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel Storage level to use for storing the received objects - * (default: StorageLevel.MEMORY_AND_DISK_SER_2) - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: Map[String, Int], - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[(String, String)] = { - val kafkaParams = Map[String, String]( - "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, - "zookeeper.connection.timeout.ms" -> "10000") - kafkaStream[String, String, StringDecoder, StringDecoder]( - kafkaParams, - topics, - storageLevel) - } - - /** - * Create an input stream that pulls messages from a Kafka Broker. 
- * @param kafkaParams Map of kafka configuration paramaters. - * See: http://kafka.apache.org/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel Storage level to use for storing the received objects - */ - def kafkaStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel - ): DStream[(K, V)] = { - val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel) - ssc.registerInputStream(inputStream) - inputStream - } -} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala new file mode 100644 index 0000000000000000000000000000000000000000..c2d851f94311d48fe7ce19ca6a1d14c49d5c48e4 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.reflect.ClassTag +import scala.collection.JavaConversions._ + +import java.lang.{Integer => JInt} +import java.util.{Map => JMap} + +import kafka.serializer.{Decoder, StringDecoder} + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, DStream} +import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream} + + +object KafkaUtils { + /** + * Create an input stream that pulls messages from a Kafka Broker. + * @param ssc StreamingContext object + * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) + * @param groupId The group id for this consumer + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) + */ + def createStream( + ssc: StreamingContext, + zkQuorum: String, + groupId: String, + topics: Map[String, Int], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[(String, String)] = { + val kafkaParams = Map[String, String]( + "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, + "zookeeper.connection.timeout.ms" -> "10000") + createStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topics, storageLevel) + } + + /** + * Create an input stream that pulls messages from a Kafka Broker. 
+ * @param ssc StreamingContext object + * @param kafkaParams Map of kafka configuration parameters, + * see http://kafka.apache.org/08/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel Storage level to use for storing the received objects + */ + def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: Manifest, T <: Decoder[_]: Manifest]( + ssc: StreamingContext, + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ): DStream[(K, V)] = { + val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param jssc JavaStreamingContext object + * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) + * @param groupId The group id for this consumer + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread + */ + def createStream( + jssc: JavaStreamingContext, + zkQuorum: String, + groupId: String, + topics: JMap[String, JInt] + ): JavaPairDStream[String, String] = { + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param jssc JavaStreamingContext object + * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. + * + */ + def createStream( + jssc: JavaStreamingContext, + zkQuorum: String, + groupId: String, + topics: JMap[String, JInt], + storageLevel: StorageLevel + ): JavaPairDStream[String, String] = { + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] + createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param jssc JavaStreamingContext object + * @param keyTypeClass Key type of RDD + * @param valueTypeClass value type of RDD + * @param keyDecoderClass Type of kafka key decoder + * @param valueDecoderClass Type of kafka value decoder + * @param kafkaParams Map of kafka configuration parameters, + * see http://kafka.apache.org/08/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread + * @param storageLevel RDD storage level. Defaults to MEMORY_AND_DISK_2. 
+ */ + def createStream[K, V, U <: Decoder[_], T <: Decoder[_]]( + jssc: JavaStreamingContext, + keyTypeClass: Class[K], + valueTypeClass: Class[V], + keyDecoderClass: Class[U], + valueDecoderClass: Class[T], + kafkaParams: JMap[String, String], + topics: JMap[String, JInt], + storageLevel: StorageLevel + ): JavaPairDStream[K, V] = { + implicit val keyCmt: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val valueCmt: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + + implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] + implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] + + createStream[K, V, U, T]( + jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala deleted file mode 100644 index 44e7ce6e1bd37907dfc9ea908d8df75a09a7b8cf..0000000000000000000000000000000000000000 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming - -package object kafka { - implicit def sscToKafkaFunctions(ssc: StreamingContext) = new KafkaFunctions(ssc) -} - diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java index fdea96e506223dac2c89773a9fcc11fecdded332..7b4999447ee69539c76a60244c668e8a5e6d586c 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java @@ -18,32 +18,27 @@ package org.apache.spark.streaming.kafka; import java.util.HashMap; - -import org.apache.spark.streaming.api.java.kafka.KafkaFunctions; import org.junit.Test; import com.google.common.collect.Maps; import kafka.serializer.StringDecoder; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; -import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; public class JavaKafkaStreamSuite extends LocalJavaStreamingContext { @Test public void testKafkaStream() { - HashMap<String, Integer> topics = Maps.newHashMap(); - KafkaFunctions kafkaFunc = new KafkaFunctions(ssc); // tests the API, does not actually test data receiving - JavaPairDStream<String, String> test1 = kafkaFunc.kafkaStream("localhost:12345", "group", topics); - JavaPairDStream<String, String> test2 = kafkaFunc.kafkaStream("localhost:12345", "group", topics, + JavaPairDStream<String, String> test1 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics); + JavaPairDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2()); HashMap<String, String> kafkaParams = Maps.newHashMap(); kafkaParams.put("zookeeper.connect", "localhost:12345"); kafkaParams.put("group.id","consumer-group"); - JavaPairDStream<String, String> test3 = kafkaFunc.kafkaStream( + JavaPairDStream<String, String> test3 = KafkaUtils.createStream(ssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2()); } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index 2ef3e99c558d2d6071bd12431d1ed9e10795cddc..9c81f23c191180c3f11262965c4eea1d21ae9747 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -28,11 +28,11 @@ class KafkaStreamSuite extends TestSuiteBase { val topics = Map("my-topic" -> 1) // tests the API, does not actually test data receiving - val test1 = ssc.kafkaStream("localhost:12345", "group", topics) - val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2) + val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics) + val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2) val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group") - val test3 = ssc.kafkaStream[String, String, StringDecoder, StringDecoder]( - kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2) + val test3 = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder]( + ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2) // TODO: Actually test receiving data } diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala deleted file mode 100644 index 86f4e9c724301756db7e0187013fa7c42bc13da5..0000000000000000000000000000000000000000 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.mqtt - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ - -/** - * Extra MQTT input stream functions available on [[org.apache.spark.streaming.StreamingContext]] - * through implicit conversions. Import org.apache.spark.streaming.mqtt._ to use these functions. - */ -class MQTTFunctions(ssc: StreamingContext) { - /** - * Create an input stream that receives messages pushed by a MQTT publisher. - * @param brokerUrl Url of remote MQTT publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. - */ - def mqttStream( - brokerUrl: String, - topic: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[String] = { - val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) - ssc.registerInputStream(inputStream) - inputStream - } -} diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala similarity index 52% rename from external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala rename to external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 72124956fcc574e19a3dc7dda107b227a230721c..0e6c25dbee8fbe5e63f0648c890143e9a4ae4509 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -15,45 +15,61 @@ * limitations under the License. 
*/ -package org.apache.spark.streaming.api.java.mqtt - -import scala.reflect.ClassTag +package org.apache.spark.streaming.mqtt import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} -import org.apache.spark.streaming.mqtt._ +import org.apache.spark.streaming.{StreamingContext, DStream} +import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream} +import scala.reflect.ClassTag -/** - * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra - * functions for creating MQTT input streams. - */ -class MQTTFunctions(javaStreamingContext: JavaStreamingContext) { +object MQTTUtils { + /** + * Create an input stream that receives messages pushed by a MQTT publisher. + * @param ssc StreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topic Topic name to subscribe to + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + */ + def createStream( + ssc: StreamingContext, + brokerUrl: String, + topic: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[String] = { + val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } /** * Create an input stream that receives messages pushed by a MQTT publisher. + * @param jssc JavaStreamingContext object * @param brokerUrl Url of remote MQTT publisher - * @param topic topic name to subscribe to + * @param topic Topic name to subscribe to */ - def mqttStream( + def createStream( + jssc: JavaStreamingContext, brokerUrl: String, topic: String ): JavaDStream[String] = { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - javaStreamingContext.ssc.mqttStream(brokerUrl, topic) + createStream(jssc.ssc, brokerUrl, topic) } /** * Create an input stream that receives messages pushed by a MQTT publisher. - * @param brokerUrl Url of remote MQTT publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. + * @param jssc JavaStreamingContext object + * @param brokerUrl Url of remote MQTT publisher + * @param topic Topic name to subscribe to + * @param storageLevel RDD storage level. */ - def mqttStream( + def createStream( + jssc: JavaStreamingContext, brokerUrl: String, topic: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + storageLevel: StorageLevel ): JavaDStream[String] = { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - javaStreamingContext.ssc.mqttStream(brokerUrl, topic, storageLevel) + createStream(jssc.ssc, brokerUrl, topic, storageLevel) } } diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala deleted file mode 100644 index 28a944f57e3c3add8363f2ef2a74ad93bad0cf5b..0000000000000000000000000000000000000000 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming - -package object mqtt { - implicit def sscToMQTTFunctions(ssc: StreamingContext) = new MQTTFunctions(ssc) -} - - diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java index 3ddb4d084fb5ef78906e2b050fdb13044b21639d..44743aaecf986e902f64b2ce3f7ffbe6f0f5513d 100644 --- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java +++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java @@ -19,7 +19,6 @@ package org.apache.spark.streaming.mqtt; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions; import org.junit.Test; import org.apache.spark.streaming.LocalJavaStreamingContext; @@ -29,11 +28,10 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext { public void testMQTTStream() { String brokerUrl = "abc"; String topic = "def"; - MQTTFunctions mqttFunc = new MQTTFunctions(ssc); // tests the API, does not actually test data receiving - JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic); - JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic, + JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic); + JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2()); } } diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala index ab6542918b579e20920eb7631cbc83cae2e70f4b..fcc159e85a85bf918f587dc0c626bca6bebe8231 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala @@ -28,8 +28,8 @@ class MQTTStreamSuite extends TestSuiteBase { val topic = "def" // tests the API, does not actually test data receiving - val test1 = ssc.mqttStream(brokerUrl, topic) - val test2 = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) + val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic) + val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2) // TODO: Actually test receiving data } diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala deleted file mode 100644 index e91049d9b163cf1ec0eef5c1c71625a9b6a1cfef..0000000000000000000000000000000000000000 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterFunctions.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.twitter - -import twitter4j.Status -import twitter4j.auth.Authorization - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ - -/** - * Extra Twitter input stream functions available on [[org.apache.spark.streaming.StreamingContext]] - * through implicit conversions. Import org.apache.spark.streaming.twitter._ to use these functions. - */ -class TwitterFunctions(ssc: StreamingContext) { - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth - * authorization; this uses the system properties twitter4j.oauth.consumerKey, - * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and - * twitter4j.oauth.accessTokenSecret. - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - twitterAuth: Option[Authorization], - filters: Seq[String] = Nil, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[Status] = { - val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) - ssc.registerInputStream(inputStream) - inputStream - } -} diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala similarity index 53% rename from external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala rename to external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala index 22e297a03af21fe62170695d26e8e742c47f26c9..5e506ffabcfc4a796bc0cf1a10247ad6023d7757 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala @@ -15,29 +15,45 @@ * limitations under the License. */ -package org.apache.spark.streaming.api.java.twitter +package org.apache.spark.streaming.twitter import twitter4j.Status import twitter4j.auth.Authorization - import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, DStream} import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} -import org.apache.spark.streaming.twitter._ -/** - * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra - * functions for creating Twitter input streams. - */ -class TwitterFunctions(javaStreamingContext: JavaStreamingContext) { +object TwitterUtils { + /** + * Create a input stream that returns tweets received from Twitter. 
+ * @param ssc StreamingContext object + * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth + * authorization; this uses the system properties twitter4j.oauth.consumerKey, + * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and + * twitter4j.oauth.accessTokenSecret + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def createStream( + ssc: StreamingContext, + twitterAuth: Option[Authorization], + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[Status] = { + val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) + ssc.registerInputStream(inputStream) + inputStream + } /** * Create a input stream that returns tweets received from Twitter using Twitter4J's default * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and * twitter4j.oauth.accessTokenSecret. + * @param jssc JavaStreamingContext object */ - def twitterStream(): JavaDStream[Status] = { - javaStreamingContext.ssc.twitterStream(None) + def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = { + createStream(jssc.ssc, None) } /** @@ -45,10 +61,11 @@ class TwitterFunctions(javaStreamingContext: JavaStreamingContext) { * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and * twitter4j.oauth.accessTokenSecret. + * @param jssc JavaStreamingContext object * @param filters Set of filter strings to get only those tweets that match them */ - def twitterStream(filters: Array[String]): JavaDStream[Status] = { - javaStreamingContext.ssc.twitterStream(None, filters) + def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = { + createStream(jssc.ssc, None, filters) } /** @@ -56,44 +73,54 @@ class TwitterFunctions(javaStreamingContext: JavaStreamingContext) { * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and * twitter4j.oauth.accessTokenSecret. - * @param filters Set of filter strings to get only those tweets that match them + * @param jssc JavaStreamingContext object + * @param filters Set of filter strings to get only those tweets that match them * @param storageLevel Storage level to use for storing the received objects */ - def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = { - javaStreamingContext.ssc.twitterStream(None, filters, storageLevel) + def createStream( + jssc: JavaStreamingContext, + filters: Array[String], + storageLevel: StorageLevel + ): JavaDStream[Status] = { + createStream(jssc.ssc, None, filters, storageLevel) } /** * Create a input stream that returns tweets received from Twitter. + * @param jssc JavaStreamingContext object * @param twitterAuth Twitter4J Authorization */ - def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = { - javaStreamingContext.ssc.twitterStream(Some(twitterAuth)) + def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = { + createStream(jssc.ssc, Some(twitterAuth)) } /** * Create a input stream that returns tweets received from Twitter. 
+ * @param jssc JavaStreamingContext object * @param twitterAuth Twitter4J Authorization - * @param filters Set of filter strings to get only those tweets that match them + * @param filters Set of filter strings to get only those tweets that match them */ - def twitterStream( + def createStream( + jssc: JavaStreamingContext, twitterAuth: Authorization, filters: Array[String] ): JavaDStream[Status] = { - javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters) + createStream(jssc.ssc, Some(twitterAuth), filters) } /** * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization object - * @param filters Set of filter strings to get only those tweets that match them + * @param jssc JavaStreamingContext object + * @param twitterAuth Twitter4J Authorization object + * @param filters Set of filter strings to get only those tweets that match them * @param storageLevel Storage level to use for storing the received objects */ - def twitterStream( + def createStream( + jssc: JavaStreamingContext, twitterAuth: Authorization, filters: Array[String], storageLevel: StorageLevel ): JavaDStream[Status] = { - javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters, storageLevel) + createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel) } } diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala deleted file mode 100644 index 23f82c58859b84cd8637bf5613ebdc64d9b0dfa1..0000000000000000000000000000000000000000 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming - -package object twitter { - implicit def sscToTwitterFunctions(ssc: StreamingContext) = new TwitterFunctions(ssc) -} diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java index 4564d6cd334c5b314a33673103793946b6360a88..e46b4e5c7531dac29123589ee6a5a22e0dd6f560 100644 --- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java +++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java @@ -19,13 +19,10 @@ package org.apache.spark.streaming.twitter; import java.util.Arrays; -import org.apache.spark.streaming.api.java.twitter.TwitterFunctions; import org.junit.Test; - import twitter4j.Status; import twitter4j.auth.Authorization; import twitter4j.auth.NullAuthorization; - import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; import org.apache.spark.streaming.api.java.JavaDStream; @@ -33,18 +30,17 @@ import org.apache.spark.streaming.api.java.JavaDStream; public class JavaTwitterStreamSuite extends LocalJavaStreamingContext { @Test public void testTwitterStream() { - TwitterFunctions twitterFunc = new TwitterFunctions(ssc); String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray(); Authorization auth = NullAuthorization.getInstance(); // tests the API, does not actually test data receiving - JavaDStream<Status> test1 = twitterFunc.twitterStream(); - JavaDStream<Status> test2 = twitterFunc.twitterStream(filters); - JavaDStream<Status> test3 = - twitterFunc.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaDStream<Status> test4 = twitterFunc.twitterStream(auth); - JavaDStream<Status> test5 = twitterFunc.twitterStream(auth, filters); - JavaDStream<Status> test6 = - twitterFunc.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<Status> test1 = TwitterUtils.createStream(ssc); + JavaDStream<Status> test2 = TwitterUtils.createStream(ssc, filters); + JavaDStream<Status> test3 = TwitterUtils.createStream( + ssc, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<Status> test4 = TwitterUtils.createStream(ssc, auth); + JavaDStream<Status> test5 = TwitterUtils.createStream(ssc, auth, filters); + JavaDStream<Status> test6 = TwitterUtils.createStream(ssc, + auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2()); } } diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala index d7f6d35e07463cf3aef07869a92b99daa7ba5512..a0a8fe617b134c7c39ae8df77690f12a1fa42797 100644 --- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala +++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala @@ -29,12 +29,13 @@ class TwitterStreamSuite extends TestSuiteBase { val authorization: Authorization = NullAuthorization.getInstance() // tests the API, does not actually test data receiving - val test1 = ssc.twitterStream(None) - val test2 = ssc.twitterStream(None, filters) - val test3 = ssc.twitterStream(None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) - val test4 = ssc.twitterStream(Some(authorization)) - val test5 = ssc.twitterStream(Some(authorization), filters) - val test6 = 
ssc.twitterStream(Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2) + val test1 = TwitterUtils.createStream(ssc, None) + val test2 = TwitterUtils.createStream(ssc, None, filters) + val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) + val test4 = TwitterUtils.createStream(ssc, Some(authorization)) + val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters) + val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters, + StorageLevel.MEMORY_AND_DISK_SER_2) // Note that actually testing the data receiving is hard as authentication keys are // necessary for accessing Twitter live stream diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala deleted file mode 100644 index f4c75ab7c9fc51802eb263fea68bfc815a44adb5..0000000000000000000000000000000000000000 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.zeromq - -import scala.reflect.ClassTag - -import akka.actor.{Props, SupervisorStrategy} -import akka.util.ByteString -import akka.zeromq.Subscribe - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ -import org.apache.spark.streaming.receivers._ - -/** - * Extra ZeroMQ input stream functions available on [[org.apache.spark.streaming.StreamingContext]] - * through implicit conversions. Import org.apache.spark.streaming.zeromq._ to use these functions. - */ -class ZeroMQFunctions(ssc: StreamingContext) { - /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic - * and each frame has sequence of byte thus it needs the converter - * (which might be deserializer of bytes) to translate from sequence - * of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel RDD storage level. Defaults to memory-only. 
- */ - def zeroMQStream[T: ClassTag]( - publisherUrl: String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] ⇒ Iterator[T], - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, - supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy - ): DStream[T] = { - ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), - "ZeroMQReceiver", storageLevel, supervisorStrategy) - } -} - diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala similarity index 55% rename from external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala rename to external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index a9bbce71f5ff18273183c1293c4d813a25d2b234..546d9df3b5df9b0a8232a74356dba607151c9ad8 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -15,37 +15,57 @@ * limitations under the License. */ -package org.apache.spark.streaming.api.java.zeromq +package org.apache.spark.streaming.zeromq import scala.reflect.ClassTag import scala.collection.JavaConversions._ - -import akka.actor.SupervisorStrategy +import akka.actor.{Props, SupervisorStrategy} import akka.util.ByteString import akka.zeromq.Subscribe - -import org.apache.spark.storage.StorageLevel import org.apache.spark.api.java.function.{Function => JFunction} -import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} -import org.apache.spark.streaming.zeromq._ +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy +import org.apache.spark.streaming.{StreamingContext, DStream} +import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream} -/** - * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra - * functions for creating ZeroMQ input streams. - */ -class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) { +object ZeroMQUtils { + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param ssc StreamingContext object + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic + * and each frame has sequence of byte thus it needs the converter + * (which might be deserializer of bytes) to translate from sequence + * of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + */ + def createStream[T: ClassTag]( + ssc: StreamingContext, + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: Seq[ByteString] ⇒ Iterator[T], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, + supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy + ): DStream[T] = { + ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), + "ZeroMQReceiver", storageLevel, supervisorStrategy) + } /** * Create an input stream that receives messages pushed by a zeromq publisher. 
- * @param publisherUrl Url of remote ZeroMQ publisher - * @param subscribe topic to subscribe to + * @param jssc JavaStreamingContext object + * @param publisherUrl Url of remote ZeroMQ publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence * of byte thus it needs the converter(which might be deserializer of bytes) * to translate from sequence of sequence of bytes, where sequence refer to a frame * and sub sequence refer to its payload. * @param storageLevel Storage level to use for storing the received objects */ - def zeroMQStream[T]( + def createStream[T]( + jssc: JavaStreamingContext, publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], @@ -54,21 +74,23 @@ class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) { ): JavaDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) + val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) } /** * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to + * @param jssc JavaStreamingContext object + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence * of byte thus it needs the converter(which might be deserializer of bytes) * to translate from sequence of sequence of bytes, where sequence refer to a frame * and sub sequence refer to its payload. - * @param storageLevel RDD storage level. + * @param storageLevel RDD storage level. */ - def zeroMQStream[T]( + def createStream[T]( + jssc: JavaStreamingContext, publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], @@ -76,27 +98,29 @@ class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) { ): JavaDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) + val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel) } /** * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to + * @param jssc JavaStreamingContext object + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence * of byte thus it needs the converter(which might be deserializer of bytes) * to translate from sequence of sequence of bytes, where sequence refer to a frame * and sub sequence refer to its payload. 
*/ - def zeroMQStream[T]( + def createStream[T]( + jssc: JavaStreamingContext, publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] ): JavaDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn) + val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + createStream[T](jssc.ssc, publisherUrl, subscribe, fn) } } diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala deleted file mode 100644 index dc2717814925b85971385c357cbab5032280d157..0000000000000000000000000000000000000000 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming - -package object zeromq { - implicit def sscToZeroMQFunctions(ssc: StreamingContext) = new ZeroMQFunctions(ssc) -} - - diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java index b020ae4ceff9373a7b45ad3fc3007140a1da19ea..d2361e14b898a629775d304c1ce3c6e687981cd8 100644 --- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -17,13 +17,10 @@ package org.apache.spark.streaming.zeromq; -import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions; import org.junit.Test; - import akka.actor.SupervisorStrategy; import akka.util.ByteString; import akka.zeromq.Subscribe; - import org.apache.spark.api.java.function.Function; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; @@ -33,7 +30,6 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { @Test // tests the API, does not actually test data receiving public void testZeroMQStream() { - ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc); String publishUrl = "abc"; Subscribe subscribe = new Subscribe((ByteString)null); Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() { @@ -43,11 +39,12 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { } }; - JavaDStream<String> test1 = zeromqFunc.<String>zeroMQStream( - publishUrl, subscribe, bytesToObjects); - JavaDStream<String> test2 = zeromqFunc.<String>zeroMQStream( - publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaDStream<String> test3 = zeromqFunc.<String>zeroMQStream( - publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy()); + JavaDStream<String> test1 = ZeroMQUtils.<String>createStream( + ssc, publishUrl, subscribe, bytesToObjects); + JavaDStream<String> test2 = ZeroMQUtils.<String>createStream( + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<String> test3 = ZeroMQUtils.<String>createStream( + ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), + SupervisorStrategy.defaultStrategy()); } } diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala index 5adcdb821f369b06c8480be5fe814e0c42f7c964..4193b8a02f14abefb1b17634e2f03709128588e2 100644 --- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -33,10 +33,10 @@ class ZeroMQStreamSuite extends TestSuiteBase { val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]] // tests the API, does not actually test data receiving - val test1 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects) - val test2 = ssc.zeroMQStream( - publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) - val test3 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects, + val test1 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects) + val test2 = ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, 
bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) + val test3 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy) // TODO: Actually test data receiving
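
For reference, a minimal usage sketch of the renamed APIs above, written in the style of the existing streaming examples. The master URL, ZeroMQ endpoint, topic, and filter strings are placeholders, and the plain three-argument StreamingContext constructor is assumed to be available; only the TwitterUtils.createStream and ZeroMQUtils.createStream signatures come from this patch.

import akka.util.ByteString
import akka.zeromq.Subscribe

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.zeromq.ZeroMQUtils

object CreateStreamSketch {
  def main(args: Array[String]) {
    // Placeholder master and app name; any StreamingContext works here.
    val ssc = new StreamingContext("local[2]", "CreateStreamSketch", Seconds(2))

    // Twitter: previously ssc.twitterStream(None, filters) via implicit conversion.
    // Passing None falls back to Twitter4J's system-property-based OAuth credentials.
    val tweets = TwitterUtils.createStream(ssc, None, Seq("spark"))
    tweets.map(_.getText).print()

    // ZeroMQ: previously ssc.zeroMQStream(url, subscribe, bytesToObjects, ...).
    // The converter turns the frames of each published message into strings.
    val bytesToStrings = (frames: Seq[ByteString]) => frames.map(_.utf8String).iterator
    val lines = ZeroMQUtils.createStream(ssc, "tcp://127.0.0.1:1234",
      Subscribe(ByteString("events")), bytesToStrings, StorageLevel.MEMORY_AND_DISK_SER_2)
    lines.print()

    ssc.start()
  }
}

The only change relative to the old implicit-conversion style is that the StreamingContext (or JavaStreamingContext, for the Java-friendly overloads) is now passed explicitly as the first argument to each createStream call.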