Commit 9f603fce authored by Brennon York, committed by Ankur Dave

[SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing

Fixes an issue where `diff`, `innerJoin`, or `leftJoin` on VertexRDDs with different partition sizes fails in the `zipPartitions` method. The fix checks whether the two partitioners are equal and, if they are not, repartitions the other VertexRDD to match the partitioning of the calling VertexRDD.
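
The scenario described above can be exercised with a short driver program. This is a minimal sketch, not the test shipped with the commit: the object name `Spark1955Sketch`, the method `run`, the vertex values, and the partition counts (2 and 3) are all illustrative assumptions. It assumes Spark core and GraphX are on the classpath and that the build includes this fix; on a build without it, the commit message indicates these calls fail inside `zipPartitions`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.VertexRDD

// Illustrative sketch only: names and numbers here are assumptions, not part of the commit.
object Spark1955Sketch {
  // Builds two VertexRDDs over the same vertex ids but with different partition
  // counts, then runs the three operations named in the commit message.
  // Returns the vertex counts of (diff, innerJoin, leftJoin).
  def run(sc: SparkContext): (Long, Long, Long) = {
    val a: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 10L, 2).map(id => (id, 1)))
    val b: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 10L, 3).map(id => (id, 2)))

    // The two inputs deliberately do not share a partitioning.
    require(a.partitions.length != b.partitions.length)

    val diffed = a.diff(b)
    val inner  = a.innerJoin(b) { (_, x, y) => x + y }
    val left   = a.leftJoin(b) { (_, x, y) => x + y.getOrElse(0) }
    (diffed.count(), inner.count(), left.count())
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("spark-1955-sketch")
    val sc = new SparkContext(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```

Both joins should report one result per vertex id, since the two inputs cover the same ids. The `diff` count depends on the `diff` semantics of the Spark version in use, so the sketch prints it without asserting a value.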

Author: Brennon York <brennon.york@capitalone.com>

Closes #4705 from brennonyork/SPARK-1955 and squashes the following commits:

0882590 [Brennon York] updated to properly handle differently-partitioned vertexRDDs
parent a777c65d
@@ -104,8 +104,14 @@ class VertexRDDImpl[VD] private[graphx] (
     this.mapVertexPartitions(_.map(f))

   override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
+    val otherPartition = other match {
+      case other: VertexRDD[_] if this.partitioner == other.partitioner =>
+        other.partitionsRDD
+      case _ =>
+        VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
+    }
     val newPartitionsRDD = partitionsRDD.zipPartitions(
-      other.partitionsRDD, preservesPartitioning = true
+      otherPartition, preservesPartitioning = true
     ) { (thisIter, otherIter) =>
       val thisPart = thisIter.next()
       val otherPart = otherIter.next()
@@ -133,7 +139,7 @@ class VertexRDDImpl[VD] private[graphx] (
     // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
     // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
     other match {
-      case other: VertexRDD[_] =>
+      case other: VertexRDD[_] if this.partitioner == other.partitioner =>
         leftZipJoin(other)(f)
       case _ =>
         this.withPartitionsRDD[VD3](
@@ -162,7 +168,7 @@ class VertexRDDImpl[VD] private[graphx] (
     // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
     // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
     other match {
-      case other: VertexRDD[_] =>
+      case other: VertexRDD[_] if this.partitioner == other.partitioner =>
         innerZipJoin(other)(f)
       case _ =>
         this.withPartitionsRDD(
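
The guard added to `diff` (compare partitioners, repartition on mismatch, then zip partition by partition) can be illustrated without Spark. The following is a simplified model under stated assumptions: `HashPart`, `Vertices`, `build`, and `zipJoin` are hypothetical stand-ins invented for this sketch and are not GraphX classes or methods. Note that the `leftJoin` and `innerJoin` hunks take a slightly different route: on a partitioner mismatch they fall through to the existing general-case branch instead of the zip join.

```scala
// Spark-free model of the "check partitioner, else repartition, then zip" guard.
// All names below are hypothetical; none of them are GraphX APIs.
object PartitionGuardSketch {
  // Stand-in for a hash partitioner; case-class equality compares partition counts.
  final case class HashPart(numPartitions: Int) {
    def partitionOf(id: Long): Int = {
      val m = (id % numPartitions).toInt
      if (m < 0) m + numPartitions else m
    }
  }

  // Stand-in for a VertexRDD: one Map of vertex id -> value per partition.
  final case class Vertices[V](partitioner: HashPart, parts: Vector[Map[Long, V]]) {
    // Redistributes every vertex according to the given partitioner.
    def partitionBy(p: HashPart): Vertices[V] = Vertices.build(parts.flatten, p)
  }

  object Vertices {
    def build[V](entries: Iterable[(Long, V)], p: HashPart): Vertices[V] = {
      val grouped = entries.groupBy { case (id, _) => p.partitionOf(id) }
      val parts = Vector.tabulate(p.numPartitions) { i =>
        grouped.get(i).map(_.toMap).getOrElse(Map.empty[Long, V])
      }
      Vertices(p, parts)
    }
  }

  // Inner join that pairs partitions by index, which is only valid when both
  // sides use the same partitioner.
  def zipJoin[A, B, C](left: Vertices[A], right: Vertices[B])(f: (Long, A, B) => C): Vertices[C] = {
    // The guard: reuse `right` only if the partitioners match, otherwise repartition it.
    val aligned =
      if (left.partitioner == right.partitioner) right
      else right.partitionBy(left.partitioner)
    val joined = left.parts.zip(aligned.parts).map { case (l, r) =>
      l.collect { case (id, a) if r.contains(id) => id -> f(id, a, r(id)) }
    }
    Vertices(left.partitioner, joined)
  }
}
```

In this model, zipping a 2-partition set against a 3-partition set without the guard would silently pair unrelated partitions and drop matches. Spark's `zipPartitions` instead rejects RDDs with unequal partition counts, which is the failure the commit addresses.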