Commit 78062b85 authored by Andrew Ray, committed by Ankur Dave

[SPARK-18845][GRAPHX] PageRank has incorrect initialization value that leads to slow convergence

## What changes were proposed in this pull request?

Change the initial value in all PageRank implementations to be `1.0` instead of `resetProb` (default `0.15`) and use `outerJoinVertices` instead of `joinVertices` so that source vertices get updated in each iteration.
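For readers less familiar with the GraphX join APIs, here is a minimal standalone sketch of why the swap matters (the toy graph, the `JoinFlavorsSketch` name, and the made-up message sums are illustrative, not part of the patch): a source vertex receives no messages, so it never appears in the per-iteration updates RDD. `joinVertices` leaves such a vertex's rank untouched, while `outerJoinVertices` still invokes the map function with `None`, so the teleport term is reapplied every iteration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Graph, VertexRDD}

object JoinFlavorsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("join-flavors"))
    val resetProb = 0.15
    // Vertex 1 has no in-edges: it is a source
    val graph = Graph.fromEdgeTuples(sc.parallelize(Seq((1L, 2L), (2L, 3L))), 1.0)
    // Pretend these are one iteration's summed rank contributions; the source
    // vertex 1 is absent because nothing links to it.
    val rankUpdates = VertexRDD(sc.parallelize(Seq((2L, 0.9), (3L, 0.9))))

    val joined = graph.joinVertices(rankUpdates) { (id, oldRank, msgSum) =>
      resetProb + (1.0 - resetProb) * msgSum
    } // vertex 1 keeps its stale rank of 1.0: the function is never called for it

    val outer = graph.outerJoinVertices(rankUpdates) { (id, oldRank, msgSumOpt) =>
      resetProb + (1.0 - resetProb) * msgSumOpt.getOrElse(0.0)
    } // vertex 1 is reset to 0.15 each iteration, as the algorithm intends

    joined.vertices.collect().sortBy(_._1).foreach(println)
    outer.vertices.collect().sortBy(_._1).foreach(println)
    sc.stop()
  }
}
```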

This seems to have been introduced a long time ago in https://github.com/apache/spark/commit/15a564598fe63003652b1e24527c432080b5976c#diff-b2bf3f97dcd2f19d61c921836159cda9L90

With the exception of graphs with sinks (which currently give incorrect results; see SPARK-18847), this gives faster convergence because the sum of ranks is already correct from the start (the sum of ranks should equal the number of vertices).
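A back-of-the-envelope illustration of that claim (plain Scala on a hypothetical 4-cycle, not code from the patch): on a sink-free graph the update `r' = resetProb + (1 - resetProb) * msgSum` preserves total rank mass, so starting every vertex at `1.0` begins at the fixed-point total of `n`, while starting at `resetProb` begins at `0.15 * n` and must spend iterations growing toward `n`.

```scala
// Hypothetical 4-cycle: each vertex has one out-edge to its successor,
// so vertex i receives the full rank of vertex i - 1.
val resetProb = 0.15
val n = 4
def step(r: Vector[Double]): Vector[Double] =
  Vector.tabulate(n)(i => resetProb + (1.0 - resetProb) * r((i + n - 1) % n))

val fromOne   = Iterator.iterate(Vector.fill(n)(1.0))(step).drop(10).next()
val fromReset = Iterator.iterate(Vector.fill(n)(resetProb))(step).drop(10).next()
println(fromOne.sum)   // stays at 4.0: total mass is preserved from the start
println(fromReset.sum) // ~3.33: still climbing toward 4.0 after 10 iterations
```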

Convergence comparison benchmark for a small graph: http://imgur.com/a/HkkZf
Code for benchmark: https://gist.github.com/aray/a7de1f3801a810f8b1fa00c271a1fefd

## How was this patch tested?

Existing unit tests (corrected where needed) and an additional test that verifies the result against igraph and NetworkX on a loop with a source vertex.

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #16271 from aray/pagerank-initial-value.
parent 172a52f5
```diff
@@ -115,9 +115,9 @@ object PageRank extends Logging {
     val src: VertexId = srcId.getOrElse(-1L)
 
     // Initialize the PageRank graph with each edge attribute having
-    // weight 1/outDegree and each vertex with attribute resetProb.
+    // weight 1/outDegree and each vertex with attribute 1.0.
     // When running personalized pagerank, only the source vertex
-    // has an attribute resetProb. All others are set to 0.
+    // has an attribute 1.0. All others are set to 0.
     var rankGraph: Graph[Double, Double] = graph
       // Associate the degree with each vertex
       .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
@@ -125,7 +125,7 @@ object PageRank extends Logging {
       .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
       // Set the vertex attributes to the initial pagerank values
       .mapVertices { (id, attr) =>
-        if (!(id != src && personalized)) resetProb else 0.0
+        if (!(id != src && personalized)) 1.0 else 0.0
       }
 
     def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
@@ -150,8 +150,8 @@ object PageRank extends Logging {
         (src: VertexId, id: VertexId) => resetProb
       }
 
-      rankGraph = rankGraph.joinVertices(rankUpdates) {
-        (id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum
+      rankGraph = rankGraph.outerJoinVertices(rankUpdates) {
+        (id, oldRank, msgSumOpt) => rPrb(src, id) + (1.0 - resetProb) * msgSumOpt.getOrElse(0.0)
       }.cache()
 
       rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
@@ -196,7 +196,7 @@ object PageRank extends Logging {
     // we won't be able to store its activations in a sparse vector
     val zero = Vectors.sparse(sources.size, List()).asBreeze
     val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
-      val v = Vectors.sparse(sources.size, Array(i), Array(resetProb)).asBreeze
+      val v = Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze
       (vid, v)
     }.toMap
     val sc = graph.vertices.sparkContext
@@ -225,11 +225,11 @@ object PageRank extends Logging {
         ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
         (a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src)
 
-      rankGraph = rankGraph.joinVertices(rankUpdates) {
-        (vid, oldRank, msgSum) =>
-          val popActivations: BV[Double] = msgSum :* (1.0 - resetProb)
+      rankGraph = rankGraph.outerJoinVertices(rankUpdates) {
+        (vid, oldRank, msgSumOpt) =>
+          val popActivations: BV[Double] = msgSumOpt.getOrElse(zero) :* (1.0 - resetProb)
           val resetActivations = if (sourcesInitMapBC.value contains vid) {
-            sourcesInitMapBC.value(vid)
+            sourcesInitMapBC.value(vid) :* resetProb
           } else {
             zero
           }
```
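Taken in isolation, the re-scaling in the parallel personalized variant looks like the following Breeze sketch (toy sizes and values, assumed here purely for illustration; `:*` and `:+` are Breeze's element-wise operators as used in the patch): each source's one-hot initialization vector now holds `1.0`, and `resetProb` is applied when the reset activations are folded into the new rank.

```scala
import breeze.linalg.{SparseVector, Vector => BV}

val resetProb = 0.15
val numSources = 3
// One-hot init for the source at position 1: now 1.0 (was resetProb)
val init: BV[Double] = SparseVector(numSources)((1, 1.0))
// Toy incoming message sum for the same position
val msgSum: BV[Double] = SparseVector(numSources)((1, 0.5))
// resetProb is applied here instead of at initialization time
val rank = (msgSum :* (1.0 - resetProb)) :+ (init :* resetProb)
println(rank(1)) // 0.5 * 0.85 + 1.0 * 0.15 = 0.575
```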
```diff
@@ -307,7 +307,7 @@ object PageRank extends Logging {
       .mapTriplets( e => 1.0 / e.srcAttr )
       // Set the vertex attributes to (initialPR, delta = 0)
       .mapVertices { (id, attr) =>
-        if (id == src) (resetProb, Double.NegativeInfinity) else (0.0, 0.0)
+        if (id == src) (1.0, Double.NegativeInfinity) else (0.0, 0.0)
       }
       .cache()
@@ -323,7 +323,7 @@ object PageRank extends Logging {
         msgSum: Double): (Double, Double) = {
       val (oldPR, lastDelta) = attr
       var teleport = oldPR
-      val delta = if (src==id) 1.0 else 0.0
+      val delta = if (src==id) resetProb else 0.0
       teleport = oldPR*delta
 
       val newPR = teleport + (1.0 - resetProb) * msgSum
```
```diff
@@ -41,7 +41,7 @@ object GridPageRank {
       }
     }
     // compute the pagerank
-    var pr = Array.fill(nRows * nCols)(resetProb)
+    var pr = Array.fill(nRows * nCols)(1.0)
     for (iter <- 0 until nIter) {
       val oldPr = pr
       pr = new Array[Double](nRows * nCols)
```
```diff
@@ -70,10 +70,10 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
     val resetProb = 0.15
     val errorTol = 1.0e-5
 
-    val staticRanks1 = starGraph.staticPageRank(numIter = 1, resetProb).vertices
-    val staticRanks2 = starGraph.staticPageRank(numIter = 2, resetProb).vertices.cache()
+    val staticRanks1 = starGraph.staticPageRank(numIter = 2, resetProb).vertices
+    val staticRanks2 = starGraph.staticPageRank(numIter = 3, resetProb).vertices.cache()
 
-    // Static PageRank should only take 2 iterations to converge
+    // Static PageRank should only take 3 iterations to converge
     val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
       if (pr1 != pr2) 1 else 0
     }.map { case (vid, test) => test }.sum()
@@ -203,4 +203,30 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
       assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
     }
   }
+
+  test("Loop with source PageRank") {
+    withSpark { sc =>
+      val edges = sc.parallelize((1L, 2L) :: (2L, 3L) :: (3L, 4L) :: (4L, 2L) :: Nil)
+      val g = Graph.fromEdgeTuples(edges, 1)
+      val resetProb = 0.15
+      val tol = 0.0001
+      val numIter = 50
+      val errorTol = 1.0e-5
+
+      val staticRanks = g.staticPageRank(numIter, resetProb).vertices
+      val dynamicRanks = g.pageRank(tol, resetProb).vertices
+      assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+
+      // Computed in igraph 1.0 w/ R bindings:
+      // > page_rank(graph_from_literal( A -+ B -+ C -+ D -+ B))
+      // Alternatively, in NetworkX 1.11:
+      // > nx.pagerank(nx.DiGraph([(1,2),(2,3),(3,4),(4,2)]))
+      // We multiply by the number of vertices to account for the difference in normalization
+      val igraphPR = Seq(0.0375000, 0.3326045, 0.3202138, 0.3096817).map(_ * 4)
+      val ranks = VertexRDD(sc.parallelize(1L to 4L zip igraphPR))
+      assert(compareRanks(staticRanks, ranks) < errorTol)
+      assert(compareRanks(dynamicRanks, ranks) < errorTol)
+    }
+  }
 }
```
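For reference, the new test's scenario can be reproduced by hand in `spark-shell` with a sketch like the following (the expected values are the igraph references from the test, scaled by the vertex count):

```scala
import org.apache.spark.graphx.Graph

// Source vertex 1 feeding the 3-cycle 2 -> 3 -> 4 -> 2
// (`sc` is the SparkContext predefined by spark-shell)
val edges = sc.parallelize((1L, 2L) :: (2L, 3L) :: (3L, 4L) :: (4L, 2L) :: Nil)
val g = Graph.fromEdgeTuples(edges, 1)
g.pageRank(0.0001, 0.15).vertices.collect().sortBy(_._1).foreach(println)
// expected, up to tolerance:
// (1, 0.15)  (2, ~1.3304)  (3, ~1.2809)  (4, ~1.2387)
```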