Skip to content
Snippets Groups Projects
Commit f825e193 authored by Takeshi Yamamuro's avatar Takeshi Yamamuro Committed by Ankur Dave
Browse files

[SPARK-4917] Add a function to convert into a graph with canonical edges in GraphOps

Convert bi-directional edges into uni-directional ones instead of 'canonicalOrientation' in GraphLoader.edgeListFile.
This function is useful when a graph is loaded as it is and then is transformed into one with canonical edges.
It rewrites the vertex ids of edges so that srcIds are smaller than dstIds, and merges the duplicated edges.

Author: Takeshi Yamamuro <linguin.m.s@gmail.com>

Closes #3760 from maropu/ConvertToCanonicalEdgesSpike and squashes the following commits:

7f8b580 [Takeshi Yamamuro] Add a function to convert into a graph with canonical edges in GraphOps
parent 8d45834d
No related branches found
No related tags found
No related merge requests found
......@@ -278,6 +278,32 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
retVal
}
/**
 * Convert bi-directional edges into uni-directional ones.
 * Some graph algorithms (e.g., TriangleCount) assume that an input graph
 * has its edges in canonical direction.
 * This function rewrites the vertex ids of edges so that srcIds are smaller
 * than dstIds, and merges the duplicated edges. An edge whose srcId equals
 * its dstId keeps its endpoints unchanged. The vertices of the graph are
 * passed through as they are.
 *
 * @param mergeFunc the user defined reduce function which should
 * be commutative and associative and is used to combine the attributes
 * of edges that end up with the same (srcId, dstId) pair. The default
 * returns its first argument.
 * NOTE(review): the default is not commutative, so which duplicate's
 * attribute survives presumably depends on the reduce order - confirm.
 *
 * @return the resulting graph with canonical edges
 */
def convertToCanonicalEdges(
    mergeFunc: (ED, ED) => ED = (e1, e2) => e1): Graph[VD, ED] = {
  val newEdges =
    graph.edges
      .map { e =>
        // Key every edge by its (smaller id, larger id) pair so that both
        // directions of the same connection collapse onto a single key.
        val endpoints =
          if (e.srcId < e.dstId) (e.srcId, e.dstId) else (e.dstId, e.srcId)
        (endpoints, e.attr)
      }
      .reduceByKey(mergeFunc)
      .map { case ((srcId, dstId), attr) => Edge(srcId, dstId, attr) }
  Graph(graph.vertices, newEdges)
}
/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
......
......@@ -79,6 +79,21 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
}
}
test("convertToCanonicalEdges") {
  withSpark { sc =>
    // Edges 1->2 and 2->1 connect the same pair of vertices in opposite
    // directions; edge 3->2 has its larger id as the source.
    val vertexData = Seq[(VertexId, String)]((1, "one"), (2, "two"), (3, "three"))
    val edgeData = Seq(Edge(1, 2, 1), Edge(2, 1, 1), Edge(3, 2, 2))
    val input: Graph[String, Int] =
      Graph(sc.parallelize(vertexData, 2), sc.parallelize(edgeData))
    val canonicalEdges = input.convertToCanonicalEdges().edges.collect().toSet
    // One edge per vertex pair, each with the smaller id as the source.
    val expected = Set(Edge(1, 2, 1), Edge(2, 3, 2))
    assert(canonicalEdges === expected)
  }
}
test("collectEdgesCycleDirectionOut") {
withSpark { sc =>
val graph = getCycleGraph(sc, 100)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment