Skip to content
Snippets Groups Projects
Commit 98fd6260 authored by stayhf's avatar stayhf
Browse files

Updated code with reviewer's suggestions

parent a6826373
No related branches found
No related tags found
No related merge requests found
......@@ -61,47 +61,47 @@ public class JavaPageRank {
System.exit(1);
}
JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
// Loads in input file. It should be in format of:
// URL neighbor URL
// URL neighbor URL
// URL neighbor URL
// ...
JavaRDD<String> lines = ctx.textFile(args[1], 1).cache();
// Loads all URLs from input file and initialize their neighbors.
JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
return new Tuple2<String, String>(s.split("\\s+")[0], s.split("\\s+")[1]);
}
}).distinct().groupByKey();
// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
JavaPairRDD<String, Double> ranks = lines.map(new PairFunction<String, String, Double>() {
@Override
public Tuple2<String, Double> call(String s) {
return new Tuple2<String, Double>(s.split("\\s+")[1], 1.0);
}
}).distinct();
JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
// Loads in input file. It should be in format of:
// URL neighbor URL
// URL neighbor URL
// URL neighbor URL
// ...
JavaRDD<String> lines = ctx.textFile(args[1], 1);
// Loads all URLs from input file and initialize their neighbors.
JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
String[] parts = s.split("\\s+");
return new Tuple2<String, String>(parts[0], parts[1]);
}
}).distinct().groupByKey().cache();
// Calculates and updates URL ranks continuously using PageRank algorithm.
for (int current = 0; current < Integer.parseInt(args[2]); current++) {
// Calculates URL contributions to the rank of other URLs.
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
.flatMap(new FlatMapFunction<Tuple2<List<String>, Double>, Tuple2<String, Double>>() {
@Override
public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
for (String n : s._1) {
results.add(new Tuple2<String, Double>(n, s._2 / s._1.size()));
// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
@Override
public Double call(List<String> rs) throws Exception {
return 1.0;
}
});
// Calculates and updates URL ranks continuously using PageRank algorithm.
for (int current = 0; current < Integer.parseInt(args[2]); current++) {
// Calculates URL contributions to the rank of other URLs.
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
.flatMap(new FlatMapFunction<Tuple2<List<String>, Double>, Tuple2<String, Double>>() {
@Override
public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
for (String n : s._1) {
results.add(new Tuple2<String, Double>(n, s._2 / s._1.size()));
}
return results;
}
return results;
}
}).map(new PairFunction<Tuple2<String, Double>, String, Double>() {
@Override
public Tuple2<String, Double> call(Tuple2<String, Double> s) {
......@@ -115,15 +115,15 @@ public class JavaPageRank {
public Double call(List<Double> cs) throws Exception {
return 0.15 + sum(cs) * 0.85;
}
});
}
});
}
// Collects all URL ranks and dump them to console.
List<Tuple2<String, Double>> output = ranks.collect();
for (Tuple2 tuple : output) {
System.out.println(tuple._1 + " has rank: " + round((Double)(tuple._2), 2) + ".");
}
// Collects all URL ranks and dump them to console.
List<Tuple2<String, Double>> output = ranks.collect();
for (Tuple2 tuple : output) {
System.out.println(tuple._1 + " has rank: " + round((Double)(tuple._2), 2) + ".");
}
System.exit(0);
System.exit(0);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment