Skip to content
Snippets Groups Projects
Commit 87676a6a authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #220 from rxin/zippart


Memoize preferred locations in ZippedPartitionsBaseRDD

so preferred location computation doesn't lead to exponential explosion.

This was a problem in GraphX where we have a whole chain of RDDs that are ZippedPartitionsRDD's, and the preferred locations were taking eternity to compute.

(cherry picked from commit e36fe55a)
Signed-off-by: default avatarReynold Xin <rxin@apache.org>
parents 07804987 9cf7f31e
No related branches found
No related tags found
No related merge requests found
......@@ -22,7 +22,8 @@ import java.io.{ObjectOutputStream, IOException}
private[spark] class ZippedPartitionsPartition(
idx: Int,
@transient rdds: Seq[RDD[_]])
@transient rdds: Seq[RDD[_]],
@transient val preferredLocations: Seq[String])
extends Partition {
override val index: Int = idx
......@@ -47,27 +48,21 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
if (preservesPartitioning) firstParent[Any].partitioner else None
override def getPartitions: Array[Partition] = {
val sizes = rdds.map(x => x.partitions.size)
if (!sizes.forall(x => x == sizes(0))) {
val numParts = rdds.head.partitions.size
if (!rdds.forall(rdd => rdd.partitions.size == numParts)) {
throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
}
val array = new Array[Partition](sizes(0))
for (i <- 0 until sizes(0)) {
array(i) = new ZippedPartitionsPartition(i, rdds)
Array.tabulate[Partition](numParts) { i =>
val prefs = rdds.map(rdd => rdd.preferredLocations(rdd.partitions(i)))
// Check whether there are any hosts that match all RDDs; otherwise return the union
val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
val locs = if (!exactMatchLocations.isEmpty) exactMatchLocations else prefs.flatten.distinct
new ZippedPartitionsPartition(i, rdds, locs)
}
array
}
override def getPreferredLocations(s: Partition): Seq[String] = {
val parts = s.asInstanceOf[ZippedPartitionsPartition].partitions
val prefs = rdds.zip(parts).map { case (rdd, p) => rdd.preferredLocations(p) }
// Check whether there are any hosts that match all RDDs; otherwise return the union
val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
if (!exactMatchLocations.isEmpty) {
exactMatchLocations
} else {
prefs.flatten.distinct
}
s.asInstanceOf[ZippedPartitionsPartition].preferredLocations
}
override def clearDependencies() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment