Skip to content
Snippets Groups Projects
Commit e7d37472 authored by Joseph E. Gonzalez's avatar Joseph E. Gonzalez
Browse files

After some testing I realized that the IndexedSeq is still instantiating the...

After some testing I realized that the IndexedSeq is still instantiating the array (not maintaining a view) so I have replaced all IndexedSeq[V] with (Int => V)
parent 63311d9c
No related branches found
No related tags found
No related merge requests found
......@@ -62,7 +62,6 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
} // end of VertexSetIndex
/**
* An VertexSetRDD[V] extends the RDD[(Vid,V)] by ensuring that there
* is only one entry for each vertex and by pre-indexing the entries
......@@ -99,7 +98,7 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
*/
class VertexSetRDD[@specialized V: ClassManifest](
@transient val index: VertexSetIndex,
@transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ])
@transient val valuesRDD: RDD[ ( (Int => V), BitSet) ])
extends RDD[(Vid, V)](index.rdd.context,
List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
......@@ -183,13 +182,13 @@ class VertexSetRDD[@specialized V: ClassManifest](
val cleanPred = index.rdd.context.clean(pred)
val newValues = index.rdd.zipPartitions(valuesRDD){
(keysIter: Iterator[VertexIdToIndexMap],
valuesIter: Iterator[(IndexedSeq[V], BitSet)]) =>
valuesIter: Iterator[(Int => V, BitSet)]) =>
val index = keysIter.next()
assert(keysIter.hasNext() == false)
val (oldValues, bs) = valuesIter.next()
assert(valuesIter.hasNext() == false)
// Allocate the array to store the results into
val newBS = new BitSet(oldValues.size)
val newBS = new BitSet(index.capacity)
// Iterate over the active bits in the old bitset and
// evaluate the predicate
var ind = bs.nextSetBit(0)
......@@ -218,15 +217,13 @@ class VertexSetRDD[@specialized V: ClassManifest](
* VertexSetRDD retains the same index.
*/
def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
val cleanF = index.rdd.context.clean(f)
val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] =
val newValuesRDD: RDD[ (Int => U, BitSet) ] =
valuesRDD.mapPartitions(iter => iter.map{
case (values, bs: BitSet) =>
val newValues: IndexedSeq[U] = values.view.zipWithIndex.map{
(x: (V, Int)) => if(bs.get(x._2)) cleanF(x._1) else null.asInstanceOf[U]
}.toIndexedSeq // @todo check the toIndexedSeq is free
val newValues: (Int => U) =
(ind: Int) => if (bs.get(ind)) f(values(ind)) else null.asInstanceOf[U]
(newValues, bs)
}, preservesPartitioning = true)
}, preservesPartitioning = true)
new VertexSetRDD[U](index, newValuesRDD)
} // end of mapValues
......@@ -244,22 +241,19 @@ class VertexSetRDD[@specialized V: ClassManifest](
* VertexSetRDD retains the same index.
*/
def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
val cleanF = index.rdd.context.clean(f)
val newValues: RDD[ (IndexedSeq[U], BitSet) ] =
val newValues: RDD[ (Int => U, BitSet) ] =
index.rdd.zipPartitions(valuesRDD){
(keysIter: Iterator[VertexIdToIndexMap],
valuesIter: Iterator[(IndexedSeq[V], BitSet)]) =>
valuesIter: Iterator[(Int => V, BitSet)]) =>
val index = keysIter.next()
assert(keysIter.hasNext() == false)
val (oldValues, bs: BitSet) = valuesIter.next()
assert(valuesIter.hasNext() == false)
// Cosntruct a view of the map transformation
val newValues: IndexedSeq[U] = oldValues.view.zipWithIndex.map{
(x: (V, Int)) =>
if(bs.get(x._2)) {
cleanF(index.getValueSafe(x._2), x._1)
} else null.asInstanceOf[U]
}.toIndexedSeq // @todo check the toIndexedSeq is free
val newValues: (Int => U) = (ind: Int) => {
if (bs.get(ind)) { f(index.getValueSafe(ind), oldValues(ind)) }
else { null.asInstanceOf[U] }
}
Iterator((newValues, bs))
}
new VertexSetRDD[U](index, newValues)
......@@ -283,18 +277,16 @@ class VertexSetRDD[@specialized V: ClassManifest](
if(index != other.index) {
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
}
val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] =
val newValuesRDD: RDD[ (Int => (V,W), BitSet) ] =
valuesRDD.zipPartitions(other.valuesRDD){
(thisIter: Iterator[(IndexedSeq[V], BitSet)],
otherIter: Iterator[(IndexedSeq[W], BitSet)]) =>
(thisIter: Iterator[(Int => V, BitSet)],
otherIter: Iterator[(Int => W, BitSet)]) =>
val (thisValues, thisBS: BitSet) = thisIter.next()
assert(!thisIter.hasNext)
val (otherValues, otherBS: BitSet) = otherIter.next()
assert(!otherIter.hasNext)
val newBS: BitSet = thisBS & otherBS
val newValues: IndexedSeq[(V,W)] =
thisValues.view.zip(otherValues).toIndexedSeq // @todo check the toIndexedSeq is free
// Iterator((newValues.toIndexedSeq, newBS))
val newValues: Int => (V,W) = (ind: Int) => (thisValues(ind), otherValues(ind))
Iterator((newValues, newBS))
}
new VertexSetRDD(index, newValuesRDD)
......@@ -320,23 +312,17 @@ class VertexSetRDD[@specialized V: ClassManifest](
if(index != other.index) {
throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
}
val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] =
val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] =
valuesRDD.zipPartitions(other.valuesRDD){
(thisIter: Iterator[(IndexedSeq[V], BitSet)],
otherIter: Iterator[(IndexedSeq[W], BitSet)]) =>
(thisIter: Iterator[(Int => V, BitSet)],
otherIter: Iterator[(Int => W, BitSet)]) =>
val (thisValues, thisBS: BitSet) = thisIter.next()
assert(!thisIter.hasNext)
val (otherValues, otherBS: BitSet) = otherIter.next()
assert(!otherIter.hasNext)
val newValues: IndexedSeq[(V, Option[W])] = thisValues.view.zip(otherValues)
.zipWithIndex.map {
// @todo not sure about the efficiency of this case statement
// though it is assumed that the return value is short lived
case ((v, w), ind) => (v, if (otherBS.get(ind)) Option(w) else None)
}
.toIndexedSeq // @todo check the toIndexedSeq is free
val newValues: Int => (V, Option[W]) = (ind: Int) =>
(thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None)
Iterator((newValues, thisBS))
// Iterator((newValues.toIndexedSeq, thisBS))
}
new VertexSetRDD(index, newValuesRDD)
} // end of leftZipJoin
......@@ -380,10 +366,10 @@ class VertexSetRDD[@specialized V: ClassManifest](
if (other.partitioner == partitioner) other
else other.partitionBy(partitioner.get)
// Compute the new values RDD
val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] =
val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] =
index.rdd.zipPartitions(valuesRDD, otherShuffled) {
(thisIndexIter: Iterator[VertexIdToIndexMap],
thisIter: Iterator[(IndexedSeq[V], BitSet)],
thisIter: Iterator[(Int => V, BitSet)],
tuplesIter: Iterator[(Vid,W)]) =>
// Get the Index and values for this RDD
val index = thisIndexIter.next()
......@@ -391,9 +377,9 @@ class VertexSetRDD[@specialized V: ClassManifest](
val (thisValues, thisBS) = thisIter.next()
assert(!thisIter.hasNext)
// Create a new array to store the values in the resulting VertexSet
val otherValues = new Array[W](thisValues.size)
val otherValues = new Array[W](index.capacity)
// track which values are matched with values in other
val otherBS = new BitSet(thisValues.size)
val otherBS = new BitSet(index.capacity)
for ((k,w) <- tuplesIter) {
// Get the location of the key in the index
val pos = index.getPos(k)
......@@ -412,13 +398,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
}
// Some vertices in this vertex set may not have a corresponding
// tuple in the join and so a None value should be returned.
val newValues: IndexedSeq[(V, Option[W])] = thisValues.view.zip(otherValues)
.zipWithIndex.map {
// @todo not sure about the efficiency of this case statement
// though it is assumed that the return value is short lived
case ((v, w), ind) => (v, if (otherBS.get(ind)) Option(w) else None)
}
.toIndexedSeq // @todo check the toIndexedSeq is free
val newValues: Int => (V, Option[W]) = (ind: Int) =>
(thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None)
Iterator((newValues, thisBS))
} // end of newValues
new VertexSetRDD(index, newValuesRDD)
......@@ -644,13 +625,13 @@ object VertexSetRDD {
hashMap.setMerge(k, v, reduceFunc)
}
val index = hashMap.keySet
val values: IndexedSeq[V] = hashMap._values
val values: Int => V = (ind: Int) => hashMap._values(ind)
val bs = index.getBitSet
Iterator( (index, (values, bs)) )
}, true).cache
// extract the index and the values
val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true)
val values: RDD[(IndexedSeq[V], BitSet)] =
val values: RDD[(Int => V, BitSet)] =
groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
new VertexSetRDD[V](new VertexSetIndex(index), values)
} // end of apply
......@@ -726,7 +707,7 @@ object VertexSetRDD {
}
// Use the index to build the new values table
val values: RDD[ (IndexedSeq[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
val values: RDD[ (Int => C, BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
// There is only one map
val index = indexIter.next()
assert(!indexIter.hasNext())
......@@ -750,7 +731,7 @@ object VertexSetRDD {
}
}
}
Iterator((values, bs))
Iterator(((ind: Int) => values(ind), bs))
})
new VertexSetRDD(index, values)
} // end of apply
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment