From 30cfa8dc1811cfd7b57bd818fb22f408c6b02ff1 Mon Sep 17 00:00:00 2001 From: Prashant Sharma <prashant.s@imaginea.com> Date: Thu, 8 May 2014 10:23:05 -0700 Subject: [PATCH] SPARK-1565, update examples to be used with spark-submit script. Commit for initial feedback, basically I am curious if we should prompt user for providing args esp. when its mandatory. And can we skip if they are not ? Also few other things that did not work like `bin/spark-submit examples/target/scala-2.10/spark-examples-1.0.0-SNAPSHOT-hadoop1.0.4.jar --class org.apache.spark.examples.SparkALS --arg 100 500 10 5 2` Not all the args get passed properly, may be I have messed up something will try to sort it out hopefully. Author: Prashant Sharma <prashant.s@imaginea.com> Closes #552 from ScrapCodes/SPARK-1565/update-examples and squashes the following commits: 669dd23 [Prashant Sharma] Review comments 2727e70 [Prashant Sharma] SPARK-1565, update examples to be used with spark-submit script. (cherry picked from commit 44dd57fb66bb676d753ad8d9757f9f4c03364113) Signed-off-by: Patrick Wendell <pwendell@gmail.com> --- .gitignore | 1 + .../scala/org/apache/spark/SparkContext.scala | 8 ++-- .../org/apache/spark/examples/JavaHdfsLR.java | 13 ++++--- .../apache/spark/examples/JavaLogQuery.java | 13 +++---- .../apache/spark/examples/JavaPageRank.java | 15 +++++--- .../apache/spark/examples/JavaSparkPi.java | 18 ++++----- .../org/apache/spark/examples/JavaTC.java | 24 ++++++------ .../apache/spark/examples/JavaWordCount.java | 12 +++--- .../apache/spark/examples/mllib/JavaALS.java | 22 +++++------ .../spark/examples/mllib/JavaKMeans.java | 22 +++++------ .../apache/spark/examples/mllib/JavaLR.java | 18 ++++----- .../spark/examples/sql/JavaSparkSQL.java | 5 ++- .../streaming/JavaFlumeEventCount.java | 19 ++++------ .../streaming/JavaKafkaWordCount.java | 27 +++++++------- .../streaming/JavaNetworkWordCount.java | 25 ++++++------- .../examples/streaming/JavaQueueStream.java | 22 +++++------ .../apache/spark/examples/BroadcastTest.scala | 22 +++++------ .../spark/examples/CassandraCQLTest.scala | 19 +++++----- .../apache/spark/examples/CassandraTest.scala | 10 ++--- .../examples/ExceptionHandlingTest.scala | 11 ++---- .../apache/spark/examples/GroupByTest.scala | 25 ++++++------- .../org/apache/spark/examples/HBaseTest.scala | 6 +-- .../org/apache/spark/examples/HdfsTest.scala | 4 +- .../org/apache/spark/examples/LogQuery.scala | 14 +++---- .../spark/examples/MultiBroadcastTest.scala | 17 ++++----- .../examples/SimpleSkewedGroupByTest.scala | 24 ++++++------ .../spark/examples/SkewedGroupByTest.scala | 25 ++++++------- .../org/apache/spark/examples/SparkALS.scala | 18 +++------ .../apache/spark/examples/SparkHdfsLR.scala | 13 ++++--- .../apache/spark/examples/SparkKMeans.scala | 18 ++++----- .../org/apache/spark/examples/SparkLR.scala | 11 ++---- .../apache/spark/examples/SparkPageRank.scala | 14 +++---- .../org/apache/spark/examples/SparkPi.scala | 10 ++--- .../org/apache/spark/examples/SparkTC.scala | 12 ++---- .../spark/examples/SparkTachyonHdfsLR.scala | 12 ++---- .../spark/examples/SparkTachyonPi.scala | 10 ++--- .../examples/bagel/WikipediaPageRank.scala | 10 ++--- .../bagel/WikipediaPageRankStandalone.scala | 10 ++--- .../examples/graphx/LiveJournalPageRank.scala | 6 +-- .../spark/examples/sql/RDDRelation.scala | 5 ++- .../examples/sql/hive/HiveFromSpark.scala | 5 ++- .../examples/streaming/ActorWordCount.scala | 21 +++++------ .../examples/streaming/FlumeEventCount.scala | 14 +++---- .../examples/streaming/HdfsWordCount.scala | 18 ++++----- .../examples/streaming/KafkaWordCount.scala | 21 +++++------ .../examples/streaming/MQTTWordCount.scala | 26 ++++++------- .../examples/streaming/NetworkWordCount.scala | 23 +++++------- .../examples/streaming/QueueStream.scala | 10 ++--- .../examples/streaming/RawNetworkGrep.scala | 16 ++++---- .../RecoverableNetworkWordCount.scala | 37 ++++++++++--------- .../streaming/StatefulNetworkWordCount.scala | 21 +++++------ .../streaming/TwitterAlgebirdCMS.scala | 15 +++----- .../streaming/TwitterAlgebirdHLL.scala | 14 +++---- .../streaming/TwitterPopularTags.scala | 13 ++----- .../examples/streaming/ZeroMQWordCount.scala | 23 ++++++------ .../apache/spark/graphx/lib/Analytics.scala | 18 +++++---- 56 files changed, 405 insertions(+), 480 deletions(-) diff --git a/.gitignore b/.gitignore index 32b603f1bc..ad72588b47 100644 --- a/.gitignore +++ b/.gitignore @@ -49,6 +49,7 @@ unit-tests.log /lib/ rat-results.txt scalastyle.txt +conf/*.conf # For Hive metastore_db/ diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index eb14d87467..9d7c2c8d3d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -74,10 +74,10 @@ class SparkContext(config: SparkConf) extends Logging { * be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]] * from a list of input files or InputFormats for the application. */ - @DeveloperApi - def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = { - this(config) - this.preferredNodeLocationData = preferredNodeLocationData + @DeveloperApi + def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = { + this(config) + this.preferredNodeLocationData = preferredNodeLocationData } /** diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java index bd96274021..6c177de359 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java @@ -17,6 +17,7 @@ package org.apache.spark.examples; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; @@ -103,16 +104,16 @@ public final class JavaHdfsLR { public static void main(String[] args) { - if (args.length < 3) { - System.err.println("Usage: JavaHdfsLR <master> <file> <iters>"); + if (args.length < 2) { + System.err.println("Usage: JavaHdfsLR <file> <iters>"); System.exit(1); } - JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR", - System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaHdfsLR.class)); - JavaRDD<String> lines = sc.textFile(args[1]); + SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR"); + JavaSparkContext sc = new JavaSparkContext(sparkConf); + JavaRDD<String> lines = sc.textFile(args[0]); JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache(); - int ITERATIONS = Integer.parseInt(args[2]); + int ITERATIONS = Integer.parseInt(args[1]); // Initialize w to a random value double[] w = new double[D]; diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java index 3f7a879538..812e9d5580 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java @@ -20,6 +20,7 @@ package org.apache.spark.examples; import com.google.common.collect.Lists; import scala.Tuple2; import scala.Tuple3; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -34,6 +35,8 @@ import java.util.regex.Pattern; /** * Executes a roll up-style query against Apache logs. + * + * Usage: JavaLogQuery [logFile] */ public final class JavaLogQuery { @@ -97,15 +100,11 @@ public final class JavaLogQuery { } public static void main(String[] args) { - if (args.length == 0) { - System.err.println("Usage: JavaLogQuery <master> [logFile]"); - System.exit(1); - } - JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery", - System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLogQuery.class)); + SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery"); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); - JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs); + JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs); JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() { @Override diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java index e31f676f5f..7ea6df9c17 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java @@ -18,9 +18,12 @@ package org.apache.spark.examples; + import scala.Tuple2; import com.google.common.collect.Iterables; + +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -54,20 +57,20 @@ public final class JavaPageRank { } public static void main(String[] args) throws Exception { - if (args.length < 3) { - System.err.println("Usage: JavaPageRank <master> <file> <number_of_iterations>"); + if (args.length < 2) { + System.err.println("Usage: JavaPageRank <file> <number_of_iterations>"); System.exit(1); } - JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank", - System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaPageRank.class)); + SparkConf sparkConf = new SparkConf().setAppName("JavaPageRank"); + JavaSparkContext ctx = new JavaSparkContext(sparkConf); // Loads in input file. It should be in format of: // URL neighbor URL // URL neighbor URL // URL neighbor URL // ... - JavaRDD<String> lines = ctx.textFile(args[1], 1); + JavaRDD<String> lines = ctx.textFile(args[0], 1); // Loads all URLs from input file and initialize their neighbors. JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(new PairFunction<String, String, String>() { @@ -87,7 +90,7 @@ public final class JavaPageRank { }); // Calculates and updates URL ranks continuously using PageRank algorithm. - for (int current = 0; current < Integer.parseInt(args[2]); current++) { + for (int current = 0; current < Integer.parseInt(args[1]); current++) { // Calculates URL contributions to the rank of other URLs. JavaPairRDD<String, Double> contribs = links.join(ranks).values() .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() { diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java index ac8df02c46..11157d7573 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java @@ -17,6 +17,7 @@ package org.apache.spark.examples; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; @@ -25,19 +26,18 @@ import org.apache.spark.api.java.function.Function2; import java.util.ArrayList; import java.util.List; -/** Computes an approximation to pi */ +/** + * Computes an approximation to pi + * Usage: JavaSparkPi [slices] + */ public final class JavaSparkPi { + public static void main(String[] args) throws Exception { - if (args.length == 0) { - System.err.println("Usage: JavaSparkPi <master> [slices]"); - System.exit(1); - } - - JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaSparkPi", - System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkPi.class)); + SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi"); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); - int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2; + int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2; int n = 100000 * slices; List<Integer> l = new ArrayList<Integer>(n); for (int i = 0; i < n; i++) { diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java index d66b9ba265..2563fcdd23 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java @@ -17,19 +17,22 @@ package org.apache.spark.examples; -import scala.Tuple2; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.PairFunction; - import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Random; import java.util.Set; +import scala.Tuple2; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; + /** * Transitive closure on a graph, implemented in Java. + * Usage: JavaTC [slices] */ public final class JavaTC { @@ -61,14 +64,9 @@ public final class JavaTC { } public static void main(String[] args) { - if (args.length == 0) { - System.err.println("Usage: JavaTC <host> [<slices>]"); - System.exit(1); - } - - JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC", - System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaTC.class)); - Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2; + SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR"); + JavaSparkContext sc = new JavaSparkContext(sparkConf); + Integer slices = (args.length > 0) ? Integer.parseInt(args[0]): 2; JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache(); // Linear transitive closure: each round grows paths by one edge, diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java index 87c1b80981..9a6a944f7e 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java @@ -18,6 +18,7 @@ package org.apache.spark.examples; import scala.Tuple2; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -33,14 +34,15 @@ public final class JavaWordCount { private static final Pattern SPACE = Pattern.compile(" "); public static void main(String[] args) throws Exception { - if (args.length < 2) { - System.err.println("Usage: JavaWordCount <master> <file>"); + + if (args.length < 1) { + System.err.println("Usage: JavaWordCount <file>"); System.exit(1); } - JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount", - System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class)); - JavaRDD<String> lines = ctx.textFile(args[1], 1); + SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount"); + JavaSparkContext ctx = new JavaSparkContext(sparkConf); + JavaRDD<String> lines = ctx.textFile(args[0], 1); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java index 4533c4c5f2..8d381d4e0a 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java @@ -17,6 +17,7 @@ package org.apache.spark.examples.mllib; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; @@ -57,23 +58,22 @@ public final class JavaALS { public static void main(String[] args) { - if (args.length != 5 && args.length != 6) { + if (args.length < 4) { System.err.println( - "Usage: JavaALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]"); + "Usage: JavaALS <ratings_file> <rank> <iterations> <output_dir> [<blocks>]"); System.exit(1); } - - int rank = Integer.parseInt(args[2]); - int iterations = Integer.parseInt(args[3]); - String outputDir = args[4]; + SparkConf sparkConf = new SparkConf().setAppName("JavaALS"); + int rank = Integer.parseInt(args[1]); + int iterations = Integer.parseInt(args[2]); + String outputDir = args[3]; int blocks = -1; - if (args.length == 6) { - blocks = Integer.parseInt(args[5]); + if (args.length == 5) { + blocks = Integer.parseInt(args[4]); } - JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS", - System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaALS.class)); - JavaRDD<String> lines = sc.textFile(args[1]); + JavaSparkContext sc = new JavaSparkContext(sparkConf); + JavaRDD<String> lines = sc.textFile(args[0]); JavaRDD<Rating> ratings = lines.map(new ParseRating()); diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java index 0cfb8e69ed..f796123a25 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java @@ -19,6 +19,7 @@ package org.apache.spark.examples.mllib; import java.util.regex.Pattern; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; @@ -48,24 +49,21 @@ public final class JavaKMeans { } public static void main(String[] args) { - - if (args.length < 4) { + if (args.length < 3) { System.err.println( - "Usage: JavaKMeans <master> <input_file> <k> <max_iterations> [<runs>]"); + "Usage: JavaKMeans <input_file> <k> <max_iterations> [<runs>]"); System.exit(1); } - - String inputFile = args[1]; - int k = Integer.parseInt(args[2]); - int iterations = Integer.parseInt(args[3]); + String inputFile = args[0]; + int k = Integer.parseInt(args[1]); + int iterations = Integer.parseInt(args[2]); int runs = 1; - if (args.length >= 5) { - runs = Integer.parseInt(args[4]); + if (args.length >= 4) { + runs = Integer.parseInt(args[3]); } - - JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans", - System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class)); + SparkConf sparkConf = new SparkConf().setAppName("JavaKMeans"); + JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD<String> lines = sc.textFile(inputFile); JavaRDD<Vector> points = lines.map(new ParsePoint()); diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java index f6e48b4987..eceb6927d5 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java @@ -19,6 +19,7 @@ package org.apache.spark.examples.mllib; import java.util.regex.Pattern; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; @@ -51,17 +52,16 @@ public final class JavaLR { } public static void main(String[] args) { - if (args.length != 4) { - System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>"); + if (args.length != 3) { + System.err.println("Usage: JavaLR <input_dir> <step_size> <niters>"); System.exit(1); } - - JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR", - System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLR.class)); - JavaRDD<String> lines = sc.textFile(args[1]); + SparkConf sparkConf = new SparkConf().setAppName("JavaLR"); + JavaSparkContext sc = new JavaSparkContext(sparkConf); + JavaRDD<String> lines = sc.textFile(args[0]); JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache(); - double stepSize = Double.parseDouble(args[2]); - int iterations = Integer.parseInt(args[3]); + double stepSize = Double.parseDouble(args[1]); + int iterations = Integer.parseInt(args[2]); // Another way to configure LogisticRegression // @@ -73,7 +73,7 @@ public final class JavaLR { // LogisticRegressionModel model = lr.train(points.rdd()); LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(), - iterations, stepSize); + iterations, stepSize); System.out.print("Final w: " + model.weights()); diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java index d62a72f534..ad5ec84b71 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java @@ -20,6 +20,7 @@ package org.apache.spark.examples.sql; import java.io.Serializable; import java.util.List; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; @@ -51,8 +52,8 @@ public class JavaSparkSQL { } public static void main(String[] args) throws Exception { - JavaSparkContext ctx = new JavaSparkContext("local", "JavaSparkSQL", - System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkSQL.class)); + SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL"); + JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaSQLContext sqlCtx = new JavaSQLContext(ctx); // Load a text file and convert each line to a Java Bean. diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java index a5ece68cef..400b68c221 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java @@ -17,6 +17,7 @@ package org.apache.spark.examples.streaming; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; import org.apache.spark.examples.streaming.StreamingExamples; import org.apache.spark.streaming.*; @@ -31,9 +32,8 @@ import org.apache.spark.streaming.flume.SparkFlumeEvent; * an Avro server on at the request host:port address and listen for requests. * Your Flume AvroSink should be pointed to this address. * - * Usage: JavaFlumeEventCount <master> <host> <port> + * Usage: JavaFlumeEventCount <host> <port> * - * <master> is a Spark master URL * <host> is the host the Flume receiver will be started on - a receiver * creates a server and listens for flume events. * <port> is the port the Flume receiver will listen on. @@ -43,22 +43,19 @@ public final class JavaFlumeEventCount { } public static void main(String[] args) { - if (args.length != 3) { - System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>"); + if (args.length != 2) { + System.err.println("Usage: JavaFlumeEventCount <host> <port>"); System.exit(1); } StreamingExamples.setStreamingLogLevels(); - String master = args[0]; - String host = args[1]; - int port = Integer.parseInt(args[2]); + String host = args[0]; + int port = Integer.parseInt(args[1]); Duration batchInterval = new Duration(2000); - - JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, - System.getenv("SPARK_HOME"), - JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class)); + SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount"); + JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval); JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port); flumeStream.count(); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java index da51eb189a..6a74cc50d1 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java @@ -21,7 +21,11 @@ import java.util.Map; import java.util.HashMap; import java.util.regex.Pattern; + +import scala.Tuple2; + import com.google.common.collect.Lists; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; @@ -33,19 +37,18 @@ import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; -import scala.Tuple2; /** * Consumes messages from one or more topics in Kafka and does wordcount. - * Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads> - * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads> * <zkQuorum> is a list of one or more zookeeper servers that make quorum * <group> is the name of kafka consumer group * <topics> is a list of one or more kafka topics to consume from * <numThreads> is the number of threads the kafka consumer should use * * Example: - * `./bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount local[2] zoo01,zoo02, + * `./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \ * zoo03 my-consumer-group topic1,topic2 1` */ @@ -56,27 +59,25 @@ public final class JavaKafkaWordCount { } public static void main(String[] args) { - if (args.length < 5) { - System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>"); + if (args.length < 4) { + System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>"); System.exit(1); } StreamingExamples.setStreamingLogLevels(); - + SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); // Create the context with a 1 second batch size - JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount", - new Duration(2000), System.getenv("SPARK_HOME"), - JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class)); + JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); - int numThreads = Integer.parseInt(args[4]); + int numThreads = Integer.parseInt(args[3]); Map<String, Integer> topicMap = new HashMap<String, Integer>(); - String[] topics = args[3].split(","); + String[] topics = args[2].split(","); for (String topic: topics) { topicMap.put(topic, numThreads); } JavaPairReceiverInputDStream<String, String> messages = - KafkaUtils.createStream(jssc, args[1], args[2], topicMap); + KafkaUtils.createStream(jssc, args[0], args[1], topicMap); JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { @Override diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java index ac84991d87..e5cbd39f43 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java @@ -17,9 +17,10 @@ package org.apache.spark.examples.streaming; -import com.google.common.collect.Lists; -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import scala.Tuple2; +import com.google.common.collect.Lists; + +import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; @@ -27,41 +28,39 @@ import org.apache.spark.examples.streaming.StreamingExamples; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import java.util.regex.Pattern; /** * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. - * Usage: JavaNetworkWordCount <master> <hostname> <port> - * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * Usage: JavaNetworkWordCount <hostname> <port> * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example - * `$ ./run org.apache.spark.examples.streaming.JavaNetworkWordCount local[2] localhost 9999` + * `$ ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999` */ public final class JavaNetworkWordCount { private static final Pattern SPACE = Pattern.compile(" "); public static void main(String[] args) { - if (args.length < 3) { - System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" + - "In local mode, <master> should be 'local[n]' with n > 1"); + if (args.length < 2) { + System.err.println("Usage: JavaNetworkWordCount <hostname> <port>"); System.exit(1); } StreamingExamples.setStreamingLogLevels(); - + SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount"); // Create the context with a 1 second batch size - JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount", - new Duration(1000), System.getenv("SPARK_HOME"), - JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class)); + JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000)); // Create a JavaReceiverInputDStream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') - JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2])); + JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0], Integer.parseInt(args[1])); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java index 819311968f..4ce8437f82 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java @@ -17,8 +17,16 @@ package org.apache.spark.examples.streaming; -import com.google.common.collect.Lists; + +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + import scala.Tuple2; + +import com.google.common.collect.Lists; + +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; @@ -28,25 +36,17 @@ import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; - public final class JavaQueueStream { private JavaQueueStream() { } public static void main(String[] args) throws Exception { - if (args.length < 1) { - System.err.println("Usage: JavaQueueStream <master>"); - System.exit(1); - } StreamingExamples.setStreamingLogLevels(); + SparkConf sparkConf = new SparkConf().setAppName("JavaQueueStream"); // Create the context - JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000), - System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class)); + JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000)); // Create the queue through which RDDs can be pushed to // a QueueInputDStream diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala index f6dfd2c4c6..973049b95a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala @@ -17,28 +17,26 @@ package org.apache.spark.examples -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} +/** + * Usage: BroadcastTest [slices] [numElem] [broadcastAlgo] [blockSize] + */ object BroadcastTest { def main(args: Array[String]) { - if (args.length == 0) { - System.err.println("Usage: BroadcastTest <master> [slices] [numElem] [broadcastAlgo]" + - " [blockSize]") - System.exit(1) - } - val bcName = if (args.length > 3) args(3) else "Http" - val blockSize = if (args.length > 4) args(4) else "4096" + val bcName = if (args.length > 2) args(2) else "Http" + val blockSize = if (args.length > 3) args(3) else "4096" System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + "BroadcastFactory") System.setProperty("spark.broadcast.blockSize", blockSize) + val sparkConf = new SparkConf().setAppName("Broadcast Test") - val sc = new SparkContext(args(0), "Broadcast Test", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) + val sc = new SparkContext(sparkConf) - val slices = if (args.length > 1) args(1).toInt else 2 - val num = if (args.length > 2) args(2).toInt else 1000000 + val slices = if (args.length > 0) args(0).toInt else 2 + val num = if (args.length > 1) args(1).toInt else 1000000 val arr1 = new Array[Int](num) for (i <- 0 until arr1.length) { diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala index 3798329fc2..9a00701f98 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala @@ -30,7 +30,7 @@ import org.apache.cassandra.hadoop.cql3.CqlOutputFormat import org.apache.cassandra.utils.ByteBufferUtil import org.apache.hadoop.mapreduce.Job -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ /* @@ -65,19 +65,18 @@ import org.apache.spark.SparkContext._ /** * This example demonstrates how to read and write to cassandra column family created using CQL3 * using Spark. - * Parameters : <spark_master> <cassandra_node> <cassandra_port> - * Usage: ./bin/run-example org.apache.spark.examples.CassandraCQLTest local[2] localhost 9160 - * + * Parameters : <cassandra_node> <cassandra_port> + * Usage: ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.CassandraCQLTest localhost 9160 */ object CassandraCQLTest { def main(args: Array[String]) { - val sc = new SparkContext(args(0), - "CQLTestApp", - System.getenv("SPARK_HOME"), - SparkContext.jarOfClass(this.getClass).toSeq) - val cHost: String = args(1) - val cPort: String = args(2) + val sparkConf = new SparkConf().setAppName("CQLTestApp") + + val sc = new SparkContext(sparkConf) + val cHost: String = args(0) + val cPort: String = args(1) val KeySpace = "retail" val InputColumnFamily = "ordercf" val OutputColumnFamily = "salecount" diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala index ed5d2f9e46..91ba364a34 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala @@ -30,7 +30,7 @@ import org.apache.cassandra.thrift._ import org.apache.cassandra.utils.ByteBufferUtil import org.apache.hadoop.mapreduce.Job -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ /* @@ -38,10 +38,10 @@ import org.apache.spark.SparkContext._ * support for Hadoop. * * To run this example, run this file with the following command params - - * <spark_master> <cassandra_node> <cassandra_port> + * <cassandra_node> <cassandra_port> * * So if you want to run this on localhost this will be, - * local[3] localhost 9160 + * localhost 9160 * * The example makes some assumptions: * 1. You have already created a keyspace called casDemo and it has a column family named Words @@ -54,9 +54,9 @@ import org.apache.spark.SparkContext._ object CassandraTest { def main(args: Array[String]) { - + val sparkConf = new SparkConf().setAppName("casDemo") // Get a SparkContext - val sc = new SparkContext(args(0), "casDemo") + val sc = new SparkContext(sparkConf) // Build the job configuration with ConfigHelper provided by Cassandra val job = new Job() diff --git a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala index f0dcef431b..d42f63e870 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala @@ -17,17 +17,12 @@ package org.apache.spark.examples -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} object ExceptionHandlingTest { def main(args: Array[String]) { - if (args.length == 0) { - System.err.println("Usage: ExceptionHandlingTest <master>") - System.exit(1) - } - - val sc = new SparkContext(args(0), "ExceptionHandlingTest", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) + val sparkConf = new SparkConf().setAppName("ExceptionHandlingTest") + val sc = new SparkContext(sparkConf) sc.parallelize(0 until sc.defaultParallelism).foreach { i => if (math.random > 0.75) { throw new Exception("Testing exception handling") diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala index e67bb29a49..efd91bb054 100644 --- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala @@ -19,24 +19,21 @@ package org.apache.spark.examples import java.util.Random -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ +/** + * Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers] + */ object GroupByTest { def main(args: Array[String]) { - if (args.length == 0) { - System.err.println( - "Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]") - System.exit(1) - } - - var numMappers = if (args.length > 1) args(1).toInt else 2 - var numKVPairs = if (args.length > 2) args(2).toInt else 1000 - var valSize = if (args.length > 3) args(3).toInt else 1000 - var numReducers = if (args.length > 4) args(4).toInt else numMappers - - val sc = new SparkContext(args(0), "GroupBy Test", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) + val sparkConf = new SparkConf().setAppName("GroupBy Test") + var numMappers = if (args.length > 0) args(0).toInt else 2 + var numKVPairs = if (args.length > 1) args(1).toInt else 1000 + var valSize = if (args.length > 2) args(2).toInt else 1000 + var numReducers = if (args.length > 3) args(3).toInt else numMappers + + val sc = new SparkContext(sparkConf) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala index adbd1c02fa..a8c338480e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala @@ -26,11 +26,9 @@ import org.apache.spark.rdd.NewHadoopRDD object HBaseTest { def main(args: Array[String]) { - val sc = new SparkContext(args(0), "HBaseTest", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) - + val sparkConf = new SparkConf().setAppName("HBaseTest") + val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() - // Other options for configuring scan behavior are available. More information available at // http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html conf.set(TableInputFormat.INPUT_TABLE, args(1)) diff --git a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala index c7a4884af1..331de3ad1e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala @@ -21,8 +21,8 @@ import org.apache.spark._ object HdfsTest { def main(args: Array[String]) { - val sc = new SparkContext(args(0), "HdfsTest", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) + val sparkConf = new SparkConf().setAppName("HdfsTest") + val sc = new SparkContext(sparkConf) val file = sc.textFile(args(1)) val mapped = file.map(s => s.length).cache() for (iter <- 1 to 10) { diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala index f77a444ff7..4c655b84fd 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala @@ -17,11 +17,13 @@ package org.apache.spark.examples -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ /** * Executes a roll up-style query against Apache logs. + * + * Usage: LogQuery [logFile] */ object LogQuery { val exampleApacheLogs = List( @@ -40,16 +42,12 @@ object LogQuery { ) def main(args: Array[String]) { - if (args.length == 0) { - System.err.println("Usage: LogQuery <master> [logFile]") - System.exit(1) - } - val sc = new SparkContext(args(0), "Log Query", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) + val sparkConf = new SparkConf().setAppName("Log Query") + val sc = new SparkContext(sparkConf) val dataSet = - if (args.length == 2) sc.textFile(args(1)) else sc.parallelize(exampleApacheLogs) + if (args.length == 1) sc.textFile(args(0)) else sc.parallelize(exampleApacheLogs) // scalastyle:off val apacheLogRegex = """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r diff --git a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala index c8985eae33..2a5c0c0def 100644 --- a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala @@ -18,20 +18,19 @@ package org.apache.spark.examples import org.apache.spark.rdd.RDD -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} +/** + * Usage: MultiBroadcastTest [slices] [numElem] + */ object MultiBroadcastTest { def main(args: Array[String]) { - if (args.length == 0) { - System.err.println("Usage: MultiBroadcastTest <master> [<slices>] [numElem]") - System.exit(1) - } - val sc = new SparkContext(args(0), "Multi-Broadcast Test", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) + val sparkConf = new SparkConf().setAppName("Multi-Broadcast Test") + val sc = new SparkContext(sparkConf) - val slices = if (args.length > 1) args(1).toInt else 2 - val num = if (args.length > 2) args(2).toInt else 1000000 + val slices = if (args.length > 0) args(0).toInt else 2 + val num = if (args.length > 1) args(1).toInt else 1000000 val arr1 = new Array[Int](num) for (i <- 0 until arr1.length) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala index 54e8503711..5291ab81f4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala @@ -19,25 +19,23 @@ package org.apache.spark.examples import java.util.Random -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ +/** + * Usage: SimpleSkewedGroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] [ratio] + */ object SimpleSkewedGroupByTest { def main(args: Array[String]) { - if (args.length == 0) { - System.err.println("Usage: SimpleSkewedGroupByTest <master> " + - "[numMappers] [numKVPairs] [valSize] [numReducers] [ratio]") - System.exit(1) - } - var numMappers = if (args.length > 1) args(1).toInt else 2 - var numKVPairs = if (args.length > 2) args(2).toInt else 1000 - var valSize = if (args.length > 3) args(3).toInt else 1000 - var numReducers = if (args.length > 4) args(4).toInt else numMappers - var ratio = if (args.length > 5) args(5).toInt else 5.0 + val sparkConf = new SparkConf().setAppName("SimpleSkewedGroupByTest") + var numMappers = if (args.length > 0) args(0).toInt else 2 + var numKVPairs = if (args.length > 1) args(1).toInt else 1000 + var valSize = if (args.length > 2) args(2).toInt else 1000 + var numReducers = if (args.length > 3) args(3).toInt else numMappers + var ratio = if (args.length > 4) args(4).toInt else 5.0 - val sc = new SparkContext(args(0), "GroupBy Test", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) + val sc = new SparkContext(sparkConf) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala index 1c5f22e1c0..017d4e1e5c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala @@ -19,24 +19,21 @@ package org.apache.spark.examples import java.util.Random -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ +/** + * Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers] + */ object SkewedGroupByTest { def main(args: Array[String]) { - if (args.length == 0) { - System.err.println( - "Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]") - System.exit(1) - } - - var numMappers = if (args.length > 1) args(1).toInt else 2 - var numKVPairs = if (args.length > 2) args(2).toInt else 1000 - var valSize = if (args.length > 3) args(3).toInt else 1000 - var numReducers = if (args.length > 4) args(4).toInt else numMappers - - val sc = new SparkContext(args(0), "GroupBy Test", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) + val sparkConf = new SparkConf().setAppName("GroupBy Test") + var numMappers = if (args.length > 0) args(0).toInt else 2 + var numKVPairs = if (args.length > 1) args(1).toInt else 1000 + var valSize = if (args.length > 2) args(2).toInt else 1000 + var numReducers = if (args.length > 3) args(3).toInt else numMappers + + val sc = new SparkContext(sparkConf) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala index 0dc726aecd..5cbc966bf0 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala @@ -88,32 +88,24 @@ object SparkALS { } def main(args: Array[String]) { - if (args.length == 0) { - System.err.println("Usage: SparkALS <master> [<M> <U> <F> <iters> <slices>]") - System.exit(1) - } - - var host = "" var slices = 0 - val options = (0 to 5).map(i => if (i < args.length) Some(args(i)) else None) + val options = (0 to 4).map(i => if (i < args.length) Some(args(i)) else None) options.toArray match { - case Array(host_, m, u, f, iters, slices_) => - host = host_.get + case Array(m, u, f, iters, slices_) => M = m.getOrElse("100").toInt U = u.getOrElse("500").toInt F = f.getOrElse("10").toInt ITERATIONS = iters.getOrElse("5").toInt slices = slices_.getOrElse("2").toInt case _ => - System.err.println("Usage: SparkALS <master> [<M> <U> <F> <iters> <slices>]") + System.err.println("Usage: SparkALS [M] [U] [F] [iters] [slices]") System.exit(1) } printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS) - - val sc = new SparkContext(host, "SparkALS", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) + val sparkConf = new SparkConf().setAppName("SparkALS") + val sc = new SparkContext(sparkConf) val R = generateR() diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 3a6f18c33e..4906a696e9 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -49,20 +49,21 @@ object SparkHdfsLR { } def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: SparkHdfsLR <master> <file> <iters>") + if (args.length < 2) { + System.err.println("Usage: SparkHdfsLR <file> <iters>") System.exit(1) } - val inputPath = args(1) + + val sparkConf = new SparkConf().setAppName("SparkHdfsLR") + val inputPath = args(0) val conf = SparkHadoopUtil.get.newConfiguration() - val sc = new SparkContext(args(0), "SparkHdfsLR", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq, Map(), + val sc = new SparkContext(sparkConf, InputFormatInfo.computePreferredLocations( Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)) )) val lines = sc.textFile(inputPath) val points = lines.map(parsePoint _).cache() - val ITERATIONS = args(2).toInt + val ITERATIONS = args(1).toInt // Initialize w to a random value var w = DenseVector.fill(D){2 * rand.nextDouble - 1} diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala index dcae9591b0..4d28e0aad6 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala @@ -21,7 +21,7 @@ import java.util.Random import breeze.linalg.{Vector, DenseVector, squaredDistance} -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ /** @@ -52,16 +52,16 @@ object SparkKMeans { } def main(args: Array[String]) { - if (args.length < 4) { - System.err.println("Usage: SparkLocalKMeans <master> <file> <k> <convergeDist>") - System.exit(1) + if (args.length < 3) { + System.err.println("Usage: SparkKMeans <file> <k> <convergeDist>") + System.exit(1) } - val sc = new SparkContext(args(0), "SparkLocalKMeans", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) - val lines = sc.textFile(args(1)) + val sparkConf = new SparkConf().setAppName("SparkKMeans") + val sc = new SparkContext(sparkConf) + val lines = sc.textFile(args(0)) val data = lines.map(parseVector _).cache() - val K = args(2).toInt - val convergeDist = args(3).toDouble + val K = args(1).toInt + val convergeDist = args(2).toDouble val kPoints = data.takeSample(withReplacement = false, K, 42).toArray var tempDist = 1.0 diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala index 4f74882ccb..99ceb3089e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala @@ -27,6 +27,7 @@ import org.apache.spark._ /** * Logistic regression based classification. + * Usage: SparkLR [slices] */ object SparkLR { val N = 10000 // Number of data points @@ -47,13 +48,9 @@ object SparkLR { } def main(args: Array[String]) { - if (args.length == 0) { - System.err.println("Usage: SparkLR <master> [<slices>]") - System.exit(1) - } - val sc = new SparkContext(args(0), "SparkLR", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) - val numSlices = if (args.length > 1) args(1).toInt else 2 + val sparkConf = new SparkConf().setAppName("SparkLR") + val sc = new SparkContext(sparkConf) + val numSlices = if (args.length > 0) args(0).toInt else 2 val points = sc.parallelize(generateData, numSlices).cache() // Initialize w to a random value diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala index fa41c5c560..40b36c779a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala @@ -18,7 +18,7 @@ package org.apache.spark.examples import org.apache.spark.SparkContext._ -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} /** * Computes the PageRank of URLs from an input file. Input file should @@ -31,14 +31,10 @@ import org.apache.spark.SparkContext */ object SparkPageRank { def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: PageRank <master> <file> <number_of_iterations>") - System.exit(1) - } - var iters = args(2).toInt - val ctx = new SparkContext(args(0), "PageRank", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) - val lines = ctx.textFile(args(1), 1) + val sparkConf = new SparkConf().setAppName("PageRank") + var iters = args(1).toInt + val ctx = new SparkContext(sparkConf) + val lines = ctx.textFile(args(0), 1) val links = lines.map{ s => val parts = s.split("\\s+") (parts(0), parts(1)) diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala index d8f5720504..9fbb0a800d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala @@ -24,13 +24,9 @@ import org.apache.spark._ /** Computes an approximation to pi */ object SparkPi { def main(args: Array[String]) { - if (args.length == 0) { - System.err.println("Usage: SparkPi <master> [<slices>]") - System.exit(1) - } - val spark = new SparkContext(args(0), "SparkPi", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) - val slices = if (args.length > 1) args(1).toInt else 2 + val conf = new SparkConf().setAppName("Spark Pi") + val spark = new SparkContext(conf) + val slices = if (args.length > 0) args(0).toInt else 2 val n = 100000 * slices val count = spark.parallelize(1 to n, slices).map { i => val x = random * 2 - 1 diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala index 17d983cd87..f7f83086df 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala @@ -20,7 +20,7 @@ package org.apache.spark.examples import scala.util.Random import scala.collection.mutable -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ /** @@ -42,13 +42,9 @@ object SparkTC { } def main(args: Array[String]) { - if (args.length == 0) { - System.err.println("Usage: SparkTC <master> [<slices>]") - System.exit(1) - } - val spark = new SparkContext(args(0), "SparkTC", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) - val slices = if (args.length > 1) args(1).toInt else 2 + val sparkConf = new SparkConf().setAppName("SparkTC") + val spark = new SparkContext(sparkConf) + val slices = if (args.length > 0) args(0).toInt else 2 var tc = spark.parallelize(generateGraph, slices).cache() // Linear transitive closure: each round grows paths by one edge, diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala index 7e43c384bd..2212762186 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala @@ -51,20 +51,16 @@ object SparkTachyonHdfsLR { } def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: SparkTachyonHdfsLR <master> <file> <iters>") - System.exit(1) - } - val inputPath = args(1) + val inputPath = args(0) val conf = SparkHadoopUtil.get.newConfiguration() - val sc = new SparkContext(args(0), "SparkTachyonHdfsLR", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq, Map(), + val sparkConf = new SparkConf().setAppName("SparkTachyonHdfsLR") + val sc = new SparkContext(sparkConf, InputFormatInfo.computePreferredLocations( Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)) )) val lines = sc.textFile(inputPath) val points = lines.map(parsePoint _).persist(StorageLevel.OFF_HEAP) - val ITERATIONS = args(2).toInt + val ITERATIONS = args(1).toInt // Initialize w to a random value var w = DenseVector.fill(D){2 * rand.nextDouble - 1} diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala index 93459110e4..7743f7968b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala @@ -28,14 +28,10 @@ import org.apache.spark.storage.StorageLevel */ object SparkTachyonPi { def main(args: Array[String]) { - if (args.length == 0) { - System.err.println("Usage: SparkTachyonPi <master> [<slices>]") - System.exit(1) - } - val spark = new SparkContext(args(0), "SparkTachyonPi", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq) + val sparkConf = new SparkConf().setAppName("SparkTachyonPi") + val spark = new SparkContext(sparkConf) - val slices = if (args.length > 1) args(1).toInt else 2 + val slices = if (args.length > 0) args(0).toInt else 2 val n = 100000 * slices val rdd = spark.parallelize(1 to n, slices) diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala index 25bd55ca88..235c3bf820 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala @@ -32,22 +32,22 @@ import scala.xml.{XML,NodeSeq} */ object WikipediaPageRank { def main(args: Array[String]) { - if (args.length < 5) { + if (args.length < 4) { System.err.println( - "Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>") + "Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <usePartitioner>") System.exit(-1) } val sparkConf = new SparkConf() + sparkConf.setAppName("WikipediaPageRank") sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName) val inputFile = args(0) val threshold = args(1).toDouble val numPartitions = args(2).toInt - val host = args(3) - val usePartitioner = args(4).toBoolean + val usePartitioner = args(3).toBoolean - sparkConf.setMaster(host).setAppName("WikipediaPageRank") + sparkConf.setAppName("WikipediaPageRank") val sc = new SparkContext(sparkConf) // Parse the Wikipedia page data into a graph diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala index dee3cb6c0a..a197dac87d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala @@ -30,22 +30,20 @@ import org.apache.spark.rdd.RDD object WikipediaPageRankStandalone { def main(args: Array[String]) { - if (args.length < 5) { + if (args.length < 4) { System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> " + - "<numIterations> <host> <usePartitioner>") + "<numIterations> <usePartitioner>") System.exit(-1) } val sparkConf = new SparkConf() sparkConf.set("spark.serializer", "spark.bagel.examples.WPRSerializer") - val inputFile = args(0) val threshold = args(1).toDouble val numIterations = args(2).toInt - val host = args(3) - val usePartitioner = args(4).toBoolean + val usePartitioner = args(3).toBoolean - sparkConf.setMaster(host).setAppName("WikipediaPageRankStandalone") + sparkConf.setAppName("WikipediaPageRankStandalone") val sc = new SparkContext(sparkConf) diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala index d58fddff2b..6ef3b62dcb 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala @@ -28,9 +28,9 @@ import org.apache.spark.graphx.lib.Analytics */ object LiveJournalPageRank { def main(args: Array[String]) { - if (args.length < 2) { + if (args.length < 1) { System.err.println( - "Usage: LiveJournalPageRank <master> <edge_list_file>\n" + + "Usage: LiveJournalPageRank <edge_list_file>\n" + " [--tol=<tolerance>]\n" + " The tolerance allowed at convergence (smaller => more accurate). Default is " + "0.001.\n" + @@ -44,6 +44,6 @@ object LiveJournalPageRank { System.exit(-1) } - Analytics.main(args.patch(1, List("pagerank"), 0)) + Analytics.main(args.patch(0, List("pagerank"), 0)) } } diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala index ff9254b044..61c460c6b1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala @@ -17,7 +17,7 @@ package org.apache.spark.examples.sql -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext // One method for defining the schema of an RDD is to make a case class with the desired column @@ -26,7 +26,8 @@ case class Record(key: Int, value: String) object RDDRelation { def main(args: Array[String]) { - val sc = new SparkContext("local", "RDDRelation") + val sparkConf = new SparkConf().setAppName("RDDRelation") + val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) // Importing the SQL context gives access to all the SQL functions and implicit conversions. diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala index 66ce93a26e..b262fabbe0 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala @@ -17,7 +17,7 @@ package org.apache.spark.examples.sql.hive -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql._ import org.apache.spark.sql.hive.LocalHiveContext @@ -25,7 +25,8 @@ object HiveFromSpark { case class Record(key: Int, value: String) def main(args: Array[String]) { - val sc = new SparkContext("local", "HiveFromSpark") + val sparkConf = new SparkConf().setAppName("HiveFromSpark") + val sc = new SparkContext(sparkConf) // A local hive context creates an instance of the Hive Metastore in process, storing the // the warehouse data in the current directory. This location can be overridden by diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index 84cf43df0f..e29e16a9c1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -126,31 +126,30 @@ object FeederActor { /** * A sample word count program demonstrating the use of plugging in * Actor as Receiver - * Usage: ActorWordCount <master> <hostname> <port> - * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * Usage: ActorWordCount <hostname> <port> * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on. * * To run this example locally, you may run Feeder Actor as - * `$ ./bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999` + * `./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999` * and then run the example - * `./bin/run-example org.apache.spark.examples.streaming.ActorWordCount local[2] 127.0.1.1 9999` + * `./bin/spark-submit examples.jar --class org.apache.spark.examples.streaming.ActorWordCount \ + * 127.0.1.1 9999` */ object ActorWordCount { def main(args: Array[String]) { - if (args.length < 3) { + if (args.length < 2) { System.err.println( - "Usage: ActorWordCount <master> <hostname> <port>" + - "In local mode, <master> should be 'local[n]' with n > 1") + "Usage: ActorWordCount <hostname> <port>") System.exit(1) } StreamingExamples.setStreamingLogLevels() - val Seq(master, host, port) = args.toSeq - + val Seq(host, port) = args.toSeq + val sparkConf = new SparkConf().setAppName("ActorWordCount") // Create the context and set the batch size - val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, Seconds(2)) /* * Following is the use of actorStream to plug in custom actor as receiver diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala index 5b2a1035fc..38362edac2 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala @@ -17,6 +17,7 @@ package org.apache.spark.examples.streaming +import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.flume._ @@ -29,9 +30,8 @@ import org.apache.spark.util.IntParam * an Avro server on at the request host:port address and listen for requests. * Your Flume AvroSink should be pointed to this address. * - * Usage: FlumeEventCount <master> <host> <port> + * Usage: FlumeEventCount <host> <port> * - * <master> is a Spark master URL * <host> is the host the Flume receiver will be started on - a receiver * creates a server and listens for flume events. * <port> is the port the Flume receiver will listen on. @@ -40,21 +40,21 @@ object FlumeEventCount { def main(args: Array[String]) { if (args.length != 3) { System.err.println( - "Usage: FlumeEventCount <master> <host> <port>") + "Usage: FlumeEventCount <host> <port>") System.exit(1) } StreamingExamples.setStreamingLogLevels() - val Array(master, host, IntParam(port)) = args + val Array(host, IntParam(port)) = args val batchInterval = Milliseconds(2000) + val sparkConf = new SparkConf().setAppName("FlumeEventCount") // Create the context and set the batch size - val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval, - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, batchInterval) // Create a flume stream - val stream = FlumeUtils.createStream(ssc, host,port,StorageLevel.MEMORY_ONLY_SER_2) + val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2) // Print out the count of events received from this server in each batch stream.count().map(cnt => "Received " + cnt + " flume events." ).print() diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala index b440956ba3..55ac48cfb6 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala @@ -17,35 +17,35 @@ package org.apache.spark.examples.streaming +import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ /** * Counts words in new text files created in the given directory - * Usage: HdfsWordCount <master> <directory> - * <master> is the Spark master URL. + * Usage: HdfsWordCount <directory> * <directory> is the directory that Spark Streaming will use to find and read new text files. * * To run this on your local machine on directory `localdir`, run this example - * `$ ./bin/run-example org.apache.spark.examples.streaming.HdfsWordCount local[2] localdir` + * `$ ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.HdfsWordCount localdir` * Then create a text file in `localdir` and the words in the file will get counted. */ object HdfsWordCount { def main(args: Array[String]) { - if (args.length < 2) { - System.err.println("Usage: HdfsWordCount <master> <directory>") + if (args.length < 1) { + System.err.println("Usage: HdfsWordCount <directory>") System.exit(1) } StreamingExamples.setStreamingLogLevels() - + val sparkConf = new SparkConf().setAppName("HdfsWordCount") // Create the context - val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, Seconds(2)) // Create the FileInputDStream on the directory and use the // stream to count words in new files created - val lines = ssc.textFileStream(args(1)) + val lines = ssc.textFileStream(args(0)) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala index c3aae5af05..3af806981f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala @@ -24,34 +24,33 @@ import kafka.producer._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.kafka._ +import org.apache.spark.SparkConf -// scalastyle:off /** * Consumes messages from one or more topics in Kafka and does wordcount. - * Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads> - * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads> * <zkQuorum> is a list of one or more zookeeper servers that make quorum * <group> is the name of kafka consumer group * <topics> is a list of one or more kafka topics to consume from * <numThreads> is the number of threads the kafka consumer should use * * Example: - * `./bin/run-example org.apache.spark.examples.streaming.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1` + * `./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.KafkaWordCount local[2] zoo01,zoo02,zoo03 \ + * my-consumer-group topic1,topic2 1` */ -// scalastyle:on object KafkaWordCount { def main(args: Array[String]) { - if (args.length < 5) { - System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>") + if (args.length < 4) { + System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>") System.exit(1) } StreamingExamples.setStreamingLogLevels() - val Array(master, zkQuorum, group, topics, numThreads) = args - - val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val Array(zkQuorum, group, topics, numThreads) = args + val sparkConf = new SparkConf().setAppName("KafkaWordCount") + val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala index 47bf1e5a06..3a10daa9ab 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala @@ -24,6 +24,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.mqtt._ +import org.apache.spark.SparkConf /** * A simple Mqtt publisher for demonstration purposes, repeatedly publishes @@ -64,7 +65,6 @@ object MQTTPublisher { } } -// scalastyle:off /** * A sample wordcount with MqttStream stream * @@ -74,30 +74,28 @@ object MQTTPublisher { * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/ * Example Java code for Mqtt Publisher and Subscriber can be found here * https://bitbucket.org/mkjinesh/mqttclient - * Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic> - * In local mode, <master> should be 'local[n]' with n > 1 - * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running. + * Usage: MQTTWordCount <MqttbrokerUrl> <topic> +\ * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running. * * To run this example locally, you may run publisher as - * `$ ./bin/run-example org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo` + * `$ ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo` * and run the example as - * `$ ./bin/run-example org.apache.spark.examples.streaming.MQTTWordCount local[2] tcp://localhost:1883 foo` + * `$ ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo` */ -// scalastyle:on object MQTTWordCount { def main(args: Array[String]) { - if (args.length < 3) { + if (args.length < 2) { System.err.println( - "Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>" + - " In local mode, <master> should be 'local[n]' with n > 1") + "Usage: MQTTWordCount <MqttbrokerUrl> <topic>") System.exit(1) } - val Seq(master, brokerUrl, topic) = args.toSeq - - val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), - StreamingContext.jarOfClass(this.getClass).toSeq) + val Seq(brokerUrl, topic) = args.toSeq + val sparkConf = new SparkConf().setAppName("MQTTWordCount") + val ssc = new StreamingContext(sparkConf, Seconds(2)) val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2) val words = lines.flatMap(x => x.toString.split(" ")) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala index acfe9a4da3..ad7a199b2c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala @@ -17,41 +17,38 @@ package org.apache.spark.examples.streaming +import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel -// scalastyle:off /** * Counts words in text encoded with UTF8 received from the network every second. * - * Usage: NetworkWordCount <master> <hostname> <port> - * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. - * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. + * Usage: NetworkWordCount <hostname> <port> + * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example - * `$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount local[2] localhost 9999` + * `$ ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.NetworkWordCount localhost 9999` */ -// scalastyle:on object NetworkWordCount { def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" + - "In local mode, <master> should be 'local[n]' with n > 1") + if (args.length < 2) { + System.err.println("Usage: NetworkWordCount <hostname> <port>") System.exit(1) } StreamingExamples.setStreamingLogLevels() - + val sparkConf = new SparkConf().setAppName("NetworkWordCount"); // Create the context with a 1 second batch size - val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, Seconds(1)) // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') - val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER) + val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala index f92f72f2de..4caa906591 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala @@ -19,6 +19,7 @@ package org.apache.spark.examples.streaming import scala.collection.mutable.SynchronizedQueue +import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ @@ -26,16 +27,11 @@ import org.apache.spark.streaming.StreamingContext._ object QueueStream { def main(args: Array[String]) { - if (args.length < 1) { - System.err.println("Usage: QueueStream <master>") - System.exit(1) - } StreamingExamples.setStreamingLogLevels() - + val sparkConf = new SparkConf().setAppName("QueueStream") // Create the context - val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, Seconds(1)) // Create the queue through which RDDs can be pushed to // a QueueInputDStream diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala index 1b0319a046..a9aaa445bc 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala @@ -17,6 +17,7 @@ package org.apache.spark.examples.streaming +import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.util.IntParam @@ -27,29 +28,26 @@ import org.apache.spark.util.IntParam * will only work with spark.streaming.util.RawTextSender running on all worker nodes * and with Spark using Kryo serialization (set Java property "spark.serializer" to * "org.apache.spark.serializer.KryoSerializer"). - * Usage: RawNetworkGrep <master> <numStreams> <host> <port> <batchMillis> - * <master> is the Spark master URL + * Usage: RawNetworkGrep <numStreams> <host> <port> <batchMillis> * <numStream> is the number rawNetworkStreams, which should be same as number * of work nodes in the cluster * <host> is "localhost". * <port> is the port on which RawTextSender is running in the worker nodes. * <batchMillise> is the Spark Streaming batch duration in milliseconds. */ - object RawNetworkGrep { def main(args: Array[String]) { - if (args.length != 5) { - System.err.println("Usage: RawNetworkGrep <master> <numStreams> <host> <port> <batchMillis>") + if (args.length != 4) { + System.err.println("Usage: RawNetworkGrep <numStreams> <host> <port> <batchMillis>") System.exit(1) } StreamingExamples.setStreamingLogLevels() - val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args - + val Array(IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args + val sparkConf = new SparkConf().setAppName("RawNetworkGrep") // Create the context - val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, Duration(batchMillis)) val rawStreams = (1 to numStreams).map(_ => ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index b0bc31cc66..ace785d9fe 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -17,19 +17,21 @@ package org.apache.spark.examples.streaming +import java.io.File +import java.nio.charset.Charset + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Time, Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.util.IntParam -import java.io.File -import org.apache.spark.rdd.RDD -import com.google.common.io.Files -import java.nio.charset.Charset /** * Counts words in text encoded with UTF8 received from the network every second. * - * Usage: NetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file> - * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * Usage: NetworkWordCount <hostname> <port> <checkpoint-directory> <output-file> * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive * data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data * <output-file> file to which the word counts will be appended @@ -44,8 +46,9 @@ import java.nio.charset.Charset * * and run the example as * - * `$ ./run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \ - * local[2] localhost 9999 ~/checkpoint/ ~/out` + * `$ ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.RecoverableNetworkWordCount \ + * localhost 9999 ~/checkpoint/ ~/out` * * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if @@ -67,17 +70,16 @@ import java.nio.charset.Charset object RecoverableNetworkWordCount { - def createContext(master: String, ip: String, port: Int, outputPath: String) = { + def createContext(ip: String, port: Int, outputPath: String) = { // If you do not see this printed, that means the StreamingContext has been loaded // from the new checkpoint println("Creating new context") val outputFile = new File(outputPath) if (outputFile.exists()) outputFile.delete() - + val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount") // Create the context with a 1 second batch size - val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, Seconds(1)) // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') @@ -94,13 +96,12 @@ object RecoverableNetworkWordCount { } def main(args: Array[String]) { - if (args.length != 5) { + if (args.length != 4) { System.err.println("You arguments were " + args.mkString("[", ", ", "]")) System.err.println( """ - |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> - | <output-file> <master> is the Spark master URL. In local mode, <master> should be - | 'local[n]' with n > 1. <hostname> and <port> describe the TCP server that Spark + |Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> + | <output-file>. <hostname> and <port> describe the TCP server that Spark | Streaming would connect to receive data. <checkpoint-directory> directory to | HDFS-compatible file system which checkpoint data <output-file> file to which the | word counts will be appended @@ -111,10 +112,10 @@ object RecoverableNetworkWordCount { ) System.exit(1) } - val Array(master, ip, IntParam(port), checkpointDirectory, outputPath) = args + val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => { - createContext(master, ip, port, outputPath) + createContext(ip, port, outputPath) }) ssc.start() ssc.awaitTermination() diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala index 8001d56c98..5e1415f3cc 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala @@ -17,28 +17,27 @@ package org.apache.spark.examples.streaming +import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ -// scalastyle:off + /** * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every * second. - * Usage: StatefulNetworkWordCount <master> <hostname> <port> - * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * Usage: StatefulNetworkWordCount <hostname> <port> * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive * data. * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example - * `$ ./bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount local[2] localhost 9999` + * `$ ./bin/spark-submit examples.jar + * --class org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999` */ -// scalastyle:on object StatefulNetworkWordCount { def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: StatefulNetworkWordCount <master> <hostname> <port>\n" + - "In local mode, <master> should be 'local[n]' with n > 1") + if (args.length < 2) { + System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>") System.exit(1) } @@ -52,14 +51,14 @@ object StatefulNetworkWordCount { Some(currentCount + previousCount) } + val sparkConf = new SparkConf().setAppName("NetworkWordCumulativeCountUpdateStateByKey") // Create the context with a 1 second batch size - val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", - Seconds(1), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, Seconds(1)) ssc.checkpoint(".") // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') - val lines = ssc.socketTextStream(args(1), args(2).toInt) + val lines = ssc.socketTextStream(args(0), args(1).toInt) val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala index b12617d881..683752ac96 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala @@ -19,11 +19,13 @@ package org.apache.spark.examples.streaming import com.twitter.algebird._ +import org.apache.spark.SparkConf import org.apache.spark.SparkContext._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.twitter._ + // scalastyle:off /** * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute @@ -49,12 +51,6 @@ import org.apache.spark.streaming.twitter._ // scalastyle:on object TwitterAlgebirdCMS { def main(args: Array[String]) { - if (args.length < 1) { - System.err.println("Usage: TwitterAlgebirdCMS <master>" + - " [filter1] [filter2] ... [filter n]") - System.exit(1) - } - StreamingExamples.setStreamingLogLevels() // CMS parameters @@ -65,10 +61,9 @@ object TwitterAlgebirdCMS { // K highest frequency elements to take val TOPK = 10 - val (master, filters) = (args.head, args.tail) - - val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val filters = args + val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS") + val ssc = new StreamingContext(sparkConf, Seconds(10)) val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala index 22f232c725..62db5e663b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala @@ -23,6 +23,8 @@ import com.twitter.algebird.HyperLogLog._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.twitter._ +import org.apache.spark.SparkConf + // scalastyle:off /** * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute @@ -42,20 +44,14 @@ import org.apache.spark.streaming.twitter._ // scalastyle:on object TwitterAlgebirdHLL { def main(args: Array[String]) { - if (args.length < 1) { - System.err.println("Usage: TwitterAlgebirdHLL <master>" + - " [filter1] [filter2] ... [filter n]") - System.exit(1) - } StreamingExamples.setStreamingLogLevels() /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ val BIT_SIZE = 12 - val (master, filters) = (args.head, args.tail) - - val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val filters = args + val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL") + val ssc = new StreamingContext(sparkConf, Seconds(5)) val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala index 5b58e94600..1ddff22cb8 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala @@ -21,6 +21,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext} import StreamingContext._ import org.apache.spark.SparkContext._ import org.apache.spark.streaming.twitter._ +import org.apache.spark.SparkConf /** * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter @@ -30,18 +31,12 @@ import org.apache.spark.streaming.twitter._ */ object TwitterPopularTags { def main(args: Array[String]) { - if (args.length < 1) { - System.err.println("Usage: TwitterPopularTags <master>" + - " [filter1] [filter2] ... [filter n]") - System.exit(1) - } StreamingExamples.setStreamingLogLevels() - val (master, filters) = (args.head, args.tail) - - val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val filters = args + val sparkConf = new SparkConf().setAppName("TwitterPopularTags") + val ssc = new StreamingContext(sparkConf, Seconds(2)) val stream = TwitterUtils.createStream(ssc, None, filters) val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala index de46e5f5b1..7ade3f1018 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -28,6 +28,7 @@ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.zeromq._ import scala.language.implicitConversions +import org.apache.spark.SparkConf /** * A simple publisher for demonstration purposes, repeatedly publishes random Messages @@ -63,30 +64,28 @@ object SimpleZeroMQPublisher { * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide] * (http://www.zeromq.org/intro:get-the-software) * - * Usage: ZeroMQWordCount <master> <zeroMQurl> <topic> - * In local mode, <master> should be 'local[n]' with n > 1 + * Usage: ZeroMQWordCount <zeroMQurl> <topic> * <zeroMQurl> and <topic> describe where zeroMq publisher is running. * * To run this example locally, you may run publisher as - * `$ ./bin/run-example org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` + * `$ ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` * and run the example as - * `$ ./bin/run-example org.apache.spark.examples.streaming.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo` + * `$ ./bin/spark-submit examples.jar \ + * --class org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo` */ // scalastyle:on object ZeroMQWordCount { def main(args: Array[String]) { - if (args.length < 3) { - System.err.println( - "Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>" + - "In local mode, <master> should be 'local[n]' with n > 1") + if (args.length < 2) { + System.err.println("Usage: ZeroMQWordCount <zeroMQurl> <topic>") System.exit(1) } StreamingExamples.setStreamingLogLevels() - val Seq(master, url, topic) = args.toSeq - + val Seq(url, topic) = args.toSeq + val sparkConf = new SparkConf().setAppName("ZeroMQWordCount") // Create the context and set the batch size - val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val ssc = new StreamingContext(sparkConf, Seconds(2)) def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala index fa533a512d..d901d4fe22 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala @@ -27,10 +27,14 @@ import org.apache.spark.graphx.PartitionStrategy._ object Analytics extends Logging { def main(args: Array[String]): Unit = { - val host = args(0) - val taskType = args(1) - val fname = args(2) - val options = args.drop(3).map { arg => + if (args.length < 2) { + System.err.println("Usage: Analytics <taskType> <file> [other options]") + System.exit(1) + } + + val taskType = args(0) + val fname = args(1) + val options = args.drop(2).map { arg => arg.dropWhile(_ == '-').split('=') match { case Array(opt, v) => (opt -> v) case _ => throw new IllegalArgumentException("Invalid argument: " + arg) @@ -71,7 +75,7 @@ object Analytics extends Logging { println("| PageRank |") println("======================================") - val sc = new SparkContext(host, "PageRank(" + fname + ")", conf) + val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")")) val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart).cache() @@ -115,7 +119,7 @@ object Analytics extends Logging { println("| Connected Components |") println("======================================") - val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")", conf) + val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")")) val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart).cache() val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) @@ -137,7 +141,7 @@ object Analytics extends Logging { println("======================================") println("| Triangle Count |") println("======================================") - val sc = new SparkContext(host, "TriangleCount(" + fname + ")", conf) + val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")")) val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true, minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache() val triangles = TriangleCount.run(graph) -- GitLab