Skip to content
Snippets Groups Projects
Commit 9cf7f31e authored by Reynold Xin's avatar Reynold Xin
Browse files

Memoize preferred locations in ZippedPartitionsBaseRDD so preferred location...

Memoize preferred locations in ZippedPartitionsBaseRDD so preferred location computation doesn't lead to exponential explosion.

(cherry picked from commit e36fe55a)
Signed-off-by: default avatarReynold Xin <rxin@apache.org>
parent 743a31a7
No related branches found
No related tags found
No related merge requests found
......@@ -22,7 +22,8 @@ import java.io.{ObjectOutputStream, IOException}
private[spark] class ZippedPartitionsPartition(
idx: Int,
@transient rdds: Seq[RDD[_]])
@transient rdds: Seq[RDD[_]],
@transient val preferredLocations: Seq[String])
extends Partition {
override val index: Int = idx
......@@ -47,27 +48,21 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
if (preservesPartitioning) firstParent[Any].partitioner else None
override def getPartitions: Array[Partition] = {
val sizes = rdds.map(x => x.partitions.size)
if (!sizes.forall(x => x == sizes(0))) {
val numParts = rdds.head.partitions.size
if (!rdds.forall(rdd => rdd.partitions.size == numParts)) {
throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
}
val array = new Array[Partition](sizes(0))
for (i <- 0 until sizes(0)) {
array(i) = new ZippedPartitionsPartition(i, rdds)
Array.tabulate[Partition](numParts) { i =>
val prefs = rdds.map(rdd => rdd.preferredLocations(rdd.partitions(i)))
// Check whether there are any hosts that match all RDDs; otherwise return the union
val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
val locs = if (!exactMatchLocations.isEmpty) exactMatchLocations else prefs.flatten.distinct
new ZippedPartitionsPartition(i, rdds, locs)
}
array
}
override def getPreferredLocations(s: Partition): Seq[String] = {
val parts = s.asInstanceOf[ZippedPartitionsPartition].partitions
val prefs = rdds.zip(parts).map { case (rdd, p) => rdd.preferredLocations(p) }
// Check whether there are any hosts that match all RDDs; otherwise return the union
val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
if (!exactMatchLocations.isEmpty) {
exactMatchLocations
} else {
prefs.flatten.distinct
}
s.asInstanceOf[ZippedPartitionsPartition].preferredLocations
}
override def clearDependencies() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment