diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b3a2cb39fcdd13ed26ba2c30698ae8a546dd6a61..0aafc0a2fc7d072a3d1451eb966a78a60940da09 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -959,7 +959,7 @@ object SparkContext { // TODO: Add AccumulatorParams for other types, e.g. lists and strings implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) = - rdd.pairRDDFunctions + new PairRDDFunctions(rdd) implicit def rddToAsyncRDDActions[T: ClassManifest](rdd: RDD[T]) = new AsyncRDDActions(rdd) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 2f94ae5fa8ee688d95f01b2718fe467d64a1ea73..a6518abf456d3ec9d3da730713c2d71733a6fc14 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -264,11 +264,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. */ - def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = { - implicit val wm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = fromRDD(rdd.join(other, partitioner)) - } /** * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the @@ -278,8 +275,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif */ def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (V, Optional[W])] = { - implicit val wm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.leftOuterJoin(other, partitioner) fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) } @@ -292,8 +287,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif */ def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (Optional[V], W)] = { - implicit val wm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.rightOuterJoin(other, partitioner) fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) } @@ -332,22 +325,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = { - implicit val wm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = fromRDD(rdd.join(other)) - } /** * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. 
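
Note on the hunks above: the `SparkContext` change replaces the removed `RDD.pairRDDFunctions` helper with a direct constructor call, so pair operations still resolve through the usual implicit conversion; and the `JavaPairRDD` hunks can drop their casted `ClassManifest[W]` values because the Scala-side signatures (changed later in this diff) no longer take a `ClassManifest` context bound on `W`. A minimal sketch of the resolution path (the local SparkContext setup is illustrative only):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // brings rddToPairRDDFunctions into scope

val sc = new SparkContext("local", "example") // illustrative setup
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
// reduceByKey is not defined on RDD itself; this compiles because the implicit
// conversion now wraps the RDD as `new PairRDDFunctions(pairs)`
val counts = pairs.reduceByKey(_ + _).collect()
```
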
*/ - def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] = { - implicit val wm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] = fromRDD(rdd.join(other, numPartitions)) - } /** * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the @@ -356,8 +343,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * using the existing partitioner/parallelism level. */ def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = { - implicit val wm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.leftOuterJoin(other) fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) } @@ -369,8 +354,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * into `numPartitions` partitions. */ def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = { - implicit val wm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.leftOuterJoin(other, numPartitions) fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) } @@ -382,8 +365,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * RDD using the existing partitioner/parallelism level. */ def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = { - implicit val wm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.rightOuterJoin(other) fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) } @@ -395,8 +376,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * RDD into the given number of partitions. */ def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = { - implicit val wm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] val joinResult = rdd.rightOuterJoin(other, numPartitions) fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) } @@ -433,86 +412,55 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner) - : JavaPairRDD[K, (JList[V], JList[W])] = { - implicit val wm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + : JavaPairRDD[K, (JList[V], JList[W])] = fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner))) - } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. 
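
For the Java API, absent matches in the outer joins above surface as Guava `Optional` rather than Scala `Option`; the hunks keep relying on `JavaUtils.optionToOptional` for that. A hedged sketch of the mapping it performs (this mirrors, rather than quotes, the helper):

```scala
import com.google.common.base.Optional

// sketch: surface a Scala Option to Java callers as a Guava Optional
def optionToOptional[T](option: Option[T]): Optional[T] = option match {
  case Some(value) => Optional.of(value)
  case None        => Optional.absent()
}
```
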
*/ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { - implicit val w1m: ClassManifest[W1] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] - implicit val w2m: ClassManifest[W2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner))) - } /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = { - implicit val wm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = fromRDD(cogroupResultToJava(rdd.cogroup(other))) - } /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { - implicit val w1m: ClassManifest[W1] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] - implicit val w2m: ClassManifest[W2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2))) - } /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] = { - implicit val wm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] - fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions))) - } + def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] + = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions))) + /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { - implicit val w1m: ClassManifest[W1] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] - implicit val w2m: ClassManifest[W2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions))) - } /** Alias for cogroup. */ - def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = { - implicit val wm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = fromRDD(cogroupResultToJava(rdd.groupWith(other))) - } /** Alias for cogroup. 
*/ def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = { - implicit val w1m: ClassManifest[W1] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W1]] - implicit val w2m: ClassManifest[W2] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W2]] + : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2))) - } /** * Return the list of values in the RDD for key `key`. This operation is done efficiently if the diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala deleted file mode 100644 index fd7c16089d69e2ed41a5ebdfe8f5177ed5a7b982..0000000000000000000000000000000000000000 --- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDDFunctions.scala +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rdd - -import java.util.{HashMap => JHashMap, BitSet => JBitSet, HashSet => JHashSet} - -import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer - -import scala.collection.mutable.BitSet - -import org.apache.spark._ - - - -class IndexedRDDFunctions[K: ClassManifest, V: ClassManifest](self: IndexedRDD[K,V]) - extends PairRDDFunctions[K,V](self) { - - /** - * Construct a new IndexedRDD that is indexed by only the keys in the RDD - */ - def reindex(): IndexedRDD[K,V] = IndexedRDD(self) - - - // /** - // * Pass each value in the key-value pair RDD through a map function without changing the keys; - // * this also retains the original RDD's partitioning. - // */ - // override def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = { - // val cleanF = self.index.rdd.context.clean(f) - // val newValuesRDD = self.valuesRDD.mapPartitions(iter => iter.map{ - // case (values, bs) => - // val newValues = new Array[U](values.size) - // for ( ind <- bs ) { - // newValues(ind) = f(values(ind)) - // } - // (newValues.toSeq, bs) - // }, preservesPartitioning = true) - // new IndexedRDD[K,U](self.index, newValuesRDD) - // } - - /** - * Pass each value in the key-value pair RDD through a flatMap function without changing the - * keys; this also retains the original RDD's partitioning. 
- */ - override def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K,U)] = { - val cleanF = self.index.rdd.context.clean(f) - val newValuesRDD: RDD[(IndexedSeq[U], BitSet)] = self.valuesRDD.mapPartitions(iter => iter.map{ - case (values, bs) => - val newValues = new Array[U](values.size) - val newBS = new BitSet(values.size) - for ( ind <- bs ) { - val res = f(values(ind)) - if(!res.isEmpty) { - newValues(ind) = res.toIterator.next() - newBS(ind) = true - } - } - (newValues.toIndexedSeq, newBS) - }, preservesPartitioning = true) - new IndexedRDD[K,U](self.index, newValuesRDD) - } - - - /** - * Generic function to combine the elements for each key using a custom set of aggregation - * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C - * Note that V and C can be different -- for example, one might group an RDD of type - * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions: - * - * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) - * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) - * - `mergeCombiners`, to combine two C's into a single one. - */ - override def combineByKey[C: ClassManifest](createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiners: (C, C) => C, - partitioner: Partitioner, - mapSideCombine: Boolean = true, - serializerClass: String = null): RDD[(K, C)] = { - mapValues(createCombiner) - } - - - // /** - // * Group the values for each key in the RDD into a single sequence. Hash-partitions the - // * resulting RDD with the existing partitioner/parallelism level. - // */ - // override def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { - // val newValues = self.valuesRDD.mapPartitions(_.map{ar => ar.map{s => Seq(s)} }, true) - // new IndexedRDD[K, Seq[V]](self.index, newValues) - // } - - - /** - * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the - * list of values for that key in `this` as well as `other`. - */ - override def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): - IndexedRDD[K, (Seq[V], Seq[W])] = { - //RDD[(K, (Seq[V], Seq[W]))] = { - other match { - case other: IndexedRDD[_, _] if self.index == other.index => { - // if both RDDs share exactly the same index and therefore the same super set of keys - // then we simply merge the value RDDs. - // However it is possible that both RDDs are missing a value for a given key in - // which case the returned RDD should have a null value - val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = - self.valuesRDD.zipPartitions(other.valuesRDD){ - (thisIter, otherIter) => - val (thisValues, thisBS) = thisIter.next() - assert(!thisIter.hasNext) - val (otherValues, otherBS) = otherIter.next() - assert(!otherIter.hasNext) - - val newValues = new Array[(Seq[V], Seq[W])](thisValues.size) - val newBS = thisBS | otherBS - - for( ind <- newBS ) { - val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V] - val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W] - newValues(ind) = (a, b) - } - Iterator((newValues.toIndexedSeq, newBS)) - } - new IndexedRDD(self.index, newValues) - } - case other: IndexedRDD[_, _] - if self.index.rdd.partitioner == other.index.rdd.partitioner => { - // If both RDDs are indexed using different indices but with the same partitioners - // then we we need to first merge the indicies and then use the merged index to - // merge the values. 
- val newIndex = - self.index.rdd.zipPartitions(other.index.rdd)( - (thisIter, otherIter) => { - val thisIndex = thisIter.next() - assert(!thisIter.hasNext) - val otherIndex = otherIter.next() - assert(!otherIter.hasNext) - val newIndex = new BlockIndex[K]() - // @todo Merge only the keys that correspond to non-null values - // Merge the keys - newIndex.putAll(thisIndex) - newIndex.putAll(otherIndex) - // We need to rekey the index - var ctr = 0 - for (e <- newIndex.entrySet) { - e.setValue(ctr) - ctr += 1 - } - List(newIndex).iterator - }).cache() - // Use the new index along with the this and the other indices to merge the values - val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = - newIndex.zipPartitions(self.tuples, other.tuples)( - (newIndexIter, thisTuplesIter, otherTuplesIter) => { - // Get the new index for this partition - val newIndex = newIndexIter.next() - assert(!newIndexIter.hasNext) - // Get the corresponding indicies and values for this and the other IndexedRDD - val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next() - assert(!thisTuplesIter.hasNext) - val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next() - assert(!otherTuplesIter.hasNext) - // Preallocate the new Values array - val newValues = new Array[(Seq[V], Seq[W])](newIndex.size) - val newBS = new BitSet(newIndex.size) - - // Lookup the sequences in both submaps - for ((k,ind) <- newIndex) { - // Get the left key - val a = if (thisIndex.contains(k)) { - val ind = thisIndex.get(k) - if(thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V] - } else Seq.empty[V] - // Get the right key - val b = if (otherIndex.contains(k)) { - val ind = otherIndex.get(k) - if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W] - } else Seq.empty[W] - // If at least one key was present then we generate a tuple. 
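
The deleted `IndexedRDDFunctions.cogroup` above special-cases two IndexedRDDs that share the same index: the per-partition value arrays can be zipped directly and the presence bitmasks OR-ed, with no shuffle. A condensed sketch of that per-partition merge (names are illustrative; it assumes both sides use the same index, so the arrays have equal length):

```scala
import scala.collection.mutable.BitSet

def zipCogroup[V, W](thisValues: IndexedSeq[V], thisBS: BitSet,
                     otherValues: IndexedSeq[W], otherBS: BitSet)
    : (IndexedSeq[(Seq[V], Seq[W])], BitSet) = {
  val newBS = thisBS | otherBS // a key is present if either side has it
  val newValues = new Array[(Seq[V], Seq[W])](thisValues.size)
  for (ind <- newBS) {
    val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V]
    val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
    newValues(ind) = (a, b)
  }
  (newValues.toIndexedSeq, newBS)
}
```
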
- if (!a.isEmpty || !b.isEmpty) { - newValues(ind) = (a, b) - newBS(ind) = true - } - } - Iterator((newValues.toIndexedSeq, newBS)) - }) - new IndexedRDD(new RDDIndex(newIndex), newValues) - } - case _ => { - // Get the partitioner from the index - val partitioner = self.index.rdd.partitioner match { - case Some(p) => p - case None => throw new SparkException("An index must have a partitioner.") - } - // Shuffle the other RDD using the partitioner for this index - val otherShuffled = - if (other.partitioner == Some(partitioner)) { - other - } else { - new ShuffledRDD[K, W, (K,W)](other, partitioner) - } - // Join the other RDD with this RDD building a new valueset and new index on the fly - val groups = - self.tuples.zipPartitions(otherShuffled)( - (thisTuplesIter, otherTuplesIter) => { - // Get the corresponding indicies and values for this IndexedRDD - val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next() - assert(!thisTuplesIter.hasNext()) - // Construct a new index - val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]] - // Construct a new array Buffer to store the values - val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null) - val newBS = new BitSet(thisValues.size) - // populate the newValues with the values in this IndexedRDD - for ((k,i) <- thisIndex) { - if (thisBS(i)) { - newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W]) - newBS(i) = true - } - } - // Now iterate through the other tuples updating the map - for ((k,w) <- otherTuplesIter){ - if (newIndex.contains(k)) { - val ind = newIndex.get(k) - if(newBS(ind)) { - newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w) - } else { - // If the other key was in the index but not in the values - // of this indexed RDD then create a new values entry for it - newBS(ind) = true - newValues(ind) = (Seq.empty[V], ArrayBuffer(w)) - } - } else { - // update the index - val ind = newIndex.size - newIndex.put(k, ind) - newBS(ind) = true - // Update the values - newValues.append( (Seq.empty[V], ArrayBuffer(w) ) ) - } - } - // // Finalize the new values array - // val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] = - // newValues.view.map{ - // case null => null - // case (s, ab) => Seq((s, ab.toSeq)) - // }.toSeq - Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) ) - }).cache() - - // Extract the index and values from the above RDD - val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true) - val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = - groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true) - - new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues) - } - } - } - - -} - -//(self: IndexedRDD[K, V]) extends PairRDDFunctions(self) { } - - diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 81bf867188ac494328063fe46486617bbbcaae01..93b78e123267c2c3926e8ee69b735a31600aeeae 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -68,7 +68,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * In addition, users can control the partitioning of the output RDD, and whether to perform * map-side aggregation (if a mapper can produce multiple items with the same key). 
*/ - def combineByKey[C: ClassManifest](createCombiner: V => C, + def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, @@ -108,7 +108,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) /** * Simplified version of combineByKey that hash-partitions the output RDD. */ - def combineByKey[C: ClassManifest](createCombiner: V => C, + def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] = { @@ -253,7 +253,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. */ - def join[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { + def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (v, w) } @@ -265,7 +265,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to * partition the output RDD. */ - def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => if (ws.isEmpty) { vs.iterator.map(v => (v, None)) @@ -281,7 +281,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to * partition the output RDD. */ - def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner) + def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) : RDD[(K, (Option[V], W))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => if (vs.isEmpty) { @@ -296,7 +296,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * Simplified version of combineByKey that hash-partitions the resulting RDD using the * existing partitioner/parallelism level. */ - def combineByKey[C: ClassManifest](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) + def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)] = { combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) } @@ -324,7 +324,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (V, W))] = { + def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { join(other, defaultPartitioner(self, other)) } @@ -333,7 +333,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. 
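
Dropping the `C: ClassManifest` bound on `combineByKey` means any combiner type works without a manifest in scope. An illustrative call using the three-argument overload shown above (`pairs` as in the earlier sketch; the data is made up):

```scala
// group Int values into Lists without needing a ClassManifest[List[Int]]
val grouped = pairs.combineByKey(
  (v: Int) => List(v),                    // createCombiner: start a one-element list
  (acc: List[Int], v: Int) => v :: acc,   // mergeValue: prepend within a partition
  (a: List[Int], b: List[Int]) => a ::: b // mergeCombiners: concatenate across partitions
)
```
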
*/ - def join[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = { + def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = { join(other, new HashPartitioner(numPartitions)) } @@ -343,7 +343,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output * using the existing partitioner/parallelism level. */ - def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { leftOuterJoin(other, defaultPartitioner(self, other)) } @@ -353,7 +353,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output * into `numPartitions` partitions. */ - def leftOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = { leftOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -363,7 +363,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD using the existing partitioner/parallelism level. */ - def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { + def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { rightOuterJoin(other, defaultPartitioner(self, other)) } @@ -373,7 +373,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD into the given number of partitions. */ - def rightOuterJoin[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = { + def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = { rightOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -392,25 +392,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * Pass each value in the key-value pair RDD through a map function without changing the keys; * this also retains the original RDD's partitioning. */ - def mapValues[U: ClassManifest](f: V => U): RDD[(K, U)] = { + def mapValues[U](f: V => U): RDD[(K, U)] = { val cleanF = self.context.clean(f) new MappedValuesRDD(self, cleanF) } - - /** - * Pass each value in the key-value pair RDD through a map function without changing the keys; - * this also retains the original RDD's partitioning. - */ - def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): RDD[(K, U)] = { - self.map{ case (k,v) => (k, f(k,v)) } - } - /** * Pass each value in the key-value pair RDD through a flatMap function without changing the * keys; this also retains the original RDD's partitioning. */ - def flatMapValues[U: ClassManifest](f: V => TraversableOnce[U]): RDD[(K, U)] = { + def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = { val cleanF = self.context.clean(f) new FlatMappedValuesRDD(self, cleanF) } @@ -419,7 +410,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. 
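
The join overloads above likewise lose their `W: ClassManifest` bounds, and the short-lived `mapValuesWithKeys` helper is removed outright. A hedged sketch of both points (illustrative data; the last line shows the plain-`map` equivalent of the removed helper):

```scala
// leftOuterJoin pads missing right-side keys with None
val left  = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
val right = sc.parallelize(Seq((1, 10), (2, 20)))
val joined = left.leftOuterJoin(right)
// => (1,("a",Some(10))), (2,("b",Some(20))), (3,("c",None))

// the removed mapValuesWithKeys(f) was equivalent to mapping over the pairs:
val renamed = left.map { case (k, v) => (k, k + ":" + v) }
```
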
*/ - def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -434,7 +425,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) + def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") @@ -450,7 +441,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner(self, other)) } @@ -458,7 +449,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) + def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } @@ -467,7 +458,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = { cogroup(other, new HashPartitioner(numPartitions)) } @@ -475,18 +466,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - def cogroup[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) + def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { cogroup(other1, other2, new HashPartitioner(numPartitions)) } /** Alias for cogroup. */ - def groupWith[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner(self, other)) } /** Alias for cogroup. 
*/ - def groupWith[W1: ClassManifest, W2: ClassManifest](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) + def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } @@ -707,20 +698,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) */ def values: RDD[V] = self.map(_._2) - - - def indexed(): IndexedRDD[K,V] = IndexedRDD(self) - - def indexed(numPartitions: Int): IndexedRDD[K,V] = - IndexedRDD(self.partitionBy(new HashPartitioner(numPartitions))) - - def indexed(partitioner: Partitioner): IndexedRDD[K,V] = - IndexedRDD(self.partitionBy(partitioner)) - - def indexed(existingIndex: RDDIndex[K]): IndexedRDD[K,V] = - IndexedRDD(self, existingIndex) - - private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index d14b4c60c7323c66fd25da3e0cdfa4a4763182fe..0355618e435bd3d00357406ca167ab324aae4645 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -805,26 +805,6 @@ abstract class RDD[T: ClassManifest]( return buf.toArray } - - - /** - * For RDD[(K,V)] this function returns a pair-functions object for this RDD - */ - def pairRDDFunctions[K, V]( - implicit t: T <:< (K, V), k: ClassManifest[K], v: ClassManifest[V]): - PairRDDFunctions[K, V] = { - new PairRDDFunctions(this.asInstanceOf[RDD[(K,V)]]) - } - - - /** - * Construct an index over the unique elements in this RDD. The - * index can then be used to organize a RDD[(T,V)]. - */ - def makeIndex(partitioner: Option[Partitioner] = None): RDDIndex[T] = - IndexedRDD.makeIndex(this, partitioner) - - /** * Return the first element in this RDD. */ diff --git a/core/src/test/scala/org/apache/spark/IndexedRDDSuite.scala b/core/src/test/scala/org/apache/spark/IndexedRDDSuite.scala deleted file mode 100644 index dadb183bdca09deb5273d738c1f32f1085b827ef..0000000000000000000000000000000000000000 --- a/core/src/test/scala/org/apache/spark/IndexedRDDSuite.scala +++ /dev/null @@ -1,461 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
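
With `pairRDDFunctions`, `makeIndex`, and the `indexed(...)` helpers removed from `RDD` and `PairRDDFunctions` above, call sites fall back to the standard implicit conversion. An illustrative before/after for migrating code (assuming some `rdd: RDD[(String, Int)]`):

```scala
// before this patch (removed helper method on RDD):
//   val wordCounts = rdd.pairRDDFunctions.reduceByKey(_ + _)
// after: the implicit conversion from SparkContext._ supplies the same methods
import org.apache.spark.SparkContext._
val wordCounts = rdd.reduceByKey(_ + _)
```
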
- */ - -package org.apache.spark - - -import org.scalatest.FunSuite -import org.scalatest.prop.Checkers -import org.scalacheck.Arbitrary._ -import org.scalacheck.Gen -import org.scalacheck.Prop._ - -import com.google.common.io.Files - -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashSet - -import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.ShuffledRDD -import org.apache.spark.rdd.IndexedRDD - -import org.apache.spark.SparkContext._ -import org.apache.spark._ - - - -class IndexedRDDSuite extends FunSuite with SharedSparkContext { - - def lineage(rdd: RDD[_]): collection.mutable.HashSet[RDD[_]] = { - val set = new collection.mutable.HashSet[RDD[_]] - def visit(rdd: RDD[_]) { - for (dep <- rdd.dependencies) { - set += dep.rdd - visit(dep.rdd) - } - } - visit(rdd) - set - } - - test("groupByKey") { - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed() - val groups = pairs.groupByKey().collect() - assert(groups.size === 2) - val valuesFor1 = groups.find(_._1 == 1).get._2 - assert(valuesFor1.toList.sorted === List(1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - - test("groupByKey with duplicates") { - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() - val groups = pairs.groupByKey().collect() - assert(groups.size === 2) - val valuesFor1 = groups.find(_._1 == 1).get._2 - assert(valuesFor1.toList.sorted === List(1, 1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - - test("groupByKey with negative key hash codes") { - val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1))).indexed() - val groups = pairs.groupByKey().collect() - assert(groups.size === 2) - val valuesForMinus1 = groups.find(_._1 == -1).get._2 - assert(valuesForMinus1.toList.sorted === List(1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - - test("groupByKey with many output partitions") { - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed(10) - val groups = pairs.groupByKey().collect() - assert(groups.size === 2) - val valuesFor1 = groups.find(_._1 == 1).get._2 - assert(valuesFor1.toList.sorted === List(1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - - test("reduceByKey") { - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() - val sums = pairs.reduceByKey(_+_).collect() - assert(sums.toSet === Set((1, 7), (2, 1))) - } - - test("reduceByKey with collectAsMap") { - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() - val sums = pairs.reduceByKey(_+_).collectAsMap() - assert(sums.size === 2) - assert(sums(1) === 7) - assert(sums(2) === 1) - } - - test("reduceByKey with many output partitons") { - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(10) - val sums = pairs.reduceByKey(_+_).collect() - assert(sums.toSet === Set((1, 7), (2, 1))) - } - - test("reduceByKey with partitioner") { - val p = new Partitioner() { - def numPartitions = 2 - def getPartition(key: Any) = key.asInstanceOf[Int] - } - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).indexed(p) - val sums = pairs.reduceByKey(_+_) - assert(sums.collect().toSet === Set((1, 4), (0, 1))) - assert(sums.partitioner === Some(p)) - // count the dependencies to make sure there is only 1 
ShuffledRDD - val deps = lineage(sums) - - assert(deps.filter(_.isInstanceOf[ShuffledRDD[_,_,_]]).size === 1) // ShuffledRDD, ParallelCollection - } - - - - test("joinIndexVsPair") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (2, 'x')), - (2, (1, 'y')), - (2, (1, 'z')) - )) - } - - test("joinIndexVsIndex") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed() - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (2, 'x')), - (2, (1, 'y')), - (2, (1, 'z')) - )) - } - - test("joinSharedIndex") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4,-4), (4, 4) )).indexed() - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 6) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (2, 'x')), - (2, (1, 'y')), - (2, (1, 'z')), - (4, (-4, 'w')), - (4, (4, 'w')) - )) - } - - - test("join all-to-all") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))).indexed() - val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))).indexed(rdd1.index) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 6) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (1, 'y')), - (1, (2, 'x')), - (1, (2, 'y')), - (1, (3, 'x')), - (1, (3, 'y')) - )) - } - - test("leftOuterJoinIndex") { - val index = sc.parallelize( 1 to 6 ).makeIndex() - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.leftOuterJoin(rdd2).collect() - assert(joined.size === 5) - assert(joined.toSet === Set( - (1, (1, Some('x'))), - (1, (2, Some('x'))), - (2, (1, Some('y'))), - (2, (1, Some('z'))), - (3, (1, None)) - )) - } - - test("leftOuterJoinIndextoIndex") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed() - val joined = rdd1.leftOuterJoin(rdd2).collect() - assert(joined.size === 5) - assert(joined.toSet === Set( - (1, (1, Some('x'))), - (1, (2, Some('x'))), - (2, (1, Some('y'))), - (2, (1, Some('z'))), - (3, (1, None)) - )) - } - - test("leftOuterJoinIndextoSharedIndex") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4, -4))).indexed() - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index) - val joined = rdd1.leftOuterJoin(rdd2).collect() - assert(joined.size === 6) - assert(joined.toSet === Set( - (1, (1, Some('x'))), - (1, (2, Some('x'))), - (2, (1, Some('y'))), - (2, (1, Some('z'))), - (4, (-4, Some('w'))), - (3, (1, None)) - )) - } - -test("leftOuterJoinIndextoIndexExternal") { - val index = sc.parallelize( 1 to 6 ).makeIndex() - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index) - val joined = rdd1.leftOuterJoin(rdd2).collect() - assert(joined.size === 5) - assert(joined.toSet === Set( - (1, (1, Some('x'))), - (1, (2, Some('x'))), - (2, (1, Some('y'))), - (2, (1, Some('z'))), - (3, (1, 
None)) - )) - } - - - test("rightOuterJoin") { - val index = sc.parallelize( 1 to 6 ).makeIndex() - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.rightOuterJoin(rdd2).collect() - assert(joined.size === 5) - assert(joined.toSet === Set( - (1, (Some(1), 'x')), - (1, (Some(2), 'x')), - (2, (Some(1), 'y')), - (2, (Some(1), 'z')), - (4, (None, 'w')) - )) - } - - test("rightOuterJoinIndex2Index") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed() - val joined = rdd1.rightOuterJoin(rdd2).collect() - assert(joined.size === 5) - assert(joined.toSet === Set( - (1, (Some(1), 'x')), - (1, (Some(2), 'x')), - (2, (Some(1), 'y')), - (2, (Some(1), 'z')), - (4, (None, 'w')) - )) - } - - - test("rightOuterJoinIndex2Indexshared") { - val index = sc.parallelize( 1 to 6 ).makeIndex() - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index) - val joined = rdd1.rightOuterJoin(rdd2).collect() - assert(joined.size === 5) - assert(joined.toSet === Set( - (1, (Some(1), 'x')), - (1, (Some(2), 'x')), - (2, (Some(1), 'y')), - (2, (Some(1), 'z')), - (4, (None, 'w')) - )) - } - - - test("join with no matches index") { - val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) - val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 0) - } - - test("join with no matches shared index") { - val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) - val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))).indexed(index) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 0) - } - - - test("join with many output partitions") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (2, 'x')), - (2, (1, 'y')), - (2, (1, 'z')) - )) - } - - test("join with many output partitions and two indices") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(20) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (2, 'x')), - (2, (1, 'y')), - (2, (1, 'z')) - )) - } - - - test("groupWith") { - val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) - - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index) - val joined = rdd1.groupWith(rdd2).collect() - assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))), - (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))), - (3, (ArrayBuffer(1), ArrayBuffer())), - (4, (ArrayBuffer(), ArrayBuffer('w'))) - )) - } - - test("zero-partition RDD") { - val emptyDir = Files.createTempDir() - val file = 
sc.textFile(emptyDir.getAbsolutePath) - assert(file.partitions.size == 0) - assert(file.collect().toList === Nil) - // Test that a shuffle on the file works, because this used to be a bug - assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) - } - - test("keys and values") { - val rdd = sc.parallelize(Array((1, "a"), (2, "b"))).indexed() - assert(rdd.keys.collect().toList === List(1, 2)) - assert(rdd.values.collect().toList === List("a", "b")) - } - - test("default partitioner uses partition size") { - // specify 2000 partitions - val a = sc.makeRDD(Array(1, 2, 3, 4), 2000) - // do a map, which loses the partitioner - val b = a.map(a => (a, (a * 2).toString)) - // then a group by, and see we didn't revert to 2 partitions - val c = b.groupByKey() - assert(c.partitions.size === 2000) - } - - // test("default partitioner uses largest partitioner indexed to indexed") { - // val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2).indexed() - // val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000).indexed() - // val c = a.join(b) - // assert(c.partitions.size === 2000) - // } - - - - test("subtract") { - val a = sc.parallelize(Array(1, 2, 3), 2) - val b = sc.parallelize(Array(2, 3, 4), 4) - val c = a.subtract(b) - assert(c.collect().toSet === Set(1)) - assert(c.partitions.size === a.partitions.size) - } - - test("subtract with narrow dependency") { - // use a deterministic partitioner - val p = new Partitioner() { - def numPartitions = 5 - def getPartition(key: Any) = key.asInstanceOf[Int] - } - // partitionBy so we have a narrow dependency - val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).indexed(p) - // more partitions/no partitioner so a shuffle dependency - val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) - val c = a.subtract(b) - assert(c.collect().toSet === Set((1, "a"), (3, "c"))) - // Ideally we could keep the original partitioner... 
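
The deleted "subtract with narrow dependency" test above exercises a pattern that survives this patch: pre-partitioning the left side gives `subtract` a narrow dependency on it. A sketch using `partitionBy` in place of the removed `indexed(p)` (illustrative data and partitioner):

```scala
import org.apache.spark.HashPartitioner

// pre-partition the left side so subtract sees a narrow dependency on it
val a = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"))).partitionBy(new HashPartitioner(5))
val b = sc.parallelize(Seq((2, "b"), (4, "d")), 4)
val c = a.subtract(b) // removes the pairs of `a` that also appear in `b`
```
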
- assert(c.partitioner === None) - } - - test("subtractByKey") { - - val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2).indexed() - val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4) - val c = a.subtractByKey(b) - assert(c.collect().toSet === Set((1, "a"), (1, "a"))) - assert(c.partitions.size === a.partitions.size) - } - - // test("subtractByKey with narrow dependency") { - // // use a deterministic partitioner - // val p = new Partitioner() { - // def numPartitions = 5 - // def getPartition(key: Any) = key.asInstanceOf[Int] - // } - - // val index = sc.parallelize( 1 to 6 ).makeIndex(Some(p)) - // // partitionBy so we have a narrow dependency - // val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).indexed(index) - // // more partitions/no partitioner so a shuffle dependency - // val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4).indexed(index) - // val c = a.subtractByKey(b) - // assert(c.collect().toSet === Set((1, "a"), (1, "a"))) - // assert(c.partitioner.get === p) - // } - - test("foldByKey") { - val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index) - val sums = pairs.foldByKey(0)(_+_).collect() - assert(sums.toSet === Set((1, 7), (2, 1))) - } - - test("foldByKey with mutable result type") { - val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) ) - - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index) - val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache() - // Fold the values using in-place mutation - val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect() - assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1)))) - // Check that the mutable objects in the original RDD were not changed - assert(bufs.collect().toSet === Set( - (1, ArrayBuffer(1)), - (1, ArrayBuffer(2)), - (1, ArrayBuffer(3)), - (1, ArrayBuffer(1)), - (2, ArrayBuffer(1)))) - } -} diff --git a/core/src/test/scala/org/apache/spark/rdd/IndexedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/IndexedRDDSuite.scala deleted file mode 100644 index 3a2ce4e4da4c6510d6377dc3658d7d08450391c3..0000000000000000000000000000000000000000 --- a/core/src/test/scala/org/apache/spark/rdd/IndexedRDDSuite.scala +++ /dev/null @@ -1,461 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.rdd - - -import org.scalatest.FunSuite -import org.scalatest.prop.Checkers -import org.scalacheck.Arbitrary._ -import org.scalacheck.Gen -import org.scalacheck.Prop._ - -import com.google.common.io.Files - -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashSet - -import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.ShuffledRDD -import org.apache.spark.rdd.IndexedRDD - -import org.apache.spark.SparkContext._ -import org.apache.spark._ - - - -class IndexedRDDSuite extends FunSuite with SharedSparkContext { - - def lineage(rdd: RDD[_]): collection.mutable.HashSet[RDD[_]] = { - val set = new collection.mutable.HashSet[RDD[_]] - def visit(rdd: RDD[_]) { - for (dep <- rdd.dependencies) { - set += dep.rdd - visit(dep.rdd) - } - } - visit(rdd) - set - } - - test("groupByKey") { - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed() - val groups = pairs.groupByKey().collect() - assert(groups.size === 2) - val valuesFor1 = groups.find(_._1 == 1).get._2 - assert(valuesFor1.toList.sorted === List(1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - - test("groupByKey with duplicates") { - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() - val groups = pairs.groupByKey().collect() - assert(groups.size === 2) - val valuesFor1 = groups.find(_._1 == 1).get._2 - assert(valuesFor1.toList.sorted === List(1, 1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - - test("groupByKey with negative key hash codes") { - val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1))).indexed() - val groups = pairs.groupByKey().collect() - assert(groups.size === 2) - val valuesForMinus1 = groups.find(_._1 == -1).get._2 - assert(valuesForMinus1.toList.sorted === List(1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - - test("groupByKey with many output partitions") { - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))).indexed(10) - val groups = pairs.groupByKey().collect() - assert(groups.size === 2) - val valuesFor1 = groups.find(_._1 == 1).get._2 - assert(valuesFor1.toList.sorted === List(1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - - test("reduceByKey") { - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() - val sums = pairs.reduceByKey(_+_).collect() - assert(sums.toSet === Set((1, 7), (2, 1))) - } - - test("reduceByKey with collectAsMap") { - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed() - val sums = pairs.reduceByKey(_+_).collectAsMap() - assert(sums.size === 2) - assert(sums(1) === 7) - assert(sums(2) === 1) - } - - test("reduceByKey with many output partitons") { - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(10) - val sums = pairs.reduceByKey(_+_).collect() - assert(sums.toSet === Set((1, 7), (2, 1))) - } - - test("reduceByKey with partitioner") { - val p = new Partitioner() { - def numPartitions = 2 - def getPartition(key: Any) = key.asInstanceOf[Int] - } - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).indexed(p) - val sums = pairs.reduceByKey(_+_) - assert(sums.collect().toSet === Set((1, 4), (0, 1))) - assert(sums.partitioner === Some(p)) - // count the dependencies to make sure there is 
only 1 ShuffledRDD - val deps = lineage(sums) - - assert(deps.filter(_.isInstanceOf[ShuffledRDD[_,_,_]]).size === 1) // ShuffledRDD, ParallelCollection - } - - - - test("joinIndexVsPair") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (2, 'x')), - (2, (1, 'y')), - (2, (1, 'z')) - )) - } - - test("joinIndexVsIndex") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed() - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (2, 'x')), - (2, (1, 'y')), - (2, (1, 'z')) - )) - } - - test("joinSharedIndex") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4,-4), (4, 4) )).indexed() - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 6) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (2, 'x')), - (2, (1, 'y')), - (2, (1, 'z')), - (4, (-4, 'w')), - (4, (4, 'w')) - )) - } - - - test("join all-to-all") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))).indexed() - val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))).indexed(rdd1.index) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 6) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (1, 'y')), - (1, (2, 'x')), - (1, (2, 'y')), - (1, (3, 'x')), - (1, (3, 'y')) - )) - } - - test("leftOuterJoinIndex") { - val index = sc.parallelize( 1 to 6 ).makeIndex() - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.leftOuterJoin(rdd2).collect() - assert(joined.size === 5) - assert(joined.toSet === Set( - (1, (1, Some('x'))), - (1, (2, Some('x'))), - (2, (1, Some('y'))), - (2, (1, Some('z'))), - (3, (1, None)) - )) - } - - test("leftOuterJoinIndextoIndex") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed() - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed() - val joined = rdd1.leftOuterJoin(rdd2).collect() - assert(joined.size === 5) - assert(joined.toSet === Set( - (1, (1, Some('x'))), - (1, (2, Some('x'))), - (2, (1, Some('y'))), - (2, (1, Some('z'))), - (3, (1, None)) - )) - } - - test("leftOuterJoinIndextoSharedIndex") { - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1), (4, -4))).indexed() - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(rdd1.index) - val joined = rdd1.leftOuterJoin(rdd2).collect() - assert(joined.size === 6) - assert(joined.toSet === Set( - (1, (1, Some('x'))), - (1, (2, Some('x'))), - (2, (1, Some('y'))), - (2, (1, Some('z'))), - (4, (-4, Some('w'))), - (3, (1, None)) - )) - } - -test("leftOuterJoinIndextoIndexExternal") { - val index = sc.parallelize( 1 to 6 ).makeIndex() - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index) - val joined = rdd1.leftOuterJoin(rdd2).collect() - assert(joined.size === 5) - assert(joined.toSet === Set( - (1, (1, Some('x'))), - (1, (2, Some('x'))), - (2, (1, Some('y'))), - (2, (1, Some('z'))), - (3, 
-      (3, (1, None))
-    ))
-  }
-
-
-  test("rightOuterJoin") {
-    val index = sc.parallelize( 1 to 6 ).makeIndex()
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val joined = rdd1.rightOuterJoin(rdd2).collect()
-    assert(joined.size === 5)
-    assert(joined.toSet === Set(
-      (1, (Some(1), 'x')),
-      (1, (Some(2), 'x')),
-      (2, (Some(1), 'y')),
-      (2, (Some(1), 'z')),
-      (4, (None, 'w'))
-    ))
-  }
-
-  test("rightOuterJoinIndex2Index") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed()
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed()
-    val joined = rdd1.rightOuterJoin(rdd2).collect()
-    assert(joined.size === 5)
-    assert(joined.toSet === Set(
-      (1, (Some(1), 'x')),
-      (1, (Some(2), 'x')),
-      (2, (Some(1), 'y')),
-      (2, (Some(1), 'z')),
-      (4, (None, 'w'))
-    ))
-  }
-
-
-  test("rightOuterJoinIndex2Indexshared") {
-    val index = sc.parallelize( 1 to 6 ).makeIndex()
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index)
-    val joined = rdd1.rightOuterJoin(rdd2).collect()
-    assert(joined.size === 5)
-    assert(joined.toSet === Set(
-      (1, (Some(1), 'x')),
-      (1, (Some(2), 'x')),
-      (2, (Some(1), 'y')),
-      (2, (Some(1), 'z')),
-      (4, (None, 'w'))
-    ))
-  }
-
-
-  test("join with no matches index") {
-    val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
-    val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
-    val joined = rdd1.join(rdd2).collect()
-    assert(joined.size === 0)
-  }
-
-  test("join with no matches shared index") {
-    val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
-    val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))).indexed(index)
-    val joined = rdd1.join(rdd2).collect()
-    assert(joined.size === 0)
-  }
-
-
-  test("join with many output partitions") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10)
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val joined = rdd1.join(rdd2).collect()
-    assert(joined.size === 4)
-    assert(joined.toSet === Set(
-      (1, (1, 'x')),
-      (1, (2, 'x')),
-      (2, (1, 'y')),
-      (2, (1, 'z'))
-    ))
-  }
-
-  test("join with many output partitions and two indices") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(10)
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(20)
-    val joined = rdd1.join(rdd2).collect()
-    assert(joined.size === 4)
-    assert(joined.toSet === Set(
-      (1, (1, 'x')),
-      (1, (2, 'x')),
-      (2, (1, 'y')),
-      (2, (1, 'z'))
-    ))
-  }
-
-
-  test("groupWith") {
-    val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
-
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))).indexed(index)
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))).indexed(index)
-    val joined = rdd1.groupWith(rdd2).collect()
-    assert(joined.size === 4)
-    assert(joined.toSet === Set(
-      (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
-      (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
-      (3, (ArrayBuffer(1), ArrayBuffer())),
-      (4, (ArrayBuffer(), ArrayBuffer('w')))
-    ))
-  }
-
-  test("zero-partition RDD") {
-    val emptyDir = Files.createTempDir()
-    val file = sc.textFile(emptyDir.getAbsolutePath)
-    assert(file.partitions.size == 0)
-    assert(file.collect().toList === Nil)
-    // Test that a shuffle on the file works, because this used to be a bug
-    assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
-  }
-
-  test("keys and values") {
-    val rdd = sc.parallelize(Array((1, "a"), (2, "b"))).indexed()
-    assert(rdd.keys.collect().toList === List(1, 2))
-    assert(rdd.values.collect().toList === List("a", "b"))
-  }
-
-  test("default partitioner uses partition size") {
-    // specify 2000 partitions
-    val a = sc.makeRDD(Array(1, 2, 3, 4), 2000)
-    // do a map, which loses the partitioner
-    val b = a.map(a => (a, (a * 2).toString))
-    // then a group by, and see we didn't revert to 2 partitions
-    val c = b.groupByKey()
-    assert(c.partitions.size === 2000)
-  }
-
-  // test("default partitioner uses largest partitioner indexed to indexed") {
-  //   val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2).indexed()
-  //   val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000).indexed()
-  //   val c = a.join(b)
-  //   assert(c.partitions.size === 2000)
-  // }
-
-
-
-  test("subtract") {
-    val a = sc.parallelize(Array(1, 2, 3), 2)
-    val b = sc.parallelize(Array(2, 3, 4), 4)
-    val c = a.subtract(b)
-    assert(c.collect().toSet === Set(1))
-    assert(c.partitions.size === a.partitions.size)
-  }
-
-  test("subtract with narrow dependency") {
-    // use a deterministic partitioner
-    val p = new Partitioner() {
-      def numPartitions = 5
-      def getPartition(key: Any) = key.asInstanceOf[Int]
-    }
-    // partitionBy so we have a narrow dependency
-    val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).indexed(p)
-    // more partitions/no partitioner so a shuffle dependency
-    val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
-    val c = a.subtract(b)
-    assert(c.collect().toSet === Set((1, "a"), (3, "c")))
-    // Ideally we could keep the original partitioner...
-    assert(c.partitioner === None)
-  }
-
-  test("subtractByKey") {
-
-    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2).indexed()
-    val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
-    val c = a.subtractByKey(b)
-    assert(c.collect().toSet === Set((1, "a"), (1, "a")))
-    assert(c.partitions.size === a.partitions.size)
-  }
-
-  // test("subtractByKey with narrow dependency") {
-  //   // use a deterministic partitioner
-  //   val p = new Partitioner() {
-  //     def numPartitions = 5
-  //     def getPartition(key: Any) = key.asInstanceOf[Int]
-  //   }
-
-  //   val index = sc.parallelize( 1 to 6 ).makeIndex(Some(p))
-  //   // partitionBy so we have a narrow dependency
-  //   val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).indexed(index)
-  //   // more partitions/no partitioner so a shuffle dependency
-  //   val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4).indexed(index)
-  //   val c = a.subtractByKey(b)
-  //   assert(c.collect().toSet === Set((1, "a"), (1, "a")))
-  //   assert(c.partitioner.get === p)
-  // }
-
-  test("foldByKey") {
-    val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index)
-    val sums = pairs.foldByKey(0)(_+_).collect()
-    assert(sums.toSet === Set((1, 7), (2, 1)))
-  }
-
-  test("foldByKey with mutable result type") {
-    val index = IndexedRDD.makeIndex( sc.parallelize( 1 to 6 ) )
-
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))).indexed(index)
-    val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache()
-    // Fold the values using in-place mutation
-    val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect()
-    assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1))))
-    // Check that the mutable objects in the original RDD were not changed
-    assert(bufs.collect().toSet === Set(
-      (1, ArrayBuffer(1)),
-      (1, ArrayBuffer(2)),
-      (1, ArrayBuffer(3)),
-      (1, ArrayBuffer(1)),
-      (2, ArrayBuffer(1))))
-  }
-}
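The suite deleted above pinned down join semantics for indexed pair RDDs before the move into the graph package (see the IndexedRDD rename below). One behavior worth keeping in mind while reading the removals: joins emit the per-key Cartesian product of values. A minimal sketch using only the plain pair-RDD API, where `sc` is an assumed live SparkContext and `import org.apache.spark.SparkContext._` supplies the implicit pair functions:

    // Key 1 carries three values on the left and two on the right, so the
    // join yields 3 * 2 = 6 rows, exactly as "join all-to-all" asserted.
    val left  = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
    val right = sc.parallelize(Array((1, 'x'), (1, 'y')))
    assert(left.join(right).count() == 6)
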
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index 342151173a63674a3ae294ac9768f0e1bc3d207d..68f0394dd4bfa1e154f915b15dd5abfd78ea0070 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -306,6 +306,9 @@ object Graph {
   import org.apache.spark.graph.impl._
   import org.apache.spark.SparkContext._
 
+  def apply(rawEdges: RDD[(Vid, Vid)]): Graph[Int, Int] = { Graph(rawEdges, true) }
+
+
   /**
    * Construct a graph from a list of Edges.
    *
@@ -316,7 +319,7 @@ object Graph {
    *
    *
    */
-  def apply(rawEdges: RDD[(Vid, Vid)], uniqueEdges: Boolean = true): Graph[Int, Int] = {
+  def apply(rawEdges: RDD[(Vid, Vid)], uniqueEdges: Boolean): Graph[Int, Int] = {
     // Reduce to unique edges.
     val edges: RDD[Edge[Int]] = if (uniqueEdges) {
@@ -330,13 +333,29 @@ object Graph {
       edges.flatMap{ case Edge(s, t, cnt) => Array((s, 1), (t, 1)) }.reduceByKey(_ + _)
     // Return graph
-    GraphImpl(vertices, edges)
+    GraphImpl(vertices, edges, 0)
   }
 
+
   def apply[VD: ClassManifest, ED: ClassManifest](
-      vertices: RDD[(Vid,VD)], edges: RDD[Edge[ED]]): Graph[VD, ED] = {
-    GraphImpl(vertices, edges)
+      vertices: RDD[(Vid,VD)],
+      edges: RDD[Edge[ED]]): Graph[VD, ED] = {
+    val defaultAttr: VD = null.asInstanceOf[VD]
+    Graph(vertices, edges, defaultAttr, (a:VD,b:VD) => a)
+  }
+
+
+
+  /**
+   * Construct a new graph from a set of edges and vertices
+   */
+  def apply[VD: ClassManifest, ED: ClassManifest](
+      vertices: RDD[(Vid,VD)],
+      edges: RDD[Edge[ED]],
+      defaultVertexAttr: VD,
+      mergeFunc: (VD, VD) => VD): Graph[VD, ED] = {
+    GraphImpl(vertices, edges, defaultVertexAttr, mergeFunc)
   }
 
   implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops
-}
+} // end of Graph object
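A hedged usage sketch of the reshuffled factories above; `rawEdges`, `vertices`, and `edges` stand for assumed RDDs of the appropriate shapes and are not part of the patch:

    // New no-flag overload: delegates to Graph(rawEdges, true), deduplicating edges.
    val g: Graph[Int, Int] = Graph(rawEdges)
    // Fully explicit overload: defaultVertexAttr is meant for vertices that appear
    // only in `edges` (see the @todo in GraphImpl below), and mergeFunc resolves
    // duplicate attributes supplied for the same vertex id.
    val g2: Graph[Double, Int] = Graph(vertices, edges, 0.0, (a: Double, b: Double) => a)

Note that the two-argument apply now routes through the four-argument form with a null default attribute and a keep-first merge function instead of calling GraphImpl directly.
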
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index 76f69edf0e22fb60a75ea79924b4086de195dda2..2295084024f9be7a91d100a7b6201b17e2c1fe53 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -56,6 +56,6 @@ object GraphLoader {
     val vertices = edges.flatMap { edge => List((edge.srcId, 1), (edge.dstId, 1)) }
       .reduceByKey(_ + _)
       .map{ case (vid, degree) => (vid, degree) }
-    GraphImpl(vertices, edges)
+    GraphImpl(vertices, edges, 0)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala b/graph/src/main/scala/org/apache/spark/graph/IndexedRDD.scala
similarity index 73%
rename from core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala
rename to graph/src/main/scala/org/apache/spark/graph/IndexedRDD.scala
index 5f95559f15122707987b5e64a23e1bd67eb6f3c1..900a46bb4236205703dded6d8a931d917c595fdd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/IndexedRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/IndexedRDD.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.rdd
+package org.apache.spark.graph
 
 import java.nio.ByteBuffer
 
@@ -76,6 +76,12 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
     List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD))
   ) {
 
+  /**
+   * Construct a new IndexedRDD that is indexed by only the keys in the RDD
+   */
+  def reindex(): IndexedRDD[K,V] = IndexedRDD(this)
+
+
   /**
    * An internal representation which joins the block indices with the values
    */
@@ -247,6 +253,173 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
   }
 
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner):
+    IndexedRDD[K, (Seq[V], Seq[W])] = {
+    //RDD[(K, (Seq[V], Seq[W]))] = {
+    other match {
+      case other: IndexedRDD[_, _] if index == other.index => {
+        // If both RDDs share exactly the same index, and therefore the same superset of keys,
+        // then we simply merge the value RDDs.
+        // However, it is possible that both RDDs are missing a value for a given key, in
+        // which case the returned RDD should have a null value.
+        val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
+          valuesRDD.zipPartitions(other.valuesRDD){
+            (thisIter, otherIter) =>
+            val (thisValues, thisBS) = thisIter.next()
+            assert(!thisIter.hasNext)
+            val (otherValues, otherBS) = otherIter.next()
+            assert(!otherIter.hasNext)
+
+            val newValues = new Array[(Seq[V], Seq[W])](thisValues.size)
+            val newBS = thisBS | otherBS
+
+            for( ind <- newBS ) {
+              val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V]
+              val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
+              newValues(ind) = (a, b)
+            }
+            Iterator((newValues.toIndexedSeq, newBS))
+          }
+        new IndexedRDD(index, newValues)
+      }
+      case other: IndexedRDD[_, _]
+        if index.rdd.partitioner == other.index.rdd.partitioner => {
+        // If both RDDs are indexed using different indices but with the same partitioners,
+        // then we first need to merge the indices and then use the merged index to
+        // merge the values.
+        val newIndex =
+          index.rdd.zipPartitions(other.index.rdd)(
+            (thisIter, otherIter) => {
+            val thisIndex = thisIter.next()
+            assert(!thisIter.hasNext)
+            val otherIndex = otherIter.next()
+            assert(!otherIter.hasNext)
+            val newIndex = new BlockIndex[K]()
+            // @todo Merge only the keys that correspond to non-null values
+            // Merge the keys
+            newIndex.putAll(thisIndex)
+            newIndex.putAll(otherIndex)
+            // We need to rekey the index
+            var ctr = 0
+            for (e <- newIndex.entrySet) {
+              e.setValue(ctr)
+              ctr += 1
+            }
+            List(newIndex).iterator
+          }).cache()
+        // Use the new index along with this and the other indices to merge the values
+        val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
+          newIndex.zipPartitions(tuples, other.tuples)(
+            (newIndexIter, thisTuplesIter, otherTuplesIter) => {
+            // Get the new index for this partition
+            val newIndex = newIndexIter.next()
+            assert(!newIndexIter.hasNext)
+            // Get the corresponding indices and values for this and the other IndexedRDD
+            val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
+            assert(!thisTuplesIter.hasNext)
+            val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next()
+            assert(!otherTuplesIter.hasNext)
+            // Preallocate the new values array
+            val newValues = new Array[(Seq[V], Seq[W])](newIndex.size)
+            val newBS = new BitSet(newIndex.size)
+
+            // Lookup the sequences in both submaps
+            for ((k,ind) <- newIndex) {
+              // Get the left key
+              val a = if (thisIndex.contains(k)) {
+                val ind = thisIndex.get(k)
+                if(thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V]
+              } else Seq.empty[V]
+              // Get the right key
+              val b = if (otherIndex.contains(k)) {
+                val ind = otherIndex.get(k)
+                if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
+              } else Seq.empty[W]
+              // If at least one key was present then we generate a tuple.
+              if (!a.isEmpty || !b.isEmpty) {
+                newValues(ind) = (a, b)
+                newBS(ind) = true
+              }
+            }
+            Iterator((newValues.toIndexedSeq, newBS))
+          })
+        new IndexedRDD(new RDDIndex(newIndex), newValues)
+      }
+      case _ => {
+        // Get the partitioner from the index
+        val partitioner = index.rdd.partitioner match {
+          case Some(p) => p
+          case None => throw new SparkException("An index must have a partitioner.")
+        }
+        // Shuffle the other RDD using the partitioner for this index
+        val otherShuffled =
+          if (other.partitioner == Some(partitioner)) {
+            other
+          } else {
+            new ShuffledRDD[K, W, (K,W)](other, partitioner)
+          }
+        // Join the other RDD with this RDD, building a new value set and new index on the fly
+        val groups = tuples.zipPartitions(otherShuffled)(
+          (thisTuplesIter, otherTuplesIter) => {
+            // Get the corresponding indices and values for this IndexedRDD
+            val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
+            assert(!thisTuplesIter.hasNext())
+            // Construct a new index
+            val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]]
+            // Construct a new array buffer to store the values
+            val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null)
+            val newBS = new BitSet(thisValues.size)
+            // Populate the newValues with the values in this IndexedRDD
+            for ((k,i) <- thisIndex) {
+              if (thisBS(i)) {
+                newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W])
+                newBS(i) = true
+              }
+            }
+            // Now iterate through the other tuples updating the map
+            for ((k,w) <- otherTuplesIter){
+              if (newIndex.contains(k)) {
+                val ind = newIndex.get(k)
+                if(newBS(ind)) {
+                  newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w)
+                } else {
+                  // If the other key was in the index but not in the values
+                  // of this indexed RDD then create a new values entry for it
+                  newBS(ind) = true
+                  newValues(ind) = (Seq.empty[V], ArrayBuffer(w))
+                }
+              } else {
+                // Update the index
+                val ind = newIndex.size
+                newIndex.put(k, ind)
+                newBS(ind) = true
+                // Update the values
+                newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
+              }
+            }
+            // // Finalize the new values array
+            // val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] =
+            //   newValues.view.map{
+            //     case null => null
+            //     case (s, ab) => Seq((s, ab.toSeq))
+            //   }.toSeq
+            Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) )
+          }).cache()
+
+        // Extract the index and values from the above RDD
+        val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
+        val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
+          groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
+
+        new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues)
+      }
+    }
+  }
+
 
 //
 // def zipJoinToRDD[W: ClassManifest](other: IndexedRDD[K,W]): RDD[(K,(V,W))] = {
@@ -419,15 +592,6 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
    */
-  /**
-   * The IndexedRDD has its own optimized version of the pairRDDFunctions.
-   */
-  override def pairRDDFunctions[K1, V1](
-      implicit t: (K, V) <:< (K1,V1), k: ClassManifest[K1], v: ClassManifest[V1]):
-    PairRDDFunctions[K1, V1] = {
-    new IndexedRDDFunctions[K1,V1](this.asInstanceOf[IndexedRDD[K1,V1]])
-  }
-
   override def filter(f: Tuple2[K,V] => Boolean): RDD[(K,V)] = {
     val cleanF = index.rdd.context.clean(f)
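The cogroup added above dispatches on three cases: the two RDDs share an identical index (merge values positionally), they share a partitioner but not an index (merge the indices, then the values), or the other side is an arbitrary pair RDD (shuffle it by this index's partitioner and build the index on the fly). A standalone sketch of the bookkeeping behind the shared-index fast path, using plain Scala collections rather than the patch's BlockIndex/BitSet types:

    import scala.collection.immutable.BitSet
    // Slots occupied on either side survive the merge; each surviving slot gets a
    // (Seq[V], Seq[W]) pair, with an empty Seq standing in for an absent side.
    val thisBS  = BitSet(0, 1)         // left has values at slots 0 and 1
    val otherBS = BitSet(1, 2)         // right has values at slots 1 and 2
    val newBS   = thisBS | otherBS     // slots 0, 1, and 2 appear in the result
    assert(newBS == BitSet(0, 1, 2))
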
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index e7a708e895b283d2d630b85cdf170b7b1e68b7e7..c6875f0c9c470d3649a892f95ce6f024720d1350 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -15,8 +15,6 @@ import org.apache.spark.util.ClosureCleaner
 
 import org.apache.spark.rdd
 import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.IndexedRDD
-import org.apache.spark.rdd.RDDIndex
 
 import org.apache.spark.graph._
 
@@ -239,7 +237,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     //   .map(v => (v.id, v.data._1)).indexed()
 
     // Reuse the partitioner (but not the index) from this graph
-    val newVTable = vertices.filter(v => vpred(v._1, v._2)).indexed(vTable.index.partitioner)
+    val newVTable =
+      IndexedRDD(vertices.filter(v => vpred(v._1, v._2)).partitionBy(vTable.index.partitioner))
 
 
     // Restrict the set of edges to those that satisfy the vertex and the edge predicate.
@@ -247,9 +246,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
       triplets.filter(
         t => vpred( t.srcId, t.srcAttr ) && vpred( t.dstId, t.dstAttr ) && epred(t)
         )
-      .map( t => Edge(t.srcId, t.dstId, t.attr) ),
-      eTable.index.partitioner.numPartitions
-      )
+      .map( t => Edge(t.srcId, t.dstId, t.attr) ))
 
     // Construct the Vid2Pid map. Here we assume that the filter operation
     // behaves deterministically.
@@ -277,11 +274,11 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
         .toList
         .toIterator
         .map { case ((src, dst), data) => Edge(src, dst, data) }
+        .toIterator
     }
 
     //TODO(crankshaw) eliminate the need to call createETable
-    val newETable = createETable(newEdges,
-      eTable.index.partitioner.numPartitions)
+    val newETable = createETable(newEdges)
 
     new GraphImpl(vTable, vid2pid, localVidMap, newETable)
   }
@@ -298,8 +295,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
         .map { case ((src, dst), data) => Edge(src, dst, data) }
     }
     // TODO(crankshaw) eliminate the need to call createETable
-    val newETable = createETable(newEdges,
-      eTable.index.partitioner.numPartitions)
+    val newETable = createETable(newEdges)
 
     new GraphImpl(vTable, vid2pid, localVidMap, newETable)
   }
@@ -393,20 +389,33 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 object GraphImpl {
 
   def apply[VD: ClassManifest, ED: ClassManifest](
-    vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]]):
+    vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]],
+    defaultVertexAttr: VD):
   GraphImpl[VD,ED] = {
-
-    apply(vertices, edges,
-      vertices.context.defaultParallelism, edges.context.defaultParallelism)
+    apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a)
   }
 
   def apply[VD: ClassManifest, ED: ClassManifest](
-    vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]],
-    numVPart: Int, numEPart: Int): GraphImpl[VD,ED] = {
-
-    val vtable = vertices.indexed(numVPart)
-    val etable = createETable(edges, numEPart)
+    vertices: RDD[(Vid, VD)],
+    edges: RDD[Edge[ED]],
+    defaultVertexAttr: VD,
+    mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
+
+    val vtable = IndexedRDD(vertices, mergeFunc)
+    /**
+     * @todo Verify that there are no edges that contain vertices
+     * that are not in vTable.  This should probably be resolved:
+     *
+     *  edges.flatMap{ e => Array((e.srcId, null), (e.dstId, null)) }
+     *       .cogroup(vertices).map{
+     *         case (vid, _, attr) =>
+     *           if (attr.isEmpty) (vid, defaultValue)
+     *           else (vid, attr)
+     *        }
+     *
+     */
+    val etable = createETable(edges)
     val vid2pid = createVid2Pid(etable, vtable.index)
     val localVidMap = createLocalVidMap(etable)
     new GraphImpl(vtable, vid2pid, localVidMap, etable)
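A hedged sketch of the new construction path, with Int vertex attributes assumed for illustration: duplicate vertex ids are now resolved eagerly when the vertex table is built, because IndexedRDD(vertices, mergeFunc) reduces them with mergeFunc.

    // Keep the larger of two attributes supplied for the same vertex id.
    val g = GraphImpl(vertices, edges, 0, (a: Int, b: Int) => math.max(a, b))

Callers that previously passed numVPart and numEPart now inherit partitioning from the inputs themselves; createETable below reads edges.partitions.size instead of taking a partition count.
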
@@ -422,12 +431,12 @@ object GraphImpl {
    * key-value pair: the key is the partition id, and the value is an EdgePartition object
    * containing all the edges in a partition.
    */
-  protected def createETable[ED: ClassManifest](
-    edges: RDD[Edge[ED]], numPartitions: Int)
+  protected def createETable[ED: ClassManifest](edges: RDD[Edge[ED]])
     : IndexedRDD[Pid, EdgePartition[ED]] = {
+    // Get the number of partitions
+    val numPartitions = edges.partitions.size
     val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt
-    edges
-      .map { e =>
+    IndexedRDD(edges.map { e =>
       // Random partitioning based on the source vertex id.
       // val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions)
       // val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
@@ -446,7 +455,7 @@ object GraphImpl {
       }
       val edgePartition = builder.toEdgePartition
       Iterator((pid, edgePartition))
-      }, preservesPartitioning = true).indexed()
+      }, preservesPartitioning = true))
   }
 
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
index 061cce99b6e3d70f9b5e13668f102c60a095d4ed..895c65c14c90339df68b902d7a08a15edd493e32 100644
--- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
@@ -79,7 +79,7 @@ object GraphGenerators {
       v => generateRandomEdges(v._1.toInt, v._2, numVertices)
     }
 
-    GraphImpl(vertices, edges)
+    GraphImpl(vertices, edges, 0)
     //println("Vertices:")
     //for (v <- vertices) {
     //  println(v.id)
@@ -160,7 +160,7 @@ object GraphGenerators {
     val vertices = edges.flatMap { edge => List((edge.srcId, 1)) }
       .reduceByKey(_ + _)
       .map{ case (vid, degree) => (vid, degree) }
-    GraphImpl(vertices, edges)
+    GraphImpl(vertices, edges, 0)
   }
 
 /**