From 12ea14c211da908a278ab19fd1e9f6acd45daae8 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Mon, 18 Feb 2013 15:18:34 -0800
Subject: [PATCH] Changed networkStream to socketStream, and
 pluggableNetworkStream to networkStream, as a way to create streams from an
 arbitrary network receiver.

---
 docs/streaming-programming-guide.md           | 10 +++----
 .../examples/JavaNetworkWordCount.java        |  2 +-
 .../examples/AkkaActorWordCount.scala         | 12 ++++-----
 .../streaming/examples/NetworkWordCount.scala |  2 +-
 .../examples/clickstream/PageViewStream.scala |  2 +-
 .../spark/streaming/StreamingContext.scala    | 26 ++++++++++---------
 .../api/java/JavaStreamingContext.scala       | 10 +++----
 .../java/spark/streaming/JavaAPISuite.java    |  5 ++--
 .../spark/streaming/InputStreamsSuite.scala   |  2 +-
 9 files changed, 36 insertions(+), 35 deletions(-)

diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index d408e80359..71e1bd4aab 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -34,16 +34,16 @@ The StreamingContext is used to creating InputDStreams from input sources:
 
 {% highlight scala %}
 // Assuming ssc is the StreamingContext
-ssc.networkStream(hostname, port)    // Creates a stream that uses a TCP socket to read data from hostname:port
-ssc.textFileStream(directory)        // Creates a stream by monitoring and processing new files in a HDFS directory
+ssc.textFileStream(directory)        // Creates a stream by monitoring and processing new files in a HDFS directory
+ssc.socketStream(hostname, port)     // Creates a stream that uses a TCP socket to read data from hostname:port
 {% endhighlight %}
 
-A complete list of input sources is available in the [StreamingContext API documentation](api/streaming/index.html#spark.streaming.StreamingContext). Data received from these sources can be processed using DStream operations, which are explained next.
+We also provide input streams for Kafka, Flume, Akka actors, etc. For a complete list of input streams, take a look at the [StreamingContext API documentation](api/streaming/index.html#spark.streaming.StreamingContext).
 
 # DStream Operations
-Once an input DStream has been created, you can transform it using _DStream operators_. Most of these operators return new DStreams which you can further transform. Eventually, you'll need to call an _output operator_, which forces evaluation of the DStream by writing data out to an external source.
+Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, you'll need to call the output operations, which write data out to an external source.
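+
+For example, here is a minimal sketch of a complete word count over a socket stream, assuming `ssc` is the StreamingContext created above and that a text server is listening on port 9999 (e.g. one started with `nc -lk 9999`):
+
+{% highlight scala %}
+// Hypothetical host/port; any server producing \n-delimited text works
+val lines = ssc.socketTextStream("localhost", 9999)
+// Transformations: split each line into words and count them per batch
+val words = lines.flatMap(_.split(" "))
+val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+// Output operation: print() forces the whole pipeline to actually run
+wordCounts.print()
+{% endhighlight %}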
 
 ## Transformations
@@ -452,4 +452,4 @@ If the driver had crashed in the middle of the processing of time 3, then it wil
 
 # Where to Go from Here
 * Documentation - [Scala and Java](api/streaming/index.html)
-* More examples - [Scala](https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/streaming/examples) and [Java](https://github.com/mesos/spark/tree/master/examples/src/main/java/spark/streaming/examples)
\ No newline at end of file
+* More examples - [Scala](https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/streaming/examples) and [Java](https://github.com/mesos/spark/tree/master/examples/src/main/java/spark/streaming/examples)
diff --git a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
index 4299febfd6..07342beb02 100644
--- a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
@@ -35,7 +35,7 @@ public class JavaNetworkWordCount {
 
     // Create a NetworkInputDStream on target ip:port and count the
     // words in input stream of \n delimited test (eg. generated by 'nc')
-    JavaDStream<String> lines = ssc.networkTextStream(args[1], Integer.parseInt(args[2]));
+    JavaDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
       public Iterable<String> call(String x) {
diff --git a/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
index ff05842c71..553afc2024 100644
--- a/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
@@ -36,8 +36,8 @@ class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
 }
 
 /**
- * A sample word count program demonstrating the use of plugging in
- * AkkaActor as Receiver
+ * A sample word count program demonstrating the use of the Akka actor stream.
+ *
  */
 object AkkaActorWordCount {
   def main(args: Array[String]) {
@@ -56,18 +56,18 @@ object AkkaActorWordCount {
       Seconds(batchDuration.toLong))
 
     /*
-     * Following is the use of pluggableActorStream to plug in custom actor as receiver
+     * Following is the use of actorStream to plug in a custom actor as a receiver
      *
      * An important point to note:
      * Since Actor may exist outside the spark framework, It is thus user's responsibility
-     * to ensure the type safety, i.e type of data received and PluggableInputDstream
+     * to ensure type safety, i.e. the types of data received and of the actorStream
      * should be same.
      *
-     * For example: Both pluggableActorStream and SampleActorReceiver are parameterized
+     * For example: Both actorStream and SampleActorReceiver are parameterized
      * to same type to ensure type safety.
      */
-    val lines = ssc.pluggableActorStream[String](
+    val lines = ssc.actorStream[String](
       Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
         remoteAkkaHost, remoteAkkaPort.toInt))), "SampleReceiver")
 
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
index 32f7d57bea..7ff70ae2e5 100644
--- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
@@ -27,7 +27,7 @@ object NetworkWordCount {
 
     // Create a NetworkInputDStream on target ip:port and count the
     // words in input stream of \n delimited test (eg. generated by 'nc')
-    val lines = ssc.networkTextStream(args(1), args(2).toInt)
+    val lines = ssc.socketTextStream(args(1), args(2).toInt)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
     wordCounts.print()
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index 60f228b8ad..fba72519a9 100644
--- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -27,7 +27,7 @@ object PageViewStream {
     val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
 
     // Create a NetworkInputDStream on target host:port and convert each line to a PageView
-    val pageViews = ssc.networkTextStream(host, port)
+    val pageViews = ssc.socketTextStream(host, port)
       .flatMap(_.split("\n"))
       .map(PageView.fromString(_))
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 48d344f055..a426649726 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -146,7 +146,7 @@ class StreamingContext private (
    * Create an input stream with any arbitrary user implemented network receiver.
    * @param receiver Custom implementation of NetworkReceiver
    */
-  def pluggableNetworkStream[T: ClassManifest](
+  def networkStream[T: ClassManifest](
     receiver: NetworkReceiver[T]): DStream[T] = {
     val inputStream = new PluggableInputDStream[T](this, receiver)
@@ -155,15 +155,16 @@
   }
 
   /**
-   * Create an input stream with any arbitrary user implemented akka actor receiver.
+   * Create an input stream with any arbitrary user implemented Akka actor receiver.
    * @param props Props object defining creation of the actor
    * @param name Name of the actor
-   * @param storageLevel RDD storage level. Defaults to memory-only.
+   * @param storageLevel Storage level to use for storing the received objects
+   *        (default: StorageLevel.MEMORY_ONLY_SER_2)
    */
-  def pluggableActorStream[T: ClassManifest](
+  def actorStream[T: ClassManifest](
     props: Props, name: String,
     storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[T] = {
-    pluggableNetworkStream(new ActorReceiver(Settings(props, name, storageLevel)))
+    networkStream(new ActorReceiver(Settings(props, name, storageLevel)))
   }
 
   /**
@@ -174,7 +175,8 @@
    * in its own thread.
    * @param initialOffsets Optional initial offsets for each of the partitions to consume.
    * By default the value is pulled from zookeper.
-   * @param storageLevel RDD storage level. Defaults to memory-only.
+   * @param storageLevel Storage level to use for storing the received objects
+   *        (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    */
   def kafkaStream[T: ClassManifest](
       zkQuorum: String,
@@ -189,24 +191,24 @@
   }
 
   /**
-   * Create a input stream from network source hostname:port. Data is received using
-   * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
+   * Create an input stream from a TCP source hostname:port. Data is received using
+   * a TCP socket and the received bytes are interpreted as UTF8 encoded `\n` delimited
    * lines.
    * @param hostname Hostname to connect to for receiving data
    * @param port Port to connect to for receiving data
    * @param storageLevel Storage level to use for storing the received objects
    *        (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    */
-  def networkTextStream(
+  def socketTextStream(
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): DStream[String] = {
-    networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
+    socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
   }
 
   /**
-   * Create a input stream from network source hostname:port. Data is received using
+   * Create an input stream from a TCP source hostname:port. Data is received using
    * a TCP socket and the receive bytes it interepreted as object using the given
    * converter.
    * @param hostname Hostname to connect to for receiving data
@@ -215,7 +217,7 @@
    * @param storageLevel Storage level to use for storing the received objects
    * @tparam T Type of the objects received (after converting bytes to objects)
    */
-  def networkStream[T: ClassManifest](
+  def socketStream[T: ClassManifest](
       hostname: String,
       port: Int,
       converter: (InputStream) => Iterator[T],
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 03933aae93..d9a676819a 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -130,7 +130,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    */
   def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
   : JavaDStream[String] = {
-    ssc.networkTextStream(hostname, port, storageLevel)
+    ssc.socketTextStream(hostname, port, storageLevel)
   }
 
   /**
@@ -140,8 +140,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param hostname Hostname to connect to for receiving data
    * @param port Port to connect to for receiving data
    */
-  def networkTextStream(hostname: String, port: Int): JavaDStream[String] = {
-    ssc.networkTextStream(hostname, port)
+  def socketTextStream(hostname: String, port: Int): JavaDStream[String] = {
+    ssc.socketTextStream(hostname, port)
   }
 
   /**
@@ -154,7 +154,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param storageLevel Storage level to use for storing the received objects
    * @tparam T Type of the objects received (after converting bytes to objects)
    */
-  def networkStream[T](
+  def socketStream[T](
      hostname: String,
      port: Int,
      converter: JFunction[InputStream, java.lang.Iterable[T]],
@@ -163,7 +163,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
     def fn = (x: InputStream) => converter.apply(x).toIterator
     implicit val cmt: ClassManifest[T] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    ssc.networkStream(hostname, port, fn, storageLevel)
+    ssc.socketStream(hostname, port, fn, storageLevel)
   }
 
   /**
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 16bacffb92..5d510fd89f 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -23,7 +23,6 @@ import spark.streaming.JavaCheckpointTestUtils;
 import spark.streaming.dstream.KafkaPartitionKey;
 
 import java.io.*;
-import java.text.Collator;
 import java.util.*;
 
 // The test suite itself is Serializable so that anonymous Function implementations can be
@@ -984,7 +983,7 @@ public class JavaAPISuite implements Serializable {
 
   @Test
   public void testNetworkTextStream() {
-    JavaDStream test = ssc.networkTextStream("localhost", 12345);
+    JavaDStream test = ssc.socketTextStream("localhost", 12345);
   }
 
   @Test
@@ -1004,7 +1003,7 @@ public class JavaAPISuite implements Serializable {
       }
     }
 
-    JavaDStream test = ssc.networkStream(
+    JavaDStream test = ssc.socketStream(
         "localhost",
         12345,
         new Converter(),
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 7c1c2e1040..e6aecfbb76 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -41,7 +41,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(master, framework, batchDuration)
-    val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+    val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
     val outputStream = new TestOutputStream(networkStream, outputBuffer)
     def output = outputBuffer.flatMap(x => x)
-- 
GitLab
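
For reviewers, a minimal end-to-end sketch (not part of the patch) of how the renamed entry points fit together after this change. `MyReceiver`, its empty `onStart`/`onStop` bodies, and port 9999 are hypothetical placeholders, and the `NetworkReceiver` import path is assumed from this tree:

    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.dstream.NetworkReceiver

    // Hypothetical custom receiver; a real implementation would open its
    // source in onStart() and push the data it receives into Spark.
    class MyReceiver extends NetworkReceiver[String] {
      protected def onStart() { /* connect and start pushing blocks */ }
      protected def onStop() { /* disconnect */ }
    }

    object RenamedStreamsSketch {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "RenamedStreamsSketch", Seconds(1))
        // networkStream (was pluggableNetworkStream) plugs in the custom receiver,
        // while socketTextStream (was networkTextStream) is the built-in TCP source.
        val custom = ssc.networkStream[String](new MyReceiver)
        val lines = ssc.socketTextStream("localhost", 9999)
        custom.union(lines).print()
        ssc.start()
      }
    }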