Commit 11d54941 authored by Sean Owen, committed by Patrick Wendell

SPARK-1663. Corrections for several compile errors in streaming code examples, and updates to follow API changes

I gave the Streaming code examples, both Scala and Java, a test run today. I turned up a number of small errors, mostly compile errors in the Java examples. There were a few typos in the Scala examples too.

I also took the liberty of adding things like imports, since in several cases they are not obvious. Feel free to push back on some changes.

There's one thing I haven't quite addressed in the changes. `JavaPairDStream` uses the Java API version of `Function2`, as `JFunction2`, in almost all cases. However, it uses `scala.Function2` in:

```
  def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration)
  :JavaPairDStream[K, V] = {
    dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
  }
```
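For comparison, the neighboring methods on the same class take the Java-API function type. Their signatures follow roughly this pattern (reproduced from memory, so treat the exact shape as an assumption):

```
// Prevailing style elsewhere in JavaPairDStream: JFunction2 is the Java API's Function2
def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] = {
  dstream.reduceByKey(func)  // the Java function types extend the Scala ones, so this passes through
}
```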

Is that a typo?

Also, in Scala, I could not get this to compile:
```
val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
error: missing parameter type for expanded function ((x$1, x$2) => x$1.$plus(x$2))
```
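For reference, giving the parameters explicit types (which is what the fix below does) makes it compile; a named function whose type is already known works as well (the `addCounts` helper here is just illustrative):

```
// Explicit parameter types resolve the ambiguity among the overloads
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

// Or pass a function whose type is already fully known
def addCounts(a: Int, b: Int): Int = a + b
val windowedWordCounts2 = pairs.reduceByKeyAndWindow(addCounts _, Seconds(30), Seconds(10))
```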

You can see my fix below, but am I missing something?

Otherwise I can say these all worked for me!

Author: Sean Owen <sowen@cloudera.com>

Closes #589 from srowen/SPARK-1663 and squashes the following commits:

65a906b [Sean Owen] Corrections for several compile errors in streaming code examples, and updates to follow API changes
parent 3d0a02df
```
@@ -76,16 +76,19 @@ Besides Spark's configuration, we specify that any DStream will be processed
 in 1 second batches.
 
 {% highlight scala %}
-// Create a StreamingContext with a SparkConf configuration
-val ssc = new StreamingContext(sparkConf, Seconds(1))
+import org.apache.spark.api.java.function._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.api._
+// Create a StreamingContext with a local master
+val ssc = new StreamingContext("local", "NetworkWordCount", Seconds(1))
 {% endhighlight %}
 
 Using this context, we then create a new DStream
 by specifying the IP address and port of the data server.
 
 {% highlight scala %}
-// Create a DStream that will connect to serverIP:serverPort
-val lines = ssc.socketTextStream(serverIP, serverPort)
+// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
+val lines = ssc.socketTextStream("localhost", 9999)
 {% endhighlight %}
 
 This `lines` DStream represents the stream of data that will be received from the data
```
```
@@ -103,6 +106,7 @@ each line will be split into multiple words and the stream of words is represented as the
 `words` DStream. Next, we want to count these words.
 
 {% highlight scala %}
+import org.apache.spark.streaming.StreamingContext._
 // Count each word in each batch
 val pairs = words.map(word => (word, 1))
 val wordCounts = pairs.reduceByKey(_ + _)
```
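Pieced together, the Scala snippets in these hunks amount to something like the following self-contained sketch. The `print()`, `start()`, and `awaitTermination()` calls are not part of the diff above and are filled in from the standard `StreamingContext` API:

```
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object NetworkWordCount {
  def main(args: Array[String]) {
    // Local master and a 1 second batch interval, as in the hunk above
    val ssc = new StreamingContext("local", "NetworkWordCount", Seconds(1))
    // Text stream from localhost:9999 (feed it with e.g. `nc -lk 9999`)
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()      // print the first few counts of each batch
    ssc.start()             // start the computation
    ssc.awaitTermination()  // wait for it to terminate
  }
}
```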
```
@@ -138,16 +142,20 @@ functionality. Besides Spark's configuration, we specify that any DStream would be processed
 in 1 second batches.
 
 {% highlight java %}
-// Create a StreamingContext with a SparkConf configuration
-JavaStreamingContext jssc = StreamingContext(sparkConf, new Duration(1000))
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import scala.Tuple2;
+// Create a StreamingContext with a local master
+JavaStreamingContext jssc = new JavaStreamingContext("local", "JavaNetworkWordCount", new Duration(1000))
 {% endhighlight %}
 
 Using this context, we then create a new DStream
 by specifying the IP address and port of the data server.
 
 {% highlight java %}
-// Create a DStream that will connect to serverIP:serverPort
-JavaDStream<String> lines = jssc.socketTextStream(serverIP, serverPort);
+// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
+JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
 {% endhighlight %}
 
 This `lines` DStream represents the stream of data that will be received from the data
```
```
@@ -159,7 +167,7 @@ space into words.
 JavaDStream<String> words = lines.flatMap(
   new FlatMapFunction<String, String>() {
     @Override public Iterable<String> call(String x) {
-      return Lists.newArrayList(x.split(" "));
+      return Arrays.asList(x.split(" "));
     }
   });
 {% endhighlight %}
```
```
@@ -359,7 +367,7 @@ as explained earlier. Finally, the last two parameters are needed to deploy your application
 if running in distributed mode, as described in the
 [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
 Additionally, the underlying SparkContext can be accessed as
-`streamingContext.sparkContext`.
+`ssc.sparkContext`.
 
 The batch interval must be set based on the latency requirements of your application
 and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size)
```
```
@@ -399,7 +407,7 @@ These operations are discussed in detail in later sections.
 
 ## Input Sources
 
-We have already taken a look at the `streamingContext.socketTextStream(...)` in the [quick
+We have already taken a look at the `ssc.socketTextStream(...)` in the [quick
 example](#a-quick-example) which creates a DStream from text
 data received over a TCP socket connection. Besides sockets, the core Spark Streaming API provides
 methods for creating DStreams from files and Akka actors as input sources.
```
```
@@ -409,12 +417,12 @@ Specifically, for files, the DStream can be created as
 <div class="codetabs">
 <div data-lang="scala">
 {% highlight scala %}
-streamingContext.fileStream(dataDirectory)
+ssc.fileStream(dataDirectory)
 {% endhighlight %}
 </div>
 <div data-lang="java">
 {% highlight java %}
-javaStreamingContext.fileStream(dataDirectory);
+jssc.fileStream(dataDirectory);
 {% endhighlight %}
 </div>
 </div>
```
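As a side note, for plain text files the Scala API also has `textFileStream`, which saves spelling out the key, value, and input-format type parameters that `fileStream` expects (the path below is only illustrative):

```
// Watches dataDirectory for new files and reads them line by line as a DStream[String]
val lines = ssc.textFileStream("hdfs:///user/data/incoming")
```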
```
@@ -443,13 +451,13 @@ project dependencies, you can create a DStream from Kafka as
 <div data-lang="scala">
 {% highlight scala %}
 import org.apache.spark.streaming.kafka._
-KafkaUtils.createStream(streamingContext, kafkaParams, ...)
+KafkaUtils.createStream(ssc, kafkaParams, ...)
 {% endhighlight %}
 </div>
 <div data-lang="java">
 {% highlight java %}
-import org.apache.spark.streaming.kafka.*
-KafkaUtils.createStream(javaStreamingContext, kafkaParams, ...);
+import org.apache.spark.streaming.kafka.*;
+KafkaUtils.createStream(jssc, kafkaParams, ...);
 {% endhighlight %}
 </div>
 </div>
```
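For a more concrete picture, a typical invocation of the Scala `createStream` overload of this era looks roughly like this; the ZooKeeper address, consumer group, and topic map are placeholders, so check them against the `spark-streaming-kafka` module you build against:

```
import org.apache.spark.streaming.kafka._

// ZooKeeper quorum, consumer group id, and a map of topic -> number of receiver threads
val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "my-consumer-group", Map("events" -> 1))
```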
```
@@ -578,13 +586,14 @@ val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
 
 <div data-lang="java" markdown="1">
 {% highlight java %}
+import com.google.common.base.Optional;
 Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
   new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
     @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
       Integer newSum = ... // add the new values with the previous running count to get the new count
-      return Optional.of(newSum)
+      return Optional.of(newSum);
     }
-  }
+  };
 {% endhighlight %}
 
 This is applied on a DStream containing words (say, the `pairs` DStream containing `(word,
```
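The Scala side referenced in the hunk header (`pairs.updateStateByKey[Int](updateFunction _)`) needs an update function of shape `(Seq[V], Option[S]) => Option[S]`; a minimal sketch, assuming the running count starts at 0:

```
// Fold the batch's new values for a key into its running count; returning None drops the key
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  Some(runningCount.getOrElse(0) + newValues.sum)
}

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
```

Note that in practice `updateStateByKey` also requires a checkpoint directory to be set on the context.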
```
@@ -617,9 +626,9 @@ spam information (maybe generated with Spark as well) and then filtering based on it.
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information
+val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
 
-val cleanedDStream = inputDStream.transform(rdd => {
+val cleanedDStream = wordCounts.transform(rdd => {
   rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
   ...
 })
```
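To make the elided parts concrete, a fleshed-out version of this Scala transform might look as follows; the `(count, spamScore)` structure and the 0.5 threshold are purely illustrative:

```
// spamInfoRDD: RDD[(String, Double)] mapping a word to an (illustrative) spam score
val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD)                                       // (word, (count, spamScore))
     .filter { case (_, (_, spamScore)) => spamScore < 0.5 }  // drop spam-like words
     .map { case (word, (count, _)) => (word, count) }        // back to (word, count)
}
```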
```
@@ -629,13 +638,14 @@ val cleanedDStream = inputDStream.transform(rdd => {
 
 <div data-lang="java" markdown="1">
 {% highlight java %}
+import org.apache.spark.streaming.api.java.*;
 // RDD containing spam information
-JavaPairRDD<String, Double> spamInfoRDD = javaSparkContext.hadoopFile(...);
+final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
 
-JavaPairDStream<String, Integer> cleanedDStream = inputDStream.transform(
+JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
   new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
     @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
-      rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
+      rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
       ...
     }
   });
```
```
@@ -684,7 +694,7 @@ operation `reduceByKeyAndWindow`.
 
 {% highlight scala %}
 // Reduce last 30 seconds of data, every 10 seconds
-val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
+val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
 {% endhighlight %}
 
 </div>
```
```
@@ -699,7 +709,7 @@ Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
 };
 
 // Reduce last 30 seconds of data, every 10 seconds
-JavaPairDStream<String, Integer> windowedWordCounts = pair.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
+JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
 {% endhighlight %}
 
 </div>
```
```
@@ -1087,7 +1097,7 @@ This behavior is made simple by using `JavaStreamingContext.getOrCreate`.
 {% highlight java %}
 // Create a factory object that can create a and setup a new JavaStreamingContext
 JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
-  JavaStreamingContextFactory create() {
+  @Override public JavaStreamingContext create() {
     JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
     JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
     ...
```
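On the Scala side, the same pattern goes through `StreamingContext.getOrCreate`; a minimal sketch, with the checkpoint directory and the computation (`lines.count().print()`) chosen only for illustration:

```
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDirectory = "hdfs:///checkpoints/network-word-count"  // illustrative path

// Builds a fresh context and defines the computation; only invoked when no checkpoint exists
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext("local", "NetworkWordCount", Seconds(1))
  val lines = ssc.socketTextStream("localhost", 9999)
  lines.count().print()
  ssc.checkpoint(checkpointDirectory)  // enable checkpointing of DStream state
  ssc
}

// Recreate the context from checkpoint data if present, otherwise call the factory above
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
context.start()
context.awaitTermination()
```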