Skip to content
Snippets Groups Projects
Commit 1bd5cefc authored by Ankur Dave's avatar Ankur Dave
Browse files

Remove aggregateNeighbors

parent e2d25d2d
No related branches found
No related tags found
No related merge requests found
......@@ -519,23 +519,6 @@ val avgAgeOlderFollowers: VertexRDD[Double] =
> are constant sized (e.g., floats and addition instead of lists and concatenation). More
> precisely, the result of `mapReduceTriplets` should be sub-linear in the degree of each vertex.
Because it is often necessary to aggregate information about neighboring vertices, we also provide an
alternative interface defined in [`GraphOps`][GraphOps]:
{% highlight scala %}
def aggregateNeighbors[A](
map: (VertexID, EdgeTriplet[VD, ED]) => Option[A],
reduce: (A, A) => A,
edgeDir: EdgeDirection)
: VertexRDD[A]
{% endhighlight %}
The `aggregateNeighbors` operator is implemented directly on top of `mapReduceTriplets` but allows
the user to define the logic in a more vertex-centric manner. Here the `map` function is given the
ID of the vertex to which the message is sent, together with one of the edge triplets adjacent to
that vertex, and returns an optional message value. The `edgeDir` determines whether the `map`
function is run on `In`, `Out`, or `Both` edges adjacent to each vertex.
### Computing Degree Information
A common aggregation task is computing the degree of each vertex: the number of edges adjacent to
......
......@@ -55,60 +55,6 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
}
}
/**
 * Computes a statistic for the neighborhood of each vertex.
 *
 * This is a vertex-centric wrapper around `mapReduceTriplets`: for every edge triplet, `mapFunc`
 * is invoked once for each endpoint selected by `dir`, and the values produced for the same
 * vertex are merged with `reduceFunc`.
 *
 * @param mapFunc the function applied to each edge adjacent to each vertex. It receives the id
 * of the vertex the result is destined for, together with the edge triplet. The mapFunc can
 * optionally return `None`, in which case it does not contribute to the final sum.
 * @param reduceFunc the function used to merge the results of each map operation
 * @param dir the direction of edges to consider (e.g., In, Out, Both). With `In` the value is
 * computed for the destination vertex of each edge, with `Out` for the source vertex, and with
 * `Both` for both endpoints.
 * @tparam A the aggregation type
 *
 * @return an RDD containing tuples of vertex identifiers and
 * their resulting value. Vertices with no neighbors will not appear in the RDD.
 *
 * @example We can use this function to compute the average follower
 * age for each user:
 *
 * {{{
 * val graph: Graph[Int,Int] = GraphLoader.edgeListFile(sc, "webgraph")
 * val averageFollowerAge: RDD[(VertexID, Double)] =
 *   graph.aggregateNeighbors[(Int, Int)](
 *     (vid, edge) => Some((edge.otherVertexAttr(vid), 1)),
 *     (a, b) => (a._1 + b._1, a._2 + b._2),
 *     EdgeDirection.In)
 *   .mapValues { case (sum, followers) => sum.toDouble / followers }
 * }}}
 */
def aggregateNeighbors[A: ClassTag](
    mapFunc: (VertexID, EdgeTriplet[VD, ED]) => Option[A],
    reduceFunc: (A, A) => A,
    dir: EdgeDirection)
  : VertexRDD[A] = {
  // The direction is fixed for the whole call, so decide once which endpoints receive a message
  // instead of re-testing it for every edge.
  val sendToDst = dir == EdgeDirection.In || dir == EdgeDirection.Both
  val sendToSrc = dir == EdgeDirection.Out || dir == EdgeDirection.Both

  // Define a new map function over edge triplets
  val mf = (et: EdgeTriplet[VD, ED]) => {
    // Compute the (optional) message to the dst vertex first, then the one to the src vertex.
    // Both are built eagerly so nothing reads the triplet after this function returns.
    val toDst: Option[(VertexID, A)] =
      if (sendToDst) mapFunc(et.dstId, et).map(msg => (et.dstId, msg)) else None
    val toSrc: Option[(VertexID, A)] =
      if (sendToSrc) mapFunc(et.srcId, et).map(msg => (et.srcId, msg)) else None
    // Emit whichever messages exist, source message first
    toSrc.iterator ++ toDst.iterator
  }

  graph.mapReduceTriplets(mf, reduceFunc)
} // end of aggregateNeighbors
/**
* Collect the neighbor vertex ids for each vertex.
*
......@@ -152,11 +98,11 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
*
* @return the vertex set of neighboring vertex attributes for each vertex
*/
def collectNeighbors(edgeDirection: EdgeDirection) :
VertexRDD[ Array[(VertexID, VD)] ] = {
val nbrs = graph.aggregateNeighbors[Array[(VertexID,VD)]](
(vid, edge) =>
Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )),
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] = {
val nbrs = graph.mapReduceTriplets[Array[(VertexID,VD)]](
edge => Iterator(
(edge.srcId, Array((edge.dstId, edge.dstAttr))),
(edge.dstId, Array((edge.srcId, edge.srcAttr)))),
(a, b) => a ++ b,
edgeDirection)
......
......@@ -8,32 +8,6 @@ import org.scalatest.FunSuite
class GraphOpsSuite extends FunSuite with LocalSparkContext {
test("aggregateNeighbors") {
  withSpark { sc =>
    // Star graph: every edge goes from the hub (vertex 0) to one of the leaves 1..leafCount
    val leafCount = 3
    val hubToLeaf = (1 to leafCount).map(leaf => (0: VertexID, leaf: VertexID))
    val star = Graph.fromEdgeTuples(sc.parallelize(hubToLeaf), 1)

    // Counting over In edges: each leaf gets exactly one, the hub gets none and is absent
    val inCounts = star.aggregateNeighbors(
      (vid, edge) => Some(1),
      (a: Int, b: Int) => a + b,
      EdgeDirection.In)
    assert(inCounts.collect().toSet === (1 to leafCount).map(x => (x, 1)).toSet)

    // Counting over Out edges: only the hub has any, one per leaf
    val outCounts = star.aggregateNeighbors(
      (vid, edge) => Some(1),
      (a: Int, b: Int) => a + b,
      EdgeDirection.Out)
    assert(outCounts.collect().toSet === Set((0, leafCount)))

    // When the map function always returns None, no vertex gets a value and the reduce
    // function must never be invoked
    val nothingEmitted = star.aggregateNeighbors[Int](
      (vid: VertexID, edge: EdgeTriplet[Int, Int]) => None,
      (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"),
      EdgeDirection.In)
    assert(nothingEmitted.collect().toSet === Set.empty[(VertexID, Int)])
  }
}
test("joinVertices") {
withSpark { sc =>
val vertices =
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment