Skip to content
Snippets Groups Projects
Commit 13757b11 authored by Nick Pentreath's avatar Nick Pentreath
Browse files

Adding Java versions of Pi and LogQuery

parent cbf8f0d4
No related branches found
No related tags found
No related merge requests found
package spark.examples;
import com.google.common.collect.Lists;
import scala.Tuple2;
import scala.Tuple3;
import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.Function2;
import spark.api.java.function.PairFunction;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Executes a roll up-style query against Apache logs.
*/
public class JavaLogQuery {
public static List<String> exampleApacheLogs = Lists.newArrayList(
"10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " +
"HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; " +
".NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR 3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR " +
"3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.350 \"-\" - \"\" 265 923 934 \"\" " +
"62.24.11.25 images.com 1358492167 - Whatup",
"10.10.10.10 - \"FRED\" [18/Jan/2013:18:02:37 +1100] \"GET http://images.com/2013/Generic.jpg " +
"HTTP/1.1\" 304 306 \"http:/referall.com\" \"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; " +
"GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; .NET CLR " +
"3.5.21022; .NET CLR 3.0.4506.2152; .NET CLR 1.0.3705; .NET CLR 1.1.4322; .NET CLR " +
"3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.352 \"-\" - \"\" 256 977 988 \"\" " +
"0 73.23.2.15 images.com 1358492557 - Whatup");
public static Pattern apacheLogRegex = Pattern.compile("^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*");
/** Tracks the total query count and number of aggregate bytes for a particular group. */
public static class Stats implements Serializable {
private int count;
private int numBytes;
public Stats(int count, int numBytes) {
this.count = count;
this.numBytes = numBytes;
}
public Stats merge(Stats other) {
return new Stats(count + other.count, numBytes + other.numBytes);
}
public String toString() {
return String.format("bytes=%s\tn=%s", numBytes, count);
}
}
public static Tuple3<String, String, String> extractKey(String line) {
Matcher m = apacheLogRegex.matcher(line);
List<String> key = Collections.emptyList();
if (m.find()) {
String ip = m.group(1);
String user = m.group(3);
String query = m.group(5);
if (!user.equalsIgnoreCase("-")) {
return new Tuple3<String, String, String>(ip, user, query);
}
}
return new Tuple3<String, String, String>(null, null, null);
}
public static Stats extractStats(String line) {
Matcher m = apacheLogRegex.matcher(line);
if (m.find()) {
int bytes = Integer.parseInt(m.group(7));
return new Stats(1, bytes);
}
else
return new Stats(1, 0);
}
public static void main(String[] args) throws Exception {
if (args.length == 0) {
System.err.println("Usage: JavaLogQuery <master> [logFile]");
System.exit(1);
}
JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);
JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.map(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
@Override
public Tuple2<Tuple3<String, String, String>, Stats> call(String s) throws Exception {
return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));
}
});
JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
@Override
public Stats call(Stats stats, Stats stats2) throws Exception {
return stats.merge(stats2);
}
});
List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect();
for (Tuple2 t : output) {
System.out.println(t._1 + "\t" + t._2);
}
System.exit(0);
}
}
package spark.examples;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.Function;
import spark.api.java.function.Function2;
import java.util.ArrayList;
import java.util.List;
/** Computes an approximation to pi */
public class JavaSparkPi {
public static void main(String[] args) throws Exception {
if (args.length == 0) {
System.err.println("Usage: JavaLogQuery <master> [slices]");
System.exit(1);
}
JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++)
l.add(i);
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
int count = dataSet.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer integer) throws Exception {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
return (x * x + y * y < 1) ? 1 : 0;
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
System.out.println("Pi is roughly " + 4.0 * count / n);
}
}
......@@ -4,6 +4,7 @@ import scala.math.random
import spark._
import SparkContext._
/** Computes an approximation to pi */
object SparkPi {
def main(args: Array[String]) {
if (args.length == 0) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment