Skip to content
Snippets Groups Projects
Commit 15a2ab5f authored by Sean Owen's avatar Sean Owen
Browse files

Revise formatting of previous commit f80e2629

parent f80e2629
No related branches found
No related tags found
No related merge requests found
......@@ -39,7 +39,6 @@ import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
* Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
* second starting with initial value of word count.
......@@ -65,17 +64,17 @@ public class JavaStatefulNetworkWordCount {
StreamingExamples.setStreamingLogLevels();
// Update the cumulative count function
final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = new
Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
Integer newSum = state.or(0);
for (Integer value : values) {
newSum += value;
}
return Optional.of(newSum);
}
};
final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
Integer newSum = state.or(0);
for (Integer value : values) {
newSum += value;
}
return Optional.of(newSum);
}
};
// Create the context with a 1 second batch size
SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount");
......@@ -97,12 +96,13 @@ public class JavaStatefulNetworkWordCount {
}
});
JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
// This will give a Dstream made of state (which is the cumulative count of the words)
JavaPairDStream<String, Integer> stateDstream = wordsDstream.updateStateByKey(updateFunction,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment