Skip to content
Snippets Groups Projects
Commit cbf6a5ee authored by unknown's avatar unknown
Browse files

Removed unused code, clarified intent of the program, batch size to 1 second

parent 1d54401d
No related branches found
No related tags found
No related merge requests found
...@@ -4,7 +4,7 @@ import spark.streaming._ ...@@ -4,7 +4,7 @@ import spark.streaming._
import spark.streaming.StreamingContext._ import spark.streaming.StreamingContext._
/** /**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second. * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every second.
* Usage: StatefulNetworkWordCount <master> <hostname> <port> * Usage: StatefulNetworkWordCount <master> <hostname> <port>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
...@@ -15,8 +15,6 @@ import spark.streaming.StreamingContext._ ...@@ -15,8 +15,6 @@ import spark.streaming.StreamingContext._
* `$ ./run spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999` * `$ ./run spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999`
*/ */
object StatefulNetworkWordCount { object StatefulNetworkWordCount {
private def className[A](a: A)(implicit m: Manifest[A]) = m.toString
def main(args: Array[String]) { def main(args: Array[String]) {
if (args.length < 3) { if (args.length < 3) {
System.err.println("Usage: StatefulNetworkWordCount <master> <hostname> <port>\n" + System.err.println("Usage: StatefulNetworkWordCount <master> <hostname> <port>\n" +
...@@ -32,8 +30,8 @@ object StatefulNetworkWordCount { ...@@ -32,8 +30,8 @@ object StatefulNetworkWordCount {
Some(currentCount + previousCount) Some(currentCount + previousCount)
} }
// Create the context with a 10 second batch size // Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(10), val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(1),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
ssc.checkpoint(".") ssc.checkpoint(".")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment