Commit 5b0986a1 authored by Reynold Xin

Merge pull request #334 from pwendell/examples-fix

Removing SPARK_EXAMPLES_JAR in the code

This rewrites all of the examples to use the `SparkContext.jarOfClass` mechanism for loading the examples jar. This is necessary for environments like YARN and the Standalone mode, where example programs will be submitted from inside the cluster rather than from the client using `./spark-example`.

This still leaves SPARK_EXAMPLES_JAR in place in the shell scripts for setting up the classpath if `./spark-example` is run.
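
For reference, a minimal sketch of the pattern the rewritten examples now follow (the class name and master URL below are hypothetical placeholders, not part of this commit):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class MyExample {
      public static void main(String[] args) {
        // Ship the jar that contains this class instead of reading SPARK_EXAMPLES_JAR.
        JavaSparkContext sc = new JavaSparkContext(
            "local",                                       // master URL (placeholder)
            "MyExample",                                   // application name
            System.getenv("SPARK_HOME"),                   // may be null outside a Spark install
            JavaSparkContext.jarOfClass(MyExample.class)); // jar(s) containing this class
        JavaRDD<String> lines = sc.textFile(args[0]);
        System.out.println(lines.count());
        sc.stop();
      }
    }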
parents f4b924f6 79f52809
Showing 29 additions and 19 deletions
@@ -431,4 +431,10 @@ object JavaSparkContext {
   implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc)
   implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc
+
+  /**
+   * Find the JAR from which a given class was loaded, to make it easy for users to pass
+   * their JARs to SparkContext.
+   */
+  def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
 }
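
Judging from the wrapper above, `SparkContext.jarOfClass` yields a sequence of jar paths (empty when the class was not loaded from a jar, e.g. when running against a directory of class files), which the Java side converts to an array. A hedged usage sketch; `MyJob` is a hypothetical class:

    // Hypothetical usage: locate the jar that contains MyJob and pass it along.
    String[] jars = JavaSparkContext.jarOfClass(MyJob.class);  // empty array if no jar found
    JavaSparkContext sc = new JavaSparkContext(
        "local", "MyJob", System.getenv("SPARK_HOME"), jars);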
@@ -106,7 +106,7 @@ public class JavaHdfsLR {
     }
     JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR",
-        System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaHdfsLR.class));
     JavaRDD<String> lines = sc.textFile(args[1]);
     JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
     int ITERATIONS = Integer.parseInt(args[2]);
......
@@ -74,7 +74,7 @@ public class JavaKMeans {
       System.exit(1);
     }
     JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
-        System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
     String path = args[1];
     int K = Integer.parseInt(args[2]);
     double convergeDist = Double.parseDouble(args[3]);
......
@@ -104,7 +104,7 @@ public class JavaLogQuery {
     }
     JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
-        System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLogQuery.class));
     JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);
......
@@ -17,6 +17,7 @@
 package org.apache.spark.examples;
+import org.apache.spark.SparkContext;
 import scala.Tuple2;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -53,7 +54,7 @@ public class JavaPageRank {
     }
     JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
-        System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaPageRank.class));
     // Loads in input file. It should be in format of:
     //     URL         neighbor URL
......
@@ -36,7 +36,7 @@ public class JavaSparkPi {
     }
     JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
-        System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkPi.class));
     int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2;
     int n = 100000 * slices;
......
@@ -64,7 +64,7 @@ public class JavaTC {
     }
     JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC",
-        System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaTC.class));
     Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
     JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();
......
@@ -36,7 +36,7 @@ public class JavaWordCount {
     }
     JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
-        System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));
     JavaRDD<String> lines = ctx.textFile(args[1], 1);
     JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
......
@@ -68,7 +68,7 @@ public class JavaALS {
     }
     JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS",
-        System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaALS.class));
     JavaRDD<String> lines = sc.textFile(args[1]);
     JavaRDD<Rating> ratings = lines.map(new ParseRating());
......
@@ -62,7 +62,7 @@ public class JavaKMeans {
     }
     JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
-        System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
     JavaRDD<String> lines = sc.textFile(args[1]);
     JavaRDD<double[]> points = lines.map(new ParsePoint());
......
@@ -59,7 +59,7 @@ public class JavaLR {
     }
     JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
-        System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLR.class));
     JavaRDD<String> lines = sc.textFile(args[1]);
     JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
     double stepSize = Double.parseDouble(args[2]);
......
@@ -50,7 +50,8 @@ public class JavaFlumeEventCount {
     Duration batchInterval = new Duration(2000);
     JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
-        System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+        System.getenv("SPARK_HOME"),
+        JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
     JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port);
......
@@ -54,7 +54,8 @@ public class JavaKafkaWordCount {
     // Create the context with a 1 second batch size
     JavaStreamingContext ssc = new JavaStreamingContext(args[0], "KafkaWordCount",
-        new Duration(2000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+        new Duration(2000), System.getenv("SPARK_HOME"),
+        JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class));
     int numThreads = Integer.parseInt(args[4]);
     Map<String, Integer> topicMap = new HashMap<String, Integer>();
......
@@ -48,7 +48,8 @@ public class JavaNetworkWordCount {
     // Create the context with a 1 second batch size
     JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
-        new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+        new Duration(1000), System.getenv("SPARK_HOME"),
+        JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
     // Create a NetworkInputDStream on target ip:port and count the
     // words in input stream of \n delimited test (eg. generated by 'nc')
......
@@ -40,7 +40,7 @@ public class JavaQueueStream {
     // Create the context
     JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
-        System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+        System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));
     // Create the queue through which RDDs can be pushed to
     // a QueueInputDStream
......
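
The streaming examples rely on the analogous `JavaStreamingContext.jarOfClass` helper; a minimal sketch of that variant, with a hypothetical class name and placeholder parameters:

    // Hypothetical: a streaming context that ships the jar containing the driver
    // class, mirroring the batch-side pattern above.
    JavaStreamingContext ssc = new JavaStreamingContext(
        "local",                                                // master URL (placeholder)
        "MyStreamingJob",                                       // application name
        new Duration(1000),                                     // 1 second batch interval
        System.getenv("SPARK_HOME"),
        JavaStreamingContext.jarOfClass(MyStreamingJob.class)); // jar(s) with this class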
@@ -33,7 +33,7 @@ object BroadcastTest {
     System.setProperty("spark.broadcast.blockSize", blockSize)
     val sc = new SparkContext(args(0), "Broadcast Test",
-      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
     val slices = if (args.length > 1) args(1).toInt else 2
     val num = if (args.length > 2) args(2).toInt else 1000000
......
@@ -27,7 +27,7 @@ object ExceptionHandlingTest {
     }
     val sc = new SparkContext(args(0), "ExceptionHandlingTest",
-      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
     sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
       if (math.random > 0.75)
         throw new Exception("Testing exception handling")
......
@@ -34,7 +34,7 @@ object GroupByTest {
     var numReducers = if (args.length > 4) args(4).toInt else numMappers
     val sc = new SparkContext(args(0), "GroupBy Test",
-      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
     val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
       val ranGen = new Random
......
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 object HBaseTest {
   def main(args: Array[String]) {
     val sc = new SparkContext(args(0), "HBaseTest",
-      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
     val conf = HBaseConfiguration.create()
......
@@ -22,7 +22,7 @@ import org.apache.spark._
 object HdfsTest {
   def main(args: Array[String]) {
     val sc = new SparkContext(args(0), "HdfsTest",
-      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
     val file = sc.textFile(args(1))
     val mapped = file.map(s => s.length).cache()
     for (iter <- 1 to 10) {
......