Skip to content
Snippets Groups Projects
Commit 74fdfac1 authored by Ankur Dave's avatar Ankur Dave
Browse files

Fix AbstractMethodError by inlining zip{Edge,Vertex}Partitions

The zip{Edge,Vertex}Partitions methods created doubly-nested closures
and passed them to zipPartitions. For some reason this caused an
AbstractMethodError when zipPartitions tried to invoke the closure. This
commit works around the problem by inlining these methods wherever they
are called, eliminating the doubly-nested closure.
parent ab861d84
No related branches found
No related tags found
No related merge requests found
......@@ -53,34 +53,17 @@ class EdgeRDD[@specialized ED: ClassTag](
}, preservesPartitioning = true))
}
def zipEdgePartitions[T: ClassTag, U: ClassTag]
(other: RDD[T])
(f: (Pid, EdgePartition[ED], Iterator[T]) => Iterator[U]): RDD[U] = {
partitionsRDD.zipPartitions(other, preservesPartitioning = true) { (ePartIter, otherIter) =>
val (pid, edgePartition) = ePartIter.next()
f(pid, edgePartition, otherIter)
}
}
def zipEdgePartitions[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2])
(f: (Pid, EdgePartition[ED], EdgePartition[ED2]) => EdgePartition[ED3]): EdgeRDD[ED3] = {
new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, preservesPartitioning = true) {
(thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, f(pid, thisEPart, otherEPart)))
})
}
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2])
(f: (Vid, Vid, ED, ED2) => ED3): EdgeRDD[ED3] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
zipEdgePartitions(other) { (pid, thisEPart, otherEPart) =>
thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)
}
new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
(thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
})
}
def collectVids(): RDD[Vid] = {
......
......@@ -119,22 +119,6 @@ class VertexRDD[@specialized VD: ClassTag](
new VertexRDD(newPartitionsRDD)
}
/**
* Return a new VertexRDD by applying a function to corresponding
* VertexPartitions of this VertexRDD and another one.
*/
def zipVertexPartitions[VD2: ClassTag, VD3: ClassTag]
(other: VertexRDD[VD2])
(f: (VertexPartition[VD], VertexPartition[VD2]) => VertexPartition[VD3]): VertexRDD[VD3] = {
val newPartitionsRDD = partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true
) { (thisIter, otherIter) =>
val thisPart = thisIter.next()
val otherPart = otherIter.next()
Iterator(f(thisPart, otherPart))
}
new VertexRDD(newPartitionsRDD)
}
/**
* Restrict the vertex set to the set of vertices satisfying the
......@@ -184,9 +168,14 @@ class VertexRDD[@specialized VD: ClassTag](
* the values from `other`.
*/
def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
this.zipVertexPartitions(other) { (thisPart, otherPart) =>
thisPart.diff(otherPart)
val newPartitionsRDD = partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true
) { (thisIter, otherIter) =>
val thisPart = thisIter.next()
val otherPart = otherIter.next()
Iterator(thisPart.diff(otherPart))
}
new VertexRDD(newPartitionsRDD)
}
/**
......@@ -209,9 +198,14 @@ class VertexRDD[@specialized VD: ClassTag](
*/
def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
(other: VertexRDD[VD2])(f: (Vid, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
this.zipVertexPartitions(other) { (thisPart, otherPart) =>
thisPart.leftJoin(otherPart)(f)
val newPartitionsRDD = partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true
) { (thisIter, otherIter) =>
val thisPart = thisIter.next()
val otherPart = otherIter.next()
Iterator(thisPart.leftJoin(otherPart)(f))
}
new VertexRDD(newPartitionsRDD)
}
/**
......@@ -261,9 +255,14 @@ class VertexRDD[@specialized VD: ClassTag](
*/
def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
(f: (Vid, VD, U) => VD2): VertexRDD[VD2] = {
this.zipVertexPartitions(other) { (thisPart, otherPart) =>
thisPart.innerJoin(otherPart)(f)
val newPartitionsRDD = partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true
) { (thisIter, otherIter) =>
val thisPart = thisIter.next()
val otherPart = otherIter.next()
Iterator(thisPart.innerJoin(otherPart)(f))
}
new VertexRDD(newPartitionsRDD)
}
/**
......
......@@ -49,7 +49,9 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
val vdTag = classTag[VD]
val edTag = classTag[ED]
edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (pid, ePart, vPartIter) =>
edges.partitionsRDD.zipPartitions(
replicatedVertexView.get(true, true), true) { (ePartIter, vPartIter) =>
val (pid, ePart) = ePartIter.next()
val (_, vPart) = vPartIter.next()
new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag)
}
......@@ -182,8 +184,9 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
// manifest from GraphImpl (which would require serializing GraphImpl).
val vdTag = classTag[VD]
val newEdgePartitions =
edges.zipEdgePartitions(replicatedVertexView.get(true, true)) {
(ePid, edgePartition, vTableReplicatedIter) =>
edges.partitionsRDD.zipPartitions(replicatedVertexView.get(true, true), true) {
(ePartIter, vTableReplicatedIter) =>
val (ePid, edgePartition) = ePartIter.next()
val (vPid, vPart) = vTableReplicatedIter.next()
assert(!vTableReplicatedIter.hasNext)
assert(ePid == vPid)
......@@ -267,7 +270,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
val activeDirectionOpt = activeSetOpt.map(_._2)
// Map and combine.
val preAgg = edges.zipEdgePartitions(vs) { (ePid, edgePartition, vPartIter) =>
val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) =>
val (ePid, edgePartition) = ePartIter.next()
val (vPid, vPart) = vPartIter.next()
assert(!vPartIter.hasNext)
assert(ePid == vPid)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment