diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 6e88be6f6ac64bb3d99bb56aa276c789dfba7c6a..7623c44d8856c4a0f414a89d264de085d43f54c7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -545,20 +545,35 @@ abstract class RDD[T: ClassManifest]( * *same number of partitions*, but does *not* require them to have the same number * of elements in each partition. */ + def zipPartitions[B: ClassManifest, V: ClassManifest] + (rdd2: RDD[B], preservesPartitioning: Boolean) + (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = + new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning) + def zipPartitions[B: ClassManifest, V: ClassManifest] (rdd2: RDD[B]) (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = - new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2) + new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false) + + def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest] + (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean) + (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = + new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning) def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest] (rdd2: RDD[B], rdd3: RDD[C]) (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = - new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3) + new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false) + + def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest] + (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean) + (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = + new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning) def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest] (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D]) (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = - new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4) + new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false) // Actions (launch a job to return a value to the user program) diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index 31e6fd519d0ddad814f8da707bf7f1f23974df73..faeb316664b00e53ca9d92fdc301e97517b4452d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -39,9 +39,13 @@ private[spark] class ZippedPartitionsPartition( abstract class ZippedPartitionsBaseRDD[V: ClassManifest]( sc: SparkContext, - var rdds: Seq[RDD[_]]) + var rdds: Seq[RDD[_]], + preservesPartitioning: Boolean = false) extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) { + override val partitioner = + if (preservesPartitioning) firstParent[Any].partitioner else None + override def getPartitions: Array[Partition] = { val sizes = rdds.map(x => x.partitions.size) if (!sizes.forall(x => x == sizes(0))) { @@ -76,8 +80,9 @@ class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest] sc: SparkContext, f: (Iterator[A], Iterator[B]) => Iterator[V], var rdd1: RDD[A], - var rdd2: RDD[B]) - extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) { + var rdd2: RDD[B], + preservesPartitioning: Boolean = false) + extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions @@ -97,8 +102,9 @@ class ZippedPartitionsRDD3 f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V], var rdd1: RDD[A], var rdd2: RDD[B], - var rdd3: RDD[C]) - extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) { + var rdd3: RDD[C], + preservesPartitioning: Boolean = false) + extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions @@ -122,8 +128,9 @@ class ZippedPartitionsRDD4 var rdd1: RDD[A], var rdd2: RDD[B], var rdd3: RDD[C], - var rdd4: RDD[D]) - extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) { + var rdd4: RDD[D], + preservesPartitioning: Boolean = false) + extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions