diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala index 2f1f907c6c746c7b6ff28844117c31a5799d83e5..5f95559f15122707987b5e64a23e1bd67eb6f3c1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala @@ -24,7 +24,6 @@ import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet} import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer - import scala.collection.mutable.BitSet @@ -72,7 +71,7 @@ class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockI */ class IndexedRDD[K: ClassManifest, V: ClassManifest]( @transient val index: RDDIndex[K], - @transient val valuesRDD: RDD[ (Seq[V], BitSet) ]) + @transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ]) extends RDD[(K, V)](index.rdd.context, List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) { @@ -119,13 +118,14 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( */ def mapValues[U: ClassManifest](f: V => U): IndexedRDD[K, U] = { val cleanF = index.rdd.context.clean(f) - val newValuesRDD = valuesRDD.mapPartitions(iter => iter.map{ + val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] = + valuesRDD.mapPartitions(iter => iter.map{ case (values, bs) => val newValues = new Array[U](values.size) for ( ind <- bs ) { newValues(ind) = f(values(ind)) } - (newValues.toSeq, bs) + (newValues.toIndexedSeq, bs) }, preservesPartitioning = true) new IndexedRDD[K,U](index, newValuesRDD) } @@ -137,7 +137,8 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( */ def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): IndexedRDD[K, U] = { val cleanF = index.rdd.context.clean(f) - val newValues = index.rdd.zipPartitions(valuesRDD){ + val newValues: RDD[ (IndexedSeq[U], BitSet) ] = + index.rdd.zipPartitions(valuesRDD){ (keysIter, valuesIter) => val index = keysIter.next() assert(keysIter.hasNext() == false) @@ -149,7 +150,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( for( (k,i) <- index ) { if (bs(i)) { newValues(i) = f(k, oldValues(i)) } } - Array((newValues.toSeq, bs)).iterator + Array((newValues.toIndexedSeq, bs)).iterator } new IndexedRDD[K,U](index, newValues) } @@ -159,7 +160,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( if(index != other.index) { throw new SparkException("A zipJoin can only be applied to RDDs with the same index!") } - val newValuesRDD: RDD[ (Seq[(V,W)], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){ + val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){ (thisIter, otherIter) => val (thisValues, thisBS) = thisIter.next() assert(!thisIter.hasNext) @@ -167,7 +168,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( assert(!otherIter.hasNext) val newBS = thisBS & otherBS val newValues = thisValues.view.zip(otherValues) - Iterator((newValues, newBS)) + Iterator((newValues.toIndexedSeq, newBS)) } new IndexedRDD(index, newValuesRDD) } @@ -177,7 +178,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( if(index != other.index) { throw new SparkException("A zipJoin can only be applied to RDDs with the same index!") } - val newValuesRDD: RDD[ (Seq[(V,Option[W])], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){ + val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){ (thisIter, otherIter) => val (thisValues, thisBS) = thisIter.next() assert(!thisIter.hasNext) @@ -186,7 +187,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( val otherOption = otherValues.view.zipWithIndex .map{ (x: (W, Int)) => if(otherBS(x._2)) Option(x._1) else None } val newValues = thisValues.view.zip(otherOption) - Iterator((newValues, thisBS)) + Iterator((newValues.toIndexedSeq, thisBS)) } new IndexedRDD(index, newValuesRDD) } @@ -197,6 +198,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( other: RDD[(K,W)], merge: (W,W) => W = (a:W, b:W) => a): IndexedRDD[K, (V, Option[W]) ] = { val cleanMerge = index.rdd.context.clean(merge) + other match { case other: IndexedRDD[_, _] if index == other.index => { leftZipJoin(other) @@ -211,7 +213,8 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( val otherShuffled = if (other.partitioner == Some(partitioner)) other else other.partitionBy(partitioner) - val newValues = index.rdd.zipPartitions(valuesRDD, other) { + val newValues: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = + index.rdd.zipPartitions(valuesRDD, other) { (thisIndexIter, thisIter, tuplesIter) => val index = thisIndexIter.next() assert(!thisIndexIter.hasNext) @@ -236,7 +239,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest]( .map{ (x: (W, Int)) => if(wBS(x._2)) Option(x._1) else None } val newValues = thisValues.view.zip(otherOption) - Iterator((newValues.toSeq, thisBS)) + Iterator((newValues.toIndexedSeq, thisBS)) } // end of newValues new IndexedRDD(index, newValues) } @@ -496,11 +499,12 @@ object IndexedRDD { values(ind) = reduceFunc(values(ind), v) } } - Iterator( (indexMap, (values.toSeq, bs)) ) + Iterator( (indexMap, (values.toIndexedSeq, bs)) ) }, true).cache // extract the index and the values val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true) - val values = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) + val values: RDD[(IndexedSeq[V], BitSet)] = + groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) new IndexedRDD[K,V](new RDDIndex(index), values) } @@ -580,7 +584,7 @@ object IndexedRDD { } // Use the index to build the new values table - val values: RDD[ (Seq[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => { + val values: RDD[ (IndexedSeq[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => { // There is only one map val index = indexIter.next() assert(!indexIter.hasNext()) diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala index 0310711d371e119d2e312f3844fc776aaa71e870..fd7c16089d69e2ed41a5ebdfe8f5177ed5a7b982 100644 --- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala @@ -60,7 +60,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K */ override def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K,U)] = { val cleanF = self.index.rdd.context.clean(f) - val newValuesRDD = self.valuesRDD.mapPartitions(iter => iter.map{ + val newValuesRDD: RDD[(IndexedSeq[U], BitSet)] = self.valuesRDD.mapPartitions(iter => iter.map{ case (values, bs) => val newValues = new Array[U](values.size) val newBS = new BitSet(values.size) @@ -71,7 +71,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K newBS(ind) = true } } - (newValues.toSeq, newBS) + (newValues.toIndexedSeq, newBS) }, preservesPartitioning = true) new IndexedRDD[K,U](self.index, newValuesRDD) } @@ -120,7 +120,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K // then we simply merge the value RDDs. // However it is possible that both RDDs are missing a value for a given key in // which case the returned RDD should have a null value - val newValues = + val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = self.valuesRDD.zipPartitions(other.valuesRDD){ (thisIter, otherIter) => val (thisValues, thisBS) = thisIter.next() @@ -136,7 +136,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W] newValues(ind) = (a, b) } - Iterator((newValues.toSeq, newBS)) + Iterator((newValues.toIndexedSeq, newBS)) } new IndexedRDD(self.index, newValues) } @@ -166,7 +166,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K List(newIndex).iterator }).cache() // Use the new index along with the this and the other indices to merge the values - val newValues = + val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = newIndex.zipPartitions(self.tuples, other.tuples)( (newIndexIter, thisTuplesIter, otherTuplesIter) => { // Get the new index for this partition @@ -199,7 +199,7 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K newBS(ind) = true } } - Iterator((newValues.toSeq, newBS)) + Iterator((newValues.toIndexedSeq, newBS)) }) new IndexedRDD(new RDDIndex(newIndex), newValues) } @@ -262,12 +262,13 @@ class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K // case null => null // case (s, ab) => Seq((s, ab.toSeq)) // }.toSeq - Iterator( (newIndex, (newValues.toSeq, newBS)) ) + Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) ) }).cache() // Extract the index and values from the above RDD val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true) - val newValues = groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) + val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = + groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues) } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index ce1b9467c407d3838716d06528452de239a4a98c..413177b2dae63e69896390bd6ac3f5699347c944 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -545,7 +545,7 @@ object GraphImpl { val newValuesRDD = replicationMap.valuesRDD.zipPartitions(msgsByPartition){ (mapIter, msgsIter) => - val (Seq(vidToIndex), bs) = mapIter.next() + val (IndexedSeq(vidToIndex), bs) = mapIter.next() assert(!mapIter.hasNext) // Populate the vertex array using the vidToIndex map val vertexArray = new Array[VD](vidToIndex.size) @@ -553,7 +553,7 @@ object GraphImpl { val ind = vidToIndex(msg.data._1) vertexArray(ind) = msg.data._2 } - Iterator((Seq(vertexArray), bs)) + Iterator((IndexedSeq(vertexArray), bs)) } new IndexedRDD(replicationMap.index, newValuesRDD)