diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index 89e1b4ea01a81fa14a1a4d721853e871661b23f6..acfdc4378b0a0cd612832e8e3860e53690215827 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -1,9 +1,6 @@ package org.apache.spark.graph - import org.apache.spark.rdd.RDD -import org.apache.spark.util.ClosureCleaner - /** diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala index dbfccde8b91d5830daae16e23dff70a3e37ed48e..750075533af4e2281d416051a101d8e4b1314a91 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala @@ -1,6 +1,5 @@ package org.apache.spark.graph.impl -import scala.collection.mutable.ArrayBuilder import org.apache.spark.graph._ @@ -8,47 +7,46 @@ import org.apache.spark.graph._ * A partition of edges in 3 large columnar arrays. */ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest]( - val srcIds: Array[Vid], - val dstIds: Array[Vid], - val data: Array[ED] - ){ - - // private var _data: Array[ED] = _ - // private var _dataBuilder = ArrayBuilder.make[ED] - - // var srcIds = new VertexArrayList - // var dstIds = new VertexArrayList + val srcIds: Array[Vid], + val dstIds: Array[Vid], + val data: Array[ED]) +{ def reverse: EdgePartition[ED] = new EdgePartition(dstIds, srcIds, data) def map[ED2: ClassManifest](f: Edge[ED] => ED2): EdgePartition[ED2] = { val newData = new Array[ED2](data.size) val edge = new Edge[ED]() - for(i <- 0 until data.size){ + val size = data.size + var i = 0 + while (i < size) { edge.srcId = srcIds(i) edge.dstId = dstIds(i) edge.attr = data(i) - newData(i) = f(edge) + newData(i) = f(edge) + i += 1 } new EdgePartition(srcIds, dstIds, newData) } def foreach(f: Edge[ED] => Unit) { val edge = new Edge[ED] - for(i <- 0 until data.size){ - edge.srcId = srcIds(i) - edge.dstId = dstIds(i) + val size = data.size + var i = 0 + while (i < size) { + edge.srcId = srcIds(i) + edge.dstId = dstIds(i) edge.attr = data(i) - f(edge) + f(edge) + i += 1 } } - def size: Int = srcIds.size def iterator = new Iterator[Edge[ED]] { - private val edge = new Edge[ED] - private var pos = 0 + private[this] val edge = new Edge[ED] + private[this] var pos = 0 override def hasNext: Boolean = pos < EdgePartition.this.size @@ -61,5 +59,3 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) } } } - -