Commit 3d79f106 authored by Robin East, committed by Reynold Xin

[SPARK-3650][GRAPHX] Triangle Count handles reverse edges incorrectly

jegonzal, ankurdave: please could you review?

## What changes were proposed in this pull request?

Reworking of jegonzal's PR #2495 to address the issue identified in SPARK-3650. The code is amended to use the convertToCanonicalEdges method.

## How was this patch tested?

The patch was tested using the unit tests created in PR #2495.
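
For illustration only, here is a minimal standalone sketch (not part of the patch) of the behaviour this fix targets: a triangle given with reversed and duplicate edges should still report one triangle per vertex, because `run()` now removes self edges and calls `convertToCanonicalEdges()` internally. The object name, the local SparkContext setup, and the example edge list are assumptions for the sketch.

```scala
// Hypothetical example; object name and SparkContext setup are assumptions, not part of this patch.
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph

object TriangleCountReverseEdgesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "triangle-count-reverse-edges")
    // The triangle 0-1-2 written with one reversed edge (2 -> 1) and a duplicate reverse edge (1 -> 0)
    val rawEdges = sc.parallelize(Array(0L -> 1L, 2L -> 1L, 2L -> 0L, 1L -> 0L))
    val graph = Graph.fromEdgeTuples(rawEdges, defaultValue = 1)
    // With this fix, run() removes self edges and canonicalizes edge directions,
    // so each vertex of the triangle should report a count of 1.
    graph.triangleCount().vertices.collect().foreach(println)
    sc.stop()
  }
}
```

Before this change, `run()` only deduplicated edges with `groupEdges` and assumed the input was already in canonical direction, so inputs like the one above could be miscounted.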

Author: Robin East <robin.east@xense.co.uk>
Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>

Closes #11290 from insidedctm/spark-3650.
parent 0f90f4e6
@@ -17,10 +17,15 @@

 package org.apache.spark.graphx

+import scala.reflect.{classTag, ClassTag}
 import scala.reflect.ClassTag
 import scala.util.Random

+import org.apache.spark.HashPartitioner
+import org.apache.spark.SparkContext._
 import org.apache.spark.SparkException
+import org.apache.spark.graphx.impl.EdgePartitionBuilder
+import org.apache.spark.graphx.impl.GraphImpl
 import org.apache.spark.graphx.lib._
 import org.apache.spark.rdd.RDD
@@ -183,6 +188,15 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
     }
   }

+  /**
+   * Remove self edges.
+   *
+   * @return a graph with all self edges removed
+   */
+  def removeSelfEdges(): Graph[VD, ED] = {
+    graph.subgraph(epred = e => e.srcId != e.dstId)
+  }
+
   /**
    * Join the vertices with an RDD and then apply a function from the
    * vertex and RDD entry to a new vertex value. The input table
...
@@ -20,6 +20,7 @@ package org.apache.spark.graphx.lib
 import scala.reflect.ClassTag

 import org.apache.spark.graphx._
+import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D

 /**
  * Compute the number of triangles passing through each vertex.
@@ -27,25 +28,47 @@ import org.apache.spark.graphx._
  * The algorithm is relatively straightforward and can be computed in three steps:
  *
  * <ul>
- * <li>Compute the set of neighbors for each vertex
- * <li>For each edge compute the intersection of the sets and send the count to both vertices.
- * <li> Compute the sum at each vertex and divide by two since each triangle is counted twice.
+ * <li> Compute the set of neighbors for each vertex</li>
+ * <li> For each edge compute the intersection of the sets and send the count to both vertices.</li>
+ * <li> Compute the sum at each vertex and divide by two since each triangle is counted twice.</li>
  * </ul>
  *
- * Note that the input graph should have its edges in canonical direction
- * (i.e. the `sourceId` less than `destId`). Also the graph must have been partitioned
- * using [[org.apache.spark.graphx.Graph#partitionBy]].
+ * There are two implementations. The default `TriangleCount.run` implementation first removes
+ * self cycles and canonicalizes the graph to ensure that the following conditions hold:
+ * <ul>
+ * <li> There are no self edges</li>
+ * <li> All edges are oriented src < dst</li>
+ * <li> There are no duplicate edges</li>
+ * </ul>
+ * However, the canonicalization procedure is costly as it requires repartitioning the graph.
+ * If the input data is already in "canonical form" with self cycles removed, then
+ * `TriangleCount.runPreCanonicalized` should be used instead.
+ *
+ * {{{
+ * val canonicalGraph = graph.mapEdges(e => 1).removeSelfEdges().convertToCanonicalEdges()
+ * val counts = TriangleCount.runPreCanonicalized(canonicalGraph).vertices
+ * }}}
+ *
  */
 object TriangleCount {

   def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
-    // Remove redundant edges
-    val g = graph.groupEdges((a, b) => a).cache()
+    // Transform the edge data to something cheap to shuffle and then canonicalize
+    val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().convertToCanonicalEdges()
+    // Get the triangle counts
+    val counters = runPreCanonicalized(canonicalGraph).vertices
+    // Join them back with the original graph
+    graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
+      optCounter.getOrElse(0)
+    }
+  }
+
+  def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
     // Construct set representations of the neighborhoods
     val nbrSets: VertexRDD[VertexSet] =
-      g.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
-        val set = new VertexSet(4)
+      graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
+        val set = new VertexSet(nbrs.length)
         var i = 0
         while (i < nbrs.size) {
           // prevent self cycle
@@ -56,14 +79,14 @@ object TriangleCount {
         }
         set
       }
+
     // join the sets with the graph
-    val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
+    val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
       (vid, _, optSet) => optSet.getOrElse(null)
     }
+
     // Edge function computes intersection of smaller vertex with larger vertex
     def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
-      assert(ctx.srcAttr != null)
-      assert(ctx.dstAttr != null)
       val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
         (ctx.srcAttr, ctx.dstAttr)
       } else {
@@ -80,15 +103,15 @@
       ctx.sendToSrc(counter)
       ctx.sendToDst(counter)
     }
+
     // compute the intersection along edges
     val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
     // Merge counters with the graph and divide by two since each triangle is counted twice
-    g.outerJoinVertices(counters) {
-      (vid, _, optCounter: Option[Int]) =>
-        val dblCount = optCounter.getOrElse(0)
-        // double count should be even (divisible by two)
-        assert((dblCount & 1) == 0)
-        dblCount / 2
+    graph.outerJoinVertices(counters) { (_, _, optCounter: Option[Int]) =>
+      val dblCount = optCounter.getOrElse(0)
+      // This algorithm double counts each triangle so the final count should be even
+      require(dblCount % 2 == 0, "Triangle count resulted in an invalid number of triangles.")
+      dblCount / 2
     }
-  } // end of TriangleCount
+  }
 }
@@ -55,6 +55,21 @@ class GraphOpsSuite extends SparkFunSuite with LocalSparkContext {
     }
   }

+  test("removeSelfEdges") {
+    withSpark { sc =>
+      val edgeArray = Array((1 -> 2), (2 -> 3), (3 -> 3), (4 -> 3), (1 -> 1))
+        .map {
+          case (a, b) => (a.toLong, b.toLong)
+        }
+      val correctEdges = edgeArray.filter { case (a, b) => a != b }.toSet
+      val graph = Graph.fromEdgeTuples(sc.parallelize(edgeArray), 1)
+      val canonicalizedEdges = graph.removeSelfEdges().edges.map(e => (e.srcId, e.dstId))
+        .collect
+      assert(canonicalizedEdges.toSet.size === canonicalizedEdges.size)
+      assert(canonicalizedEdges.toSet === correctEdges)
+    }
+  }
+
   test ("filter") {
     withSpark { sc =>
       val n = 5
...
@@ -64,9 +64,9 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
       val verts = triangleCount.vertices
       verts.collect().foreach { case (vid, count) =>
         if (vid == 0) {
-          assert(count === 4)
-        } else {
           assert(count === 2)
+        } else {
+          assert(count === 1)
         }
       }
     }
@@ -75,7 +75,8 @@
   test("Count a single triangle with duplicate edges") {
     withSpark { sc =>
       val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-        Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
+        Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Array(1L -> 0L, 1L -> 1L), 2)
       val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
       val triangleCount = graph.triangleCount()
       val verts = triangleCount.vertices
...