Skip to content
Snippets Groups Projects
Commit 6fadff2b authored by Reynold Xin's avatar Reynold Xin
Browse files

Converted for loops to while loops in EdgePartition.

parent edf41647
No related branches found
No related tags found
No related merge requests found
package org.apache.spark.graph package org.apache.spark.graph
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.util.ClosureCleaner
/** /**
......
package org.apache.spark.graph.impl package org.apache.spark.graph.impl
import scala.collection.mutable.ArrayBuilder
import org.apache.spark.graph._ import org.apache.spark.graph._
...@@ -8,47 +7,46 @@ import org.apache.spark.graph._ ...@@ -8,47 +7,46 @@ import org.apache.spark.graph._
* A partition of edges in 3 large columnar arrays. * A partition of edges in 3 large columnar arrays.
*/ */
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest]( class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest](
val srcIds: Array[Vid], val srcIds: Array[Vid],
val dstIds: Array[Vid], val dstIds: Array[Vid],
val data: Array[ED] val data: Array[ED])
){ {
// private var _data: Array[ED] = _
// private var _dataBuilder = ArrayBuilder.make[ED]
// var srcIds = new VertexArrayList
// var dstIds = new VertexArrayList
def reverse: EdgePartition[ED] = new EdgePartition(dstIds, srcIds, data) def reverse: EdgePartition[ED] = new EdgePartition(dstIds, srcIds, data)
def map[ED2: ClassManifest](f: Edge[ED] => ED2): EdgePartition[ED2] = { def map[ED2: ClassManifest](f: Edge[ED] => ED2): EdgePartition[ED2] = {
val newData = new Array[ED2](data.size) val newData = new Array[ED2](data.size)
val edge = new Edge[ED]() val edge = new Edge[ED]()
for(i <- 0 until data.size){ val size = data.size
var i = 0
while (i < size) {
edge.srcId = srcIds(i) edge.srcId = srcIds(i)
edge.dstId = dstIds(i) edge.dstId = dstIds(i)
edge.attr = data(i) edge.attr = data(i)
newData(i) = f(edge) newData(i) = f(edge)
i += 1
} }
new EdgePartition(srcIds, dstIds, newData) new EdgePartition(srcIds, dstIds, newData)
} }
def foreach(f: Edge[ED] => Unit) { def foreach(f: Edge[ED] => Unit) {
val edge = new Edge[ED] val edge = new Edge[ED]
for(i <- 0 until data.size){ val size = data.size
edge.srcId = srcIds(i) var i = 0
edge.dstId = dstIds(i) while (i < size) {
edge.srcId = srcIds(i)
edge.dstId = dstIds(i)
edge.attr = data(i) edge.attr = data(i)
f(edge) f(edge)
i += 1
} }
} }
def size: Int = srcIds.size def size: Int = srcIds.size
def iterator = new Iterator[Edge[ED]] { def iterator = new Iterator[Edge[ED]] {
private val edge = new Edge[ED] private[this] val edge = new Edge[ED]
private var pos = 0 private[this] var pos = 0
override def hasNext: Boolean = pos < EdgePartition.this.size override def hasNext: Boolean = pos < EdgePartition.this.size
...@@ -61,5 +59,3 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ...@@ -61,5 +59,3 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
} }
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment