Skip to content
Snippets Groups Projects
Commit 2fead510 authored by Reynold Xin's avatar Reynold Xin
Browse files

Merge branch 'master' of github.com:tbfenet/incubator-spark

PartitionPruningRDD is using index from parent

I was getting a ArrayIndexOutOfBoundsException exception after doing union on pruned RDD. The index it was using on the partition was the index in the original RDD not the new pruned RDD.
parents 4b895013 f639b65e
No related branches found
No related tags found
No related merge requests found
......@@ -33,11 +33,13 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo
extends NarrowDependency[T](rdd) {
@transient
val partitions: Array[Partition] = rdd.partitions.zipWithIndex
.filter(s => partitionFilterFunc(s._2))
val partitions: Array[Partition] = rdd.partitions
.filter(s => partitionFilterFunc(s.index)).zipWithIndex
.map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
override def getParents(partitionId: Int) = List(partitions(partitionId).index)
override def getParents(partitionId: Int) = {
List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
}
}
......
......@@ -15,31 +15,72 @@
* limitations under the License.
*/
package org.apache.spark
package org.apache.spark.rdd
import org.scalatest.FunSuite
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.{RDD, PartitionPruningRDD}
import org.apache.spark.{TaskContext, Partition, SharedSparkContext}
class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
test("Pruned Partitions inherit locality prefs correctly") {
class TestPartition(i: Int) extends Partition {
def index = i
}
val rdd = new RDD[Int](sc, Nil) {
override protected def getPartitions = {
Array[Partition](
new TestPartition(1),
new TestPartition(2),
new TestPartition(3))
new TestPartition(0, 1),
new TestPartition(1, 1),
new TestPartition(2, 1))
}
def compute(split: Partition, context: TaskContext) = {
Iterator()
}
def compute(split: Partition, context: TaskContext) = {Iterator()}
}
val prunedRDD = PartitionPruningRDD.create(rdd, {x => if (x==2) true else false})
val p = prunedRDD.partitions(0)
assert(p.index == 2)
val prunedRDD = PartitionPruningRDD.create(rdd, {
x => if (x == 2) true else false
})
assert(prunedRDD.partitions.length == 1)
val p = prunedRDD.partitions(0)
assert(p.index == 0)
assert(p.asInstanceOf[PartitionPruningRDDPartition].parentSplit.index == 2)
}
test("Pruned Partitions can be unioned ") {
val rdd = new RDD[Int](sc, Nil) {
override protected def getPartitions = {
Array[Partition](
new TestPartition(0, 4),
new TestPartition(1, 5),
new TestPartition(2, 6))
}
def compute(split: Partition, context: TaskContext) = {
List(split.asInstanceOf[TestPartition].testValue).iterator
}
}
val prunedRDD1 = PartitionPruningRDD.create(rdd, {
x => if (x == 0) true else false
})
val prunedRDD2 = PartitionPruningRDD.create(rdd, {
x => if (x == 2) true else false
})
val merged = prunedRDD1 ++ prunedRDD2
assert(merged.count() == 2)
val take = merged.take(2)
assert(take.apply(0) == 4)
assert(take.apply(1) == 6)
}
}
class TestPartition(i: Int, value: Int) extends Partition with Serializable {
def index = i
def testValue = this.value
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment