From e7d37472b83b3bc8e232e790b2df230e35c0a5af Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez" <joseph.e.gonzalez@gmail.com>
Date: Thu, 31 Oct 2013 21:09:39 -0700
Subject: [PATCH] After some testing I realized that the IndexedSeq is still
 instantiating the array (not maintaining a view) so I have replaced all
 IndexedSeq[V] with (Int => V)

---
 .../org/apache/spark/graph/VertexSetRDD.scala | 83 +++++++------------
 1 file changed, 32 insertions(+), 51 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
index 7211ff3705..f26e286003 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
@@ -62,7 +62,6 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
 } // end of VertexSetIndex
 
 
-
 /**
  * An VertexSetRDD[V] extends the RDD[(Vid,V)] by ensuring that there
  * is only one entry for each vertex and by pre-indexing the entries
@@ -99,7 +98,7 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
  */
 class VertexSetRDD[@specialized V: ClassManifest](
     @transient val index:  VertexSetIndex,
-    @transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ])
+    @transient val valuesRDD: RDD[ ( (Int => V), BitSet) ])
   extends RDD[(Vid, V)](index.rdd.context, 
     List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
 
@@ -183,13 +182,13 @@ class VertexSetRDD[@specialized V: ClassManifest](
     val cleanPred = index.rdd.context.clean(pred)
     val newValues = index.rdd.zipPartitions(valuesRDD){ 
       (keysIter: Iterator[VertexIdToIndexMap], 
-       valuesIter: Iterator[(IndexedSeq[V], BitSet)]) => 
+       valuesIter: Iterator[(Int => V, BitSet)]) => 
       val index = keysIter.next()
       assert(keysIter.hasNext() == false)
       val (oldValues, bs) = valuesIter.next()
       assert(valuesIter.hasNext() == false)
       // Allocate the array to store the results into
-      val newBS = new BitSet(oldValues.size)
+      val newBS = new BitSet(index.capacity)
       // Iterate over the active bits in the old bitset and 
       // evaluate the predicate
       var ind = bs.nextSetBit(0)
@@ -218,15 +217,13 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * VertexSetRDD retains the same index.
    */
   def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
-    val cleanF = index.rdd.context.clean(f)
-    val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] = 
+    val newValuesRDD: RDD[ (Int => U, BitSet) ] = 
       valuesRDD.mapPartitions(iter => iter.map{ 
         case (values, bs: BitSet) => 
-          val newValues: IndexedSeq[U] = values.view.zipWithIndex.map{ 
-            (x: (V, Int)) => if(bs.get(x._2)) cleanF(x._1) else null.asInstanceOf[U]
-          }.toIndexedSeq // @todo check the toIndexedSeq is free
+          val newValues: (Int => U) = 
+            (ind: Int) => if (bs.get(ind)) f(values(ind)) else null.asInstanceOf[U]
           (newValues, bs)
-          }, preservesPartitioning = true)   
+      }, preservesPartitioning = true)   
     new VertexSetRDD[U](index, newValuesRDD)
   } // end of mapValues
 
@@ -244,22 +241,19 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * VertexSetRDD retains the same index.
    */
   def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
-    val cleanF = index.rdd.context.clean(f)
-    val newValues: RDD[ (IndexedSeq[U], BitSet) ] = 
+    val newValues: RDD[ (Int => U, BitSet) ] = 
       index.rdd.zipPartitions(valuesRDD){ 
         (keysIter: Iterator[VertexIdToIndexMap], 
-         valuesIter: Iterator[(IndexedSeq[V], BitSet)]) => 
+         valuesIter: Iterator[(Int => V, BitSet)]) => 
         val index = keysIter.next()
         assert(keysIter.hasNext() == false)
         val (oldValues, bs: BitSet) = valuesIter.next()
         assert(valuesIter.hasNext() == false)
         // Cosntruct a view of the map transformation
-        val newValues: IndexedSeq[U] = oldValues.view.zipWithIndex.map{ 
-          (x: (V, Int)) => 
-          if(bs.get(x._2)) {
-            cleanF(index.getValueSafe(x._2), x._1)
-          } else null.asInstanceOf[U]
-        }.toIndexedSeq // @todo check the toIndexedSeq is free
+        val newValues: (Int => U) = (ind: Int) => {
+          if (bs.get(ind)) { f(index.getValueSafe(ind), oldValues(ind)) }
+          else { null.asInstanceOf[U] }
+        }
         Iterator((newValues, bs))
       }
     new VertexSetRDD[U](index, newValues)
@@ -283,18 +277,16 @@ class VertexSetRDD[@specialized V: ClassManifest](
     if(index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
-    val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] = 
+    val newValuesRDD: RDD[ (Int => (V,W), BitSet) ] = 
       valuesRDD.zipPartitions(other.valuesRDD){
-        (thisIter: Iterator[(IndexedSeq[V], BitSet)], 
-          otherIter: Iterator[(IndexedSeq[W], BitSet)]) => 
+        (thisIter: Iterator[(Int => V, BitSet)], 
+          otherIter: Iterator[(Int => W, BitSet)]) => 
         val (thisValues, thisBS: BitSet) = thisIter.next()
         assert(!thisIter.hasNext)
         val (otherValues, otherBS: BitSet) = otherIter.next()
         assert(!otherIter.hasNext)
         val newBS: BitSet = thisBS & otherBS
-        val newValues: IndexedSeq[(V,W)] = 
-          thisValues.view.zip(otherValues).toIndexedSeq // @todo check the toIndexedSeq is free
-        // Iterator((newValues.toIndexedSeq, newBS))
+        val newValues: Int => (V,W) = (ind: Int) => (thisValues(ind), otherValues(ind)) 
         Iterator((newValues, newBS))
       }
     new VertexSetRDD(index, newValuesRDD)
@@ -320,23 +312,17 @@ class VertexSetRDD[@specialized V: ClassManifest](
     if(index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
-    val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = 
+    val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] = 
       valuesRDD.zipPartitions(other.valuesRDD){
-      (thisIter: Iterator[(IndexedSeq[V], BitSet)], 
-        otherIter: Iterator[(IndexedSeq[W], BitSet)]) => 
+      (thisIter: Iterator[(Int => V, BitSet)], 
+        otherIter: Iterator[(Int => W, BitSet)]) => 
       val (thisValues, thisBS: BitSet) = thisIter.next()
       assert(!thisIter.hasNext)
       val (otherValues, otherBS: BitSet) = otherIter.next()
       assert(!otherIter.hasNext)
-      val newValues: IndexedSeq[(V, Option[W])] = thisValues.view.zip(otherValues)
-        .zipWithIndex.map {
-          // @todo not sure about the efficiency of this case statement
-          // though it is assumed that the return value is short lived
-          case ((v, w), ind) => (v, if (otherBS.get(ind)) Option(w) else None) 
-        }
-        .toIndexedSeq // @todo check the toIndexedSeq is free
+      val newValues: Int => (V, Option[W]) = (ind: Int) => 
+        (thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None)
       Iterator((newValues, thisBS))
-      //      Iterator((newValues.toIndexedSeq, thisBS))
     }
     new VertexSetRDD(index, newValuesRDD)
   } // end of leftZipJoin
@@ -380,10 +366,10 @@ class VertexSetRDD[@specialized V: ClassManifest](
           if (other.partitioner == partitioner) other 
           else other.partitionBy(partitioner.get)
         // Compute the new values RDD
-        val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = 
+        val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] = 
           index.rdd.zipPartitions(valuesRDD, otherShuffled) {
           (thisIndexIter: Iterator[VertexIdToIndexMap], 
-            thisIter: Iterator[(IndexedSeq[V], BitSet)], 
+            thisIter: Iterator[(Int => V, BitSet)], 
             tuplesIter: Iterator[(Vid,W)]) =>
           // Get the Index and values for this RDD
           val index = thisIndexIter.next()
@@ -391,9 +377,9 @@ class VertexSetRDD[@specialized V: ClassManifest](
           val (thisValues, thisBS) = thisIter.next()
           assert(!thisIter.hasNext)
           // Create a new array to store the values in the resulting VertexSet
-          val otherValues = new Array[W](thisValues.size)
+          val otherValues = new Array[W](index.capacity)
           // track which values are matched with values in other
-          val otherBS = new BitSet(thisValues.size)
+          val otherBS = new BitSet(index.capacity)
           for ((k,w) <- tuplesIter) {
             // Get the location of the key in the index
             val pos = index.getPos(k)
@@ -412,13 +398,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
           }
           // Some vertices in this vertex set may not have a corresponding
           // tuple in the join and so a None value should be returned. 
-          val newValues: IndexedSeq[(V, Option[W])] = thisValues.view.zip(otherValues)
-            .zipWithIndex.map {
-            // @todo not sure about the efficiency of this case statement
-            // though it is assumed that the return value is short lived
-            case ((v, w), ind) => (v, if (otherBS.get(ind)) Option(w) else None) 
-            }
-            .toIndexedSeq // @todo check the toIndexedSeq is free
+          val newValues: Int => (V, Option[W]) = (ind: Int) => 
+            (thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None)
           Iterator((newValues, thisBS))
         } // end of newValues
         new VertexSetRDD(index, newValuesRDD) 
@@ -644,13 +625,13 @@ object VertexSetRDD {
         hashMap.setMerge(k, v, reduceFunc)
       }
       val index = hashMap.keySet
-      val values: IndexedSeq[V] = hashMap._values
+      val values: Int => V = (ind: Int) => hashMap._values(ind)
       val bs = index.getBitSet
       Iterator( (index, (values, bs)) )
       }, true).cache
     // extract the index and the values
     val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true)
-    val values: RDD[(IndexedSeq[V], BitSet)] = 
+    val values: RDD[(Int => V, BitSet)] = 
       groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
     new VertexSetRDD[V](new VertexSetIndex(index), values)
   } // end of apply
@@ -726,7 +707,7 @@ object VertexSetRDD {
       }
 
     // Use the index to build the new values table
-    val values: RDD[ (IndexedSeq[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
+    val values: RDD[ (Int => C, BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
       // There is only one map
       val index = indexIter.next()
       assert(!indexIter.hasNext())
@@ -750,7 +731,7 @@ object VertexSetRDD {
           }
         }
       }
-      Iterator((values, bs))
+      Iterator(((ind: Int) => values(ind), bs))
     })
     new VertexSetRDD(index, values)
   } // end of apply
-- 
GitLab