diff --git a/graph/src/main/scala/org/apache/spark/graph/Edge.scala b/graph/src/main/scala/org/apache/spark/graph/Edge.scala index 5ac77839eb8661cd24af08dad9f9f4101483aa6f..19c28bea685ba5ca0a8c24c85ac4b95332ea5a21 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Edge.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Edge.scala @@ -11,11 +11,11 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] /** * The vertex id of the source vertex */ - var srcId: Vid = 0, + var srcId: VertexID = 0, /** * The vertex id of the target vertex. */ - var dstId: Vid = 0, + var dstId: VertexID = 0, /** * The attribute associated with the edge. */ @@ -27,7 +27,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] * @param vid the id one of the two vertices on the edge. * @return the id of the other vertex on the edge. */ - def otherVertexId(vid: Vid): Vid = + def otherVertexId(vid: VertexID): VertexID = if (srcId == vid) dstId else { assert(dstId == vid); srcId } /** @@ -38,13 +38,13 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] * @return the relative direction of the edge to the corresponding * vertex. */ - def relativeDirection(vid: Vid): EdgeDirection = + def relativeDirection(vid: VertexID): EdgeDirection = if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In } } object Edge { def lexicographicOrdering[ED] = new Ordering[Edge[ED]] { override def compare(a: Edge[ED], b: Edge[ED]): Int = - Ordering[(Vid, Vid)].compare((a.srcId, a.dstId), (b.srcId, b.dstId)) + Ordering[(VertexID, VertexID)].compare((a.srcId, a.dstId), (b.srcId, b.dstId)) } } diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala index 230202d6b0a6fbcc8a412e7e461740e7cbd9c4f3..fd933593523a81c32b70c6d3a0113a5691b9c09e 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala @@ -55,7 +55,7 @@ class EdgeRDD[@specialized ED: ClassTag]( def innerJoin[ED2: ClassTag, ED3: ClassTag] (other: EdgeRDD[ED2]) - (f: (Vid, Vid, ED, ED2) => ED3): EdgeRDD[ED3] = { + (f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3] = { val ed2Tag = classTag[ED2] val ed3Tag = classTag[ED3] new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) { @@ -66,7 +66,7 @@ class EdgeRDD[@specialized ED: ClassTag]( }) } - def collectVids(): RDD[Vid] = { + def collectVertexIDs(): RDD[VertexID] = { partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) } } diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala index 5a384a5f84dd96e1031eae3a92874eb6d4a9380a..a5103ed3cbffcf8cb36de6d18eebff97886a9d49 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala @@ -47,7 +47,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] { * @param vid the id one of the two vertices on the edge. * @return the attribute for the other vertex on the edge. */ - def otherVertexAttr(vid: Vid): VD = + def otherVertexAttr(vid: VertexID): VD = if (srcId == vid) dstAttr else { assert(dstId == vid); srcAttr } /** @@ -56,7 +56,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] { * @param vid the id of one of the two vertices on the edge * @return the attr for the vertex with that id. 
*/ - def vertexAttr(vid: Vid): VD = + def vertexAttr(vid: VertexID): VD = if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr } override def toString() = ((srcId, srcAttr), (dstId, dstAttr), attr).toString() diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index 420d01b4261809232647a77a0a02f6b6b3a691a5..dd0799142ef746a950b5978e708f2ce5a2543eec 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -125,7 +125,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] { * }}} * */ - def mapVertices[VD2: ClassTag](map: (Vid, VD) => VD2): Graph[VD2, ED] + def mapVertices[VD2: ClassTag](map: (VertexID, VD) => VD2): Graph[VD2, ED] /** * Construct a new graph where the value of each edge is @@ -253,7 +253,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] { * satisfy the predicates. */ def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true), - vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED] + vpred: (VertexID, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED] /** * Subgraph of this graph with only vertices and edges from the other graph. @@ -302,7 +302,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] { * vertex * {{{ * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph") - * val inDeg: RDD[(Vid, Int)] = + * val inDeg: RDD[(VertexID, Int)] = * mapReduceTriplets[Int](et => Array((et.dst.id, 1)), _ + _) * }}} * @@ -314,7 +314,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] { * */ def mapReduceTriplets[A: ClassTag]( - mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)], + mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], reduceFunc: (A, A) => A, activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) : VertexRDD[A] @@ -341,15 +341,15 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] { * * {{{ * val rawGraph: Graph[(),()] = Graph.textFile("webgraph") - * val outDeg: RDD[(Vid, Int)] = rawGraph.outDegrees() + * val outDeg: RDD[(VertexID, Int)] = rawGraph.outDegrees() * val graph = rawGraph.outerJoinVertices(outDeg) { * (vid, data, optDeg) => optDeg.getOrElse(0) * } * }}} * */ - def outerJoinVertices[U: ClassTag, VD2: ClassTag](table: RDD[(Vid, U)]) - (mapFunc: (Vid, VD, Option[U]) => VD2) + def outerJoinVertices[U: ClassTag, VD2: ClassTag](table: RDD[(VertexID, U)]) + (mapFunc: (VertexID, VD, Option[U]) => VD2) : Graph[VD2, ED] // Save a copy of the GraphOps object so there is always one unique GraphOps object @@ -377,7 +377,7 @@ object Graph { * (if `uniqueEdges=None`) and vertex attributes containing the total degree of each vertex. */ def fromEdgeTuples[VD: ClassTag]( - rawEdges: RDD[(Vid, Vid)], + rawEdges: RDD[(VertexID, VertexID)], defaultValue: VD, uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = { val edges = rawEdges.map(p => Edge(p._1, p._2, 1)) @@ -419,7 +419,7 @@ object Graph { * partitioning the edges. 
*/ def apply[VD: ClassTag, ED: ClassTag]( - vertices: RDD[(Vid, VD)], + vertices: RDD[(VertexID, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = { GraphImpl(vertices, edges, defaultVertexAttr) diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala index b8c1b5b0f032d1d64f02f8e8d0afa6d7e765b658..296f3848f1c61e7efc7f82ecc3f729594927a0ef 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala @@ -14,7 +14,7 @@ class GraphKryoRegistrator extends KryoRegistrator { kryo.register(classOf[Edge[Object]]) kryo.register(classOf[MessageToPartition[Object]]) kryo.register(classOf[VertexBroadcastMsg[Object]]) - kryo.register(classOf[(Vid, Object)]) + kryo.register(classOf[(VertexID, Object)]) kryo.register(classOf[EdgePartition[Object]]) kryo.register(classOf[BitSet]) kryo.register(classOf[VertexIdToIndexMap]) diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala index c1ce5cd9ccadac16bf8bd63ee8c9f6adc0fca984..22f48540193eaf24b2dc10da71656d8d68c9c505 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala @@ -42,11 +42,12 @@ object GraphLab extends Logging { (graph: Graph[VD, ED], numIter: Int, gatherDirection: EdgeDirection = EdgeDirection.In, scatterDirection: EdgeDirection = EdgeDirection.Out) - (gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A, + (gatherFunc: (VertexID, EdgeTriplet[VD, ED]) => A, mergeFunc: (A, A) => A, - applyFunc: (Vid, VD, Option[A]) => VD, - scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean, - startVertices: (Vid, VD) => Boolean = (vid: Vid, data: VD) => true): Graph[VD, ED] = { + applyFunc: (VertexID, VD, Option[A]) => VD, + scatterFunc: (VertexID, EdgeTriplet[VD, ED]) => Boolean, + startVertices: (VertexID, VD) => Boolean = (vid: VertexID, data: VD) => true) + : Graph[VD, ED] = { // Add an active attribute to all vertices to track convergence. 
@@ -56,7 +57,7 @@ object GraphLab extends Logging { // The gather function wrapper strips the active attribute and // only invokes the gather function on active vertices - def gather(vid: Vid, e: EdgeTriplet[(Boolean, VD), ED]): Option[A] = { + def gather(vid: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[A] = { if (e.vertexAttr(vid)._1) { val edgeTriplet = new EdgeTriplet[VD,ED] edgeTriplet.set(e) @@ -70,7 +71,7 @@ object GraphLab extends Logging { // The apply function wrapper strips the vertex of the active attribute // and only invokes the apply function on active vertices - def apply(vid: Vid, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = { + def apply(vid: VertexID, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = { val (active, vData) = data if (active) (true, applyFunc(vid, vData, accum)) else (false, vData) @@ -78,8 +79,8 @@ object GraphLab extends Logging { // The scatter function wrapper strips the vertex of the active attribute // and only invokes the scatter function on active vertices - def scatter(rawVid: Vid, e: EdgeTriplet[(Boolean, VD), ED]): Option[Boolean] = { - val vid = e.otherVertexId(rawVid) + def scatter(rawVertexID: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[Boolean] = { + val vid = e.otherVertexId(rawVertexID) if (e.vertexAttr(vid)._1) { val edgeTriplet = new EdgeTriplet[VD,ED] edgeTriplet.set(e) @@ -92,7 +93,8 @@ object GraphLab extends Logging { } // Used to set the active status of vertices for the next round - def applyActive(vid: Vid, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = { + def applyActive( + vid: VertexID, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = { val (prevActive, vData) = data (newActiveOpt.getOrElse(false), vData) } @@ -103,7 +105,7 @@ object GraphLab extends Logging { while (i < numIter && numActive > 0) { // Gather - val gathered: RDD[(Vid, A)] = + val gathered: RDD[(VertexID, A)] = activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection) // Apply @@ -113,7 +115,7 @@ object GraphLab extends Logging { // Scatter is basically a gather in the opposite direction so we reverse the edge direction // activeGraph: Graph[(Boolean, VD), ED] - val scattered: RDD[(Vid, Boolean)] = + val scattered: RDD[(VertexID, Boolean)] = activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse) activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache() diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala index 11c6120beb414d21292f609e5d8b218ca095e54e..e41287c1ed4f10de79ef484264fbfa0fb3c4c8fb 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala @@ -112,7 +112,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * */ def aggregateNeighbors[A: ClassTag]( - mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A], + mapFunc: (VertexID, EdgeTriplet[VD, ED]) => Option[A], reduceFunc: (A, A) => A, dir: EdgeDirection) : VertexRDD[A] = { @@ -151,25 +151,27 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * @return the vertex set of neighboring ids for each vertex. 
*/ def collectNeighborIds(edgeDirection: EdgeDirection) : - VertexRDD[Array[Vid]] = { + VertexRDD[Array[VertexID]] = { val nbrs = if (edgeDirection == EdgeDirection.Both) { - graph.mapReduceTriplets[Array[Vid]]( + graph.mapReduceTriplets[Array[VertexID]]( mapFunc = et => Iterator((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))), reduceFunc = _ ++ _ ) } else if (edgeDirection == EdgeDirection.Out) { - graph.mapReduceTriplets[Array[Vid]]( + graph.mapReduceTriplets[Array[VertexID]]( mapFunc = et => Iterator((et.srcId, Array(et.dstId))), reduceFunc = _ ++ _) } else if (edgeDirection == EdgeDirection.In) { - graph.mapReduceTriplets[Array[Vid]]( + graph.mapReduceTriplets[Array[VertexID]]( mapFunc = et => Iterator((et.dstId, Array(et.srcId))), reduceFunc = _ ++ _) } else { throw new SparkException("It doesn't make sense to collect neighbor ids without a direction.") } - graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[Vid]) } + graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => + nbrsOpt.getOrElse(Array.empty[VertexID]) + } } // end of collectNeighborIds @@ -187,14 +189,16 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * vertex. */ def collectNeighbors(edgeDirection: EdgeDirection) : - VertexRDD[ Array[(Vid, VD)] ] = { - val nbrs = graph.aggregateNeighbors[Array[(Vid,VD)]]( + VertexRDD[ Array[(VertexID, VD)] ] = { + val nbrs = graph.aggregateNeighbors[Array[(VertexID,VD)]]( (vid, edge) => Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )), (a, b) => a ++ b, edgeDirection) - graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[(Vid, VD)]) } + graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => + nbrsOpt.getOrElse(Array.empty[(VertexID, VD)]) + } } // end of collectNeighbor @@ -228,9 +232,9 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * }}} * */ - def joinVertices[U: ClassTag](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD) + def joinVertices[U: ClassTag](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD) : Graph[VD, ED] = { - val uf = (id: Vid, data: VD, o: Option[U]) => { + val uf = (id: VertexID, data: VD, o: Option[U]) => { o match { case Some(u) => mapFunc(id, data, u) case None => data @@ -259,7 +263,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * val degrees: VertexSetRDD[Int] = graph.outDegrees * graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} * }, - * vpred = (vid: Vid, deg:Int) => deg > 0 + * vpred = (vid: VertexID, deg:Int) => deg > 0 * ) * }}} * @@ -267,7 +271,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { def filter[VD2: ClassTag, ED2: ClassTag]( preprocess: Graph[VD, ED] => Graph[VD2, ED2], epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true, - vpred: (Vid, VD2) => Boolean = (v:Vid, d:VD2) => true): Graph[VD, ED] = { + vpred: (VertexID, VD2) => Boolean = (v:VertexID, d:VD2) => true): Graph[VD, ED] = { graph.mask(preprocess(graph).subgraph(epred, vpred)) } } // end of GraphOps diff --git a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala index 293a9d588afda179622279b497a2220f1230e880..c01b4b9439589ba26408f650a867571fa2b97c5e 100644 --- a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala +++ b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala @@ -2,7 +2,7 @@ package 
org.apache.spark.graph sealed trait PartitionStrategy extends Serializable { - def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid + def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid } @@ -51,9 +51,9 @@ sealed trait PartitionStrategy extends Serializable { * */ case object EdgePartition2D extends PartitionStrategy { - override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = { + override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = { val ceilSqrtNumParts: Pid = math.ceil(math.sqrt(numParts)).toInt - val mixingPrime: Vid = 1125899906842597L + val mixingPrime: VertexID = 1125899906842597L val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt (col * ceilSqrtNumParts + row) % numParts @@ -62,8 +62,8 @@ case object EdgePartition2D extends PartitionStrategy { case object EdgePartition1D extends PartitionStrategy { - override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = { - val mixingPrime: Vid = 1125899906842597L + override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = { + val mixingPrime: VertexID = 1125899906842597L (math.abs(src) * mixingPrime).toInt % numParts } } @@ -74,7 +74,7 @@ case object EdgePartition1D extends PartitionStrategy { * random vertex cut. */ case object RandomVertexCut extends PartitionStrategy { - override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = { + override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = { math.abs((src, dst).hashCode()) % numParts } } @@ -86,7 +86,7 @@ case object RandomVertexCut extends PartitionStrategy { * will end up on the same partition. */ case object CanonicalRandomVertexCut extends PartitionStrategy { - override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = { + override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = { val lower = math.min(src, dst) val higher = math.max(src, dst) math.abs((lower, higher).hashCode()) % numParts diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala index 4664091b5714b14156484e250d160f5a17cfee16..3b84e2e5e4195a67d0f5e6e00b652b491b5791c2 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala @@ -25,9 +25,9 @@ import scala.reflect.ClassTag * // Set the vertex attributes to the initial pagerank values * .mapVertices( (id, attr) => 1.0 ) * - * def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double = + * def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double = * resetProb + (1.0 - resetProb) * msgSum - * def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] = + * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Option[Double] = * Some(edge.srcAttr * edge.attr) * def messageCombiner(a: Double, b: Double): Double = a + b * val initialMessage = 0.0 @@ -88,8 +88,8 @@ object Pregel { */ def apply[VD: ClassTag, ED: ClassTag, A: ClassTag] (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)( - vprog: (Vid, VD, A) => VD, - sendMsg: EdgeTriplet[VD, ED] => Iterator[(Vid,A)], + vprog: (VertexID, VD, A) => VD, + sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala 
index c5fb4aeca73a058b06e7f52107a6075c353f5093..25b0aed85aae95083d31aaac0d5b2ebc4803203e 100644 --- a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala @@ -29,7 +29,7 @@ import org.apache.spark.graph.impl.VertexPartition /** - * A `VertexRDD[VD]` extends the `RDD[(Vid, VD)]` by ensuring that there is + * A `VertexRDD[VD]` extends the `RDD[(VertexID, VD)]` by ensuring that there is * only one entry for each vertex and by pre-indexing the entries for fast, * efficient joins. * @@ -40,12 +40,12 @@ import org.apache.spark.graph.impl.VertexPartition * @example Construct a `VertexRDD` from a plain RDD * {{{ * // Construct an intial vertex set - * val someData: RDD[(Vid, SomeType)] = loadData(someFile) + * val someData: RDD[(VertexID, SomeType)] = loadData(someFile) * val vset = VertexRDD(someData) * // If there were redundant values in someData we would use a reduceFunc * val vset2 = VertexRDD(someData, reduceFunc) * // Finally we can use the VertexRDD to index another dataset - * val otherData: RDD[(Vid, OtherType)] = loadData(otherFile) + * val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile) * val vset3 = VertexRDD(otherData, vset.index) * // Now we can construct very fast joins between the two sets * val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3) @@ -54,7 +54,7 @@ import org.apache.spark.graph.impl.VertexPartition */ class VertexRDD[@specialized VD: ClassTag]( val partitionsRDD: RDD[VertexPartition[VD]]) - extends RDD[(Vid, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { + extends RDD[(VertexID, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { require(partitionsRDD.partitioner.isDefined) @@ -104,9 +104,9 @@ class VertexRDD[@specialized VD: ClassTag]( } /** - * Provide the `RDD[(Vid, VD)]` equivalent output. + * Provide the `RDD[(VertexID, VD)]` equivalent output. */ - override def compute(part: Partition, context: TaskContext): Iterator[(Vid, VD)] = { + override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = { firstParent[VertexPartition[VD]].iterator(part, context).next.iterator } @@ -125,14 +125,14 @@ class VertexRDD[@specialized VD: ClassTag]( * given predicate. * * @param pred the user defined predicate, which takes a tuple to conform to - * the RDD[(Vid, VD)] interface + * the RDD[(VertexID, VD)] interface * * @note The vertex set preserves the original index structure * which means that the returned RDD can be easily joined with * the original vertex-set. Furthermore, the filter only * modifies the bitmap index and so no new values are allocated. */ - override def filter(pred: Tuple2[Vid, VD] => Boolean): VertexRDD[VD] = + override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] = this.mapVertexPartitions(_.filter(Function.untupled(pred))) /** @@ -160,7 +160,7 @@ class VertexRDD[@specialized VD: ClassTag]( * each of the entries in the original VertexRDD. The resulting * VertexRDD retains the same index. 
*/ - def mapValues[VD2: ClassTag](f: (Vid, VD) => VD2): VertexRDD[VD2] = + def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] = this.mapVertexPartitions(_.map(f)) /** @@ -197,7 +197,7 @@ class VertexRDD[@specialized VD: ClassTag]( * */ def leftZipJoin[VD2: ClassTag, VD3: ClassTag] - (other: VertexRDD[VD2])(f: (Vid, VD, Option[VD2]) => VD3): VertexRDD[VD3] = { + (other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = { val newPartitionsRDD = partitionsRDD.zipPartitions( other.partitionsRDD, preservesPartitioning = true ) { (thisIter, otherIter) => @@ -228,8 +228,8 @@ class VertexRDD[@specialized VD: ClassTag]( * VertexRDD with the attribute emitted by f. */ def leftJoin[VD2: ClassTag, VD3: ClassTag] - (other: RDD[(Vid, VD2)]) - (f: (Vid, VD, Option[VD2]) => VD3) + (other: RDD[(VertexID, VD2)]) + (f: (VertexID, VD, Option[VD2]) => VD3) : VertexRDD[VD3] = { // Test if the other vertex is a VertexRDD to choose the optimal join strategy. @@ -254,7 +254,7 @@ class VertexRDD[@specialized VD: ClassTag]( * must have the same index. */ def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U]) - (f: (Vid, VD, U) => VD2): VertexRDD[VD2] = { + (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = { val newPartitionsRDD = partitionsRDD.zipPartitions( other.partitionsRDD, preservesPartitioning = true ) { (thisIter, otherIter) => @@ -269,8 +269,8 @@ class VertexRDD[@specialized VD: ClassTag]( * Replace vertices with corresponding vertices in `other`, and drop vertices without a * corresponding vertex in `other`. */ - def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(Vid, U)]) - (f: (Vid, VD, U) => VD2): VertexRDD[VD2] = { + def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)]) + (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = { // Test if the other vertex is a VertexRDD to choose the optimal join strategy. // If the other set is a VertexRDD then we use the much more efficient innerZipJoin other match { @@ -293,7 +293,7 @@ class VertexRDD[@specialized VD: ClassTag]( * co-indexed with this one. */ def aggregateUsingIndex[VD2: ClassTag]( - messages: RDD[(Vid, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = + messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = { val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get) val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) => @@ -319,8 +319,8 @@ object VertexRDD { * * @param rdd the collection of vertex-attribute pairs */ - def apply[VD: ClassTag](rdd: RDD[(Vid, VD)]): VertexRDD[VD] = { - val partitioned: RDD[(Vid, VD)] = rdd.partitioner match { + def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)]): VertexRDD[VD] = { + val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match { case Some(p) => rdd case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size)) } @@ -339,9 +339,9 @@ object VertexRDD { * @param rdd the collection of vertex-attribute pairs * @param mergeFunc the associative, commutative merge function. 
*/ - def apply[VD: ClassTag](rdd: RDD[(Vid, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = + def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = { - val partitioned: RDD[(Vid, VD)] = rdd.partitioner match { + val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match { case Some(p) => rdd case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size)) } @@ -351,7 +351,7 @@ object VertexRDD { new VertexRDD(vertexPartitions) } - def apply[VD: ClassTag](vids: RDD[Vid], rdd: RDD[(Vid, VD)], defaultVal: VD) + def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD) : VertexRDD[VD] = { VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) => diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala index 7cd947d2ba7df13e327e155205c2db0ec2370b1b..2a6b8c0999ab08ca59ee67d75d8865d54d7bafa0 100644 --- a/graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala +++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala @@ -16,10 +16,10 @@ object ConnectedComponents { * @return a graph with vertex attributes containing the smallest vertex in each * connected component */ - def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]): Graph[Vid, ED] = { + def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]): Graph[VertexID, ED] = { val ccGraph = graph.mapVertices { case (vid, _) => vid } - def sendMessage(edge: EdgeTriplet[Vid, ED]) = { + def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { if (edge.srcAttr < edge.dstAttr) { Iterator((edge.dstId, edge.srcAttr)) } else if (edge.srcAttr > edge.dstAttr) { diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala index f77dffd7b415606b2de0c4a390a0c64fc6073d7a..26b8dc5ab60c6238fccdc27bee18ecadd67ce972 100644 --- a/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala +++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala @@ -65,7 +65,7 @@ object PageRank extends Logging { // Define the three functions needed to implement PageRank in the GraphX // version of Pregel - def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double = + def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double = resetProb + (1.0 - resetProb) * msgSum def sendMessage(edge: EdgeTriplet[Double, Double]) = Iterator((edge.dstId, edge.srcAttr * edge.attr)) @@ -129,7 +129,7 @@ object PageRank extends Logging { // Define the three functions needed to implement PageRank in the GraphX // version of Pregel - def vertexProgram(id: Vid, attr: (Double, Double), msgSum: Double): (Double, Double) = { + def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = { val (oldPR, lastDelta) = attr val newPR = oldPR + (1.0 - resetProb) * msgSum (newPR, newPR - oldPR) diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala index c324c984d76924df757b808fca931e83f7051c7a..8031aa10ce3ea610daabad20635beb403b70c0a6 100644 --- a/graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala +++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala @@ 
-15,7 +15,7 @@ object StronglyConnectedComponents { * * @return a graph with vertex attributes containing the smallest vertex id in each SCC */ - def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int): Graph[Vid, ED] = { + def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int): Graph[VertexID, ED] = { // the graph we update with final SCC ids, and the graph we return at the end var sccGraph = graph.mapVertices { case (vid, _) => vid } @@ -52,7 +52,7 @@ object StronglyConnectedComponents { // collect min of all my neighbor's scc values, update if it's smaller than mine // then notify any neighbors with scc values larger than mine - sccWorkGraph = GraphLab[(Vid, Boolean), ED, Vid](sccWorkGraph, Integer.MAX_VALUE)( + sccWorkGraph = GraphLab[(VertexID, Boolean), ED, VertexID](sccWorkGraph, Integer.MAX_VALUE)( (vid, e) => e.otherVertexAttr(vid)._1, (vid1, vid2) => math.min(vid1, vid2), (vid, scc, optScc) => @@ -62,7 +62,7 @@ object StronglyConnectedComponents { // start at root of SCCs. Traverse values in reverse, notify all my neighbors // do not propagate if colors do not match! - sccWorkGraph = GraphLab[(Vid, Boolean), ED, Boolean]( + sccWorkGraph = GraphLab[(VertexID, Boolean), ED, Boolean]( sccWorkGraph, Integer.MAX_VALUE, EdgeDirection.Out, diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala index 18395bdc5f230ebe0138a22dfdc384abe5bf0d05..85fa23d30946963da2a77c6e29c14e5aa32d99b0 100644 --- a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala +++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala @@ -51,12 +51,12 @@ object Svdpp { // calculate initial bias and norm var t0 = g.mapReduceTriplets(et => Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))), (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2)) - g = g.outerJoinVertices(t0) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) => + g = g.outerJoinVertices(t0) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) => (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1)) } def mapTrainF(conf: SvdppConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]) - : Iterator[(Vid, (RealVector, RealVector, Double))] = { + : Iterator[(VertexID, (RealVector, RealVector, Double))] = { val (usr, itm) = (et.srcAttr, et.dstAttr) val (p, q) = (usr._1, itm._1) var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2) @@ -73,19 +73,19 @@ object Svdpp { for (i <- 0 until conf.maxIters) { // phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes var t1 = g.mapReduceTriplets(et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2)) - g = g.outerJoinVertices(t1) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) => + g = g.outerJoinVertices(t1) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) => if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd } // phase 2, update p for user nodes and q, y for item nodes val t2 = g.mapReduceTriplets(mapTrainF(conf, u), (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) => (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3)) - g = g.outerJoinVertices(t2) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, 
RealVector, Double)]) => + g = g.outerJoinVertices(t2) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) => (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4) } } // calculate error on training set - def mapTestF(conf: SvdppConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(Vid, Double)] = { + def mapTestF(conf: SvdppConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(VertexID, Double)] = { val (usr, itm) = (et.srcAttr, et.dstAttr) val (p, q) = (usr._1, itm._1) var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2) @@ -95,7 +95,7 @@ object Svdpp { Iterator((et.dstId, err)) } val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2) - g = g.outerJoinVertices(t3) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) => + g = g.outerJoinVertices(t3) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) => if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd } (g, u) diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala index a6384320bab6b7447440db59b146c0b81eae560e..81774d52e429177d6934fce7f54f4efbd34ba407 100644 --- a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala +++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala @@ -46,7 +46,7 @@ object TriangleCount { (vid, _, optSet) => optSet.getOrElse(null) } // Edge function computes intersection of smaller vertex with larger vertex - def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(Vid, Int)] = { + def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexID, Int)] = { assert(et.srcAttr != null) assert(et.dstAttr != null) val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) { diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala index 7ae4d7df43a2f5479c776f1c9d3235e767361173..b4311fa9f894eb17c4a77e32ae5c22026db60d9e 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala @@ -16,10 +16,10 @@ import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap * @tparam ED the edge attribute type. */ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag]( - val srcIds: Array[Vid], - val dstIds: Array[Vid], + val srcIds: Array[VertexID], + val dstIds: Array[VertexID], val data: Array[ED], - val index: PrimitiveKeyOpenHashMap[Vid, Int]) extends Serializable { + val index: PrimitiveKeyOpenHashMap[VertexID, Int]) extends Serializable { /** * Reverse all the edges in this partition. 
@@ -101,8 +101,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = { val builder = new EdgePartitionBuilder[ED] var firstIter: Boolean = true - var currSrcId: Vid = nullValue[Vid] - var currDstId: Vid = nullValue[Vid] + var currSrcId: VertexID = nullValue[VertexID] + var currDstId: VertexID = nullValue[VertexID] var currAttr: ED = nullValue[ED] var i = 0 while (i < size) { @@ -136,7 +136,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) */ def innerJoin[ED2: ClassTag, ED3: ClassTag] (other: EdgePartition[ED2]) - (f: (Vid, Vid, ED, ED2) => ED3): EdgePartition[ED3] = { + (f: (VertexID, VertexID, ED, ED2) => ED3): EdgePartition[ED3] = { val builder = new EdgePartitionBuilder[ED3] var i = 0 var j = 0 @@ -193,14 +193,14 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * iterator is generated using an index scan, so it is efficient at skipping edges that don't * match srcIdPred. */ - def indexIterator(srcIdPred: Vid => Boolean): Iterator[Edge[ED]] = + def indexIterator(srcIdPred: VertexID => Boolean): Iterator[Edge[ED]] = index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator)) /** * Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The * cluster must start at position `index`. */ - private def clusterIterator(srcId: Vid, index: Int) = new Iterator[Edge[ED]] { + private def clusterIterator(srcId: VertexID, index: Int) = new Iterator[Edge[ED]] { private[this] val edge = new Edge[ED] private[this] var pos = index diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala index ae3f3a6d03145020b3f924e1897b8da5d2e0ddfe..56624ef60adc0bc3ad64550e4e588a0e10fb8e9a 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala @@ -13,22 +13,22 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I var edges = new PrimitiveVector[Edge[ED]](size) /** Add a new edge to the partition. 
*/ - def add(src: Vid, dst: Vid, d: ED) { + def add(src: VertexID, dst: VertexID, d: ED) { edges += Edge(src, dst, d) } def toEdgePartition: EdgePartition[ED] = { val edgeArray = edges.trim().array Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering) - val srcIds = new Array[Vid](edgeArray.size) - val dstIds = new Array[Vid](edgeArray.size) + val srcIds = new Array[VertexID](edgeArray.size) + val dstIds = new Array[VertexID](edgeArray.size) val data = new Array[ED](edgeArray.size) - val index = new PrimitiveKeyOpenHashMap[Vid, Int] + val index = new PrimitiveKeyOpenHashMap[VertexID, Int] // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and // adding them to the index if (edgeArray.length > 0) { index.update(srcIds(0), 0) - var currSrcId: Vid = srcIds(0) + var currSrcId: VertexID = srcIds(0) var i = 0 while (i < edgeArray.size) { srcIds(i) = edgeArray(i).srcId diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala index 4d5eb240a91b7ee153461ec9446cf227578de81f..e95d79e3d63d63b4d8f3b769d8621ed527511fa9 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala @@ -25,7 +25,7 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag]( // allocating too many temporary Java objects. private val triplet = new EdgeTriplet[VD, ED] - private val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray) + private val vmap = new PrimitiveKeyOpenHashMap[VertexID, VD](vidToIndex, vertexArray) override def hasNext: Boolean = pos < edgePartition.size diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 2ce5404e9474483da58da030d5ace54f3af99075..6eb401b3b57aca1f326c47bf84df9c3db032c7f4 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -89,7 +89,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( override def statistics: Map[String, Any] = { // Get the total number of vertices after replication, used to compute the replication ratio. 
- def numReplicatedVertices(vid2pids: RDD[Array[Array[Vid]]]): Double = { + def numReplicatedVertices(vid2pids: RDD[Array[Array[VertexID]]]): Double = { vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble } @@ -157,7 +157,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( new GraphImpl(vertices, newETable, routingTable, replicatedVertexView) } - override def mapVertices[VD2: ClassTag](f: (Vid, VD) => VD2): Graph[VD2, ED] = { + override def mapVertices[VD2: ClassTag](f: (VertexID, VD) => VD2): Graph[VD2, ED] = { if (classTag[VD] equals classTag[VD2]) { // The map preserves type, so we can use incremental replication val newVerts = vertices.mapVertexPartitions(_.map(f)) @@ -208,7 +208,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( override def subgraph( epred: EdgeTriplet[VD, ED] => Boolean = x => true, - vpred: (Vid, VD) => Boolean = (a, b) => true): Graph[VD, ED] = { + vpred: (VertexID, VD) => Boolean = (a, b) => true): Graph[VD, ED] = { // Filter the vertices, reusing the partitioner and the index from this graph val newVerts = vertices.mapVertexPartitions(_.filter(vpred)) @@ -250,7 +250,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( ////////////////////////////////////////////////////////////////////////////////////////////////// override def mapReduceTriplets[A: ClassTag]( - mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)], + mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], reduceFunc: (A, A) => A, activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = { @@ -280,14 +280,14 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( val edgeIter = activeDirectionOpt match { case Some(EdgeDirection.Both) => if (activeFraction < 0.8) { - edgePartition.indexIterator(srcVid => vPart.isActive(srcVid)) + edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID)) .filter(e => vPart.isActive(e.dstId)) } else { edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId)) } case Some(EdgeDirection.Out) => if (activeFraction < 0.8) { - edgePartition.indexIterator(srcVid => vPart.isActive(srcVid)) + edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID)) } else { edgePartition.iterator.filter(e => vPart.isActive(e.srcId)) } @@ -318,7 +318,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } // end of mapReduceTriplets override def outerJoinVertices[U: ClassTag, VD2: ClassTag] - (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2): Graph[VD2, ED] = { + (updates: RDD[(VertexID, U)])(updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] = { if (classTag[VD] equals classTag[VD2]) { // updateF preserves type, so we can use incremental replication val newVerts = vertices.leftJoin(updates)(updateF) @@ -360,7 +360,7 @@ object GraphImpl { } def apply[VD: ClassTag, ED: ClassTag]( - vertices: RDD[(Vid, VD)], + vertices: RDD[(VertexID, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD): GraphImpl[VD, ED] = { @@ -369,7 +369,7 @@ object GraphImpl { // Get the set of all vids val partitioner = Partitioner.defaultPartitioner(vertices) val vPartitioned = vertices.partitionBy(partitioner) - val vidsFromEdges = collectVidsFromEdges(edgeRDD, partitioner) + val vidsFromEdges = collectVertexIDsFromEdges(edgeRDD, partitioner) val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) => vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1) } @@ -381,7 +381,7 @@ object GraphImpl { /** * Create the edge RDD, which is much more efficient for Java heap 
storage than the normal edges - * data structure (RDD[(Vid, Vid, ED)]). + * data structure (RDD[(VertexID, VertexID, ED)]). * * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value * pair: the key is the partition id, and the value is an EdgePartition object containing all the @@ -404,19 +404,19 @@ object GraphImpl { defaultVertexAttr: VD): GraphImpl[VD, ED] = { edges.cache() // Get the set of all vids - val vids = collectVidsFromEdges(edges, new HashPartitioner(edges.partitions.size)) + val vids = collectVertexIDsFromEdges(edges, new HashPartitioner(edges.partitions.size)) // Create the VertexRDD. val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr)) new GraphImpl(vertices, edges) } /** Collects all vids mentioned in edges and partitions them by partitioner. */ - private def collectVidsFromEdges( + private def collectVertexIDsFromEdges( edges: EdgeRDD[_], - partitioner: Partitioner): RDD[(Vid, Int)] = { + partitioner: Partitioner): RDD[(VertexID, Int)] = { // TODO: Consider doing map side distinct before shuffle. - new ShuffledRDD[Vid, Int, (Vid, Int)]( - edges.collectVids.map(vid => (vid, 0)), partitioner) - .setSerializer(classOf[VidMsgSerializer].getName) + new ShuffledRDD[VertexID, Int, (VertexID, Int)]( + edges.collectVertexIDs.map(vid => (vid, 0)), partitioner) + .setSerializer(classOf[VertexIDMsgSerializer].getName) } } // end of object GraphImpl diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala index bf033945dee249512e0c9ad910b8be3a7f95fd0f..2d03f75a28a25750956c6d3a332860f5dbf3ea72 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala @@ -3,15 +3,15 @@ package org.apache.spark.graph.impl import scala.reflect.{classTag, ClassTag} import org.apache.spark.Partitioner -import org.apache.spark.graph.{Pid, Vid} +import org.apache.spark.graph.{Pid, VertexID} import org.apache.spark.rdd.{ShuffledRDD, RDD} class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T]( @transient var partition: Pid, - var vid: Vid, + var vid: VertexID, var data: T) - extends Product2[Pid, (Vid, T)] with Serializable { + extends Product2[Pid, (VertexID, T)] with Serializable { override def _1 = partition @@ -41,7 +41,7 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) { def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = { - val rdd = new ShuffledRDD[Pid, (Vid, T), VertexBroadcastMsg[T]](self, partitioner) + val rdd = new ShuffledRDD[Pid, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner) // Set a custom serializer if the data is of int or double type. if (classTag[T] == ClassTag.Int) { @@ -77,8 +77,8 @@ object MsgRDDFunctions { new VertexBroadcastMsgRDDFunctions(rdd) } - def partitionForAggregation[T: ClassTag](msgs: RDD[(Vid, T)], partitioner: Partitioner) = { - val rdd = new ShuffledRDD[Vid, T, (Vid, T)](msgs, partitioner) + def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexID, T)], partitioner: Partitioner) = { + val rdd = new ShuffledRDD[VertexID, T, (VertexID, T)](msgs, partitioner) // Set a custom serializer if the data is of int or double type. 
if (classTag[T] == ClassTag.Int) { diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala index 970acfed2742514fc5abf98a5d331467d7dace50..9d2d242ffa8d9078d6351434930bacb4c5393cad 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala @@ -31,9 +31,9 @@ class ReplicatedVertexView[VD: ClassTag]( * vids from both the source and destination of edges. It must always include both source and * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this. */ - private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match { + private val localVertexIDMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match { case Some(prevView) => - prevView.localVidMap + prevView.localVertexIDMap case None => edges.partitionsRDD.mapPartitions(_.map { case (pid, epart) => @@ -43,7 +43,7 @@ class ReplicatedVertexView[VD: ClassTag]( vidToIndex.add(e.dstId) } (pid, vidToIndex) - }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVidMap") + }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIDMap") } private lazy val bothAttrs: RDD[(Pid, VertexPartition[VD])] = create(true, true) @@ -104,8 +104,8 @@ class ReplicatedVertexView[VD: ClassTag]( case None => // Within each edge partition, place the shipped vertex attributes into the correct - // locations specified in localVidMap - localVidMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) => + // locations specified in localVertexIDMap + localVertexIDMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) => val (pid, vidToIndex) = mapIter.next() assert(!mapIter.hasNext) // Populate the vertex array using the vidToIndex map @@ -128,15 +128,15 @@ class ReplicatedVertexView[VD: ClassTag]( object ReplicatedVertexView { protected def buildBuffer[VD: ClassTag]( - pid2vidIter: Iterator[Array[Array[Vid]]], + pid2vidIter: Iterator[Array[Array[VertexID]]], vertexPartIter: Iterator[VertexPartition[VD]]) = { - val pid2vid: Array[Array[Vid]] = pid2vidIter.next() + val pid2vid: Array[Array[VertexID]] = pid2vidIter.next() val vertexPart: VertexPartition[VD] = vertexPartIter.next() Iterator.tabulate(pid2vid.size) { pid => val vidsCandidate = pid2vid(pid) val size = vidsCandidate.length - val vids = new PrimitiveVector[Vid](pid2vid(pid).size) + val vids = new PrimitiveVector[VertexID](pid2vid(pid).size) val attrs = new PrimitiveVector[VD](pid2vid(pid).size) var i = 0 while (i < size) { @@ -152,16 +152,16 @@ object ReplicatedVertexView { } protected def buildActiveBuffer( - pid2vidIter: Iterator[Array[Array[Vid]]], + pid2vidIter: Iterator[Array[Array[VertexID]]], activePartIter: Iterator[VertexPartition[_]]) - : Iterator[(Int, Array[Vid])] = { - val pid2vid: Array[Array[Vid]] = pid2vidIter.next() + : Iterator[(Int, Array[VertexID])] = { + val pid2vid: Array[Array[VertexID]] = pid2vidIter.next() val activePart: VertexPartition[_] = activePartIter.next() Iterator.tabulate(pid2vid.size) { pid => val vidsCandidate = pid2vid(pid) val size = vidsCandidate.length - val actives = new PrimitiveVector[Vid](vidsCandidate.size) + val actives = new PrimitiveVector[VertexID](vidsCandidate.size) var i = 0 while (i < size) { val vid = vidsCandidate(i) @@ -175,7 +175,8 @@ object ReplicatedVertexView { } } -class VertexAttributeBlock[VD: ClassTag](val vids: Array[Vid], 
val attrs: Array[VD]) +class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexID], val attrs: Array[VD]) extends Serializable { - def iterator: Iterator[(Vid, VD)] = (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) } + def iterator: Iterator[(VertexID, VD)] = + (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) } } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala index b6cd048b33bb1d976297c1523f82345d19a2e985..9e6f78197e7f912847e27e633ef7348f0a7a9bed 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala @@ -14,12 +14,12 @@ import org.apache.spark.util.collection.PrimitiveVector */ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { - val bothAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(true, true) - val srcAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(true, false) - val dstAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(false, true) - val noAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(false, false) + val bothAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(true, true) + val srcAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(true, false) + val dstAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(false, true) + val noAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(false, false) - def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = + def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = (includeSrcAttr, includeDstAttr) match { case (true, true) => bothAttrs case (true, false) => srcAttrOnly @@ -28,9 +28,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { } private def createPid2Vid( - includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = { + includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = { // Determine which vertices each edge partition needs by creating a mapping from vid to pid. 
- val vid2pid: RDD[(Vid, Pid)] = edges.partitionsRDD.mapPartitions { iter => + val vid2pid: RDD[(VertexID, Pid)] = edges.partitionsRDD.mapPartitions { iter => val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next() val numEdges = edgePartition.size val vSet = new VertexSet @@ -53,7 +53,7 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { val numPartitions = vertices.partitions.size vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter => - val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid]) + val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexID]) for ((vid, pid) <- iter) { pid2vid(pid) += vid } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala index dcf619fa85e8ab2b622e168c3b12ef45a10fd9ca..a3b0ea7689fc71fcf29bde3b318183cf7b98d42e 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala @@ -7,12 +7,12 @@ import org.apache.spark.SparkConf import org.apache.spark.graph._ import org.apache.spark.serializer._ -class VidMsgSerializer(conf: SparkConf) extends Serializer { +class VertexIDMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { def writeObject[T](t: T) = { - val msg = t.asInstanceOf[(Vid, _)] + val msg = t.asInstanceOf[(VertexID, _)] writeVarLong(msg._1, optimizePositive = false) this } @@ -101,7 +101,7 @@ class IntAggMsgSerializer(conf: SparkConf) extends Serializer { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { def writeObject[T](t: T) = { - val msg = t.asInstanceOf[(Vid, Int)] + val msg = t.asInstanceOf[(VertexID, Int)] writeVarLong(msg._1, optimizePositive = false) writeUnsignedVarInt(msg._2) this @@ -124,7 +124,7 @@ class LongAggMsgSerializer(conf: SparkConf) extends Serializer { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { def writeObject[T](t: T) = { - val msg = t.asInstanceOf[(Vid, Long)] + val msg = t.asInstanceOf[(VertexID, Long)] writeVarLong(msg._1, optimizePositive = false) writeVarLong(msg._2, optimizePositive = true) this @@ -147,7 +147,7 @@ class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { def writeObject[T](t: T) = { - val msg = t.asInstanceOf[(Vid, Double)] + val msg = t.asInstanceOf[(VertexID, Double)] writeVarLong(msg._1, optimizePositive = false) writeDouble(msg._2) this diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala index 7048a40f42364631e9d16907fab20f8ddcbd0ca6..91244daa54a57bbe9b7bfba421e0d8e19e67e3a2 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala @@ -10,18 +10,18 @@ import org.apache.spark.graph._ private[graph] object VertexPartition { - def apply[VD: ClassTag](iter: Iterator[(Vid, VD)]): VertexPartition[VD] = { - val map = new PrimitiveKeyOpenHashMap[Vid, VD] + def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)]): VertexPartition[VD] = { + val map = new PrimitiveKeyOpenHashMap[VertexID, VD] iter.foreach { case (k, v) => map(k) = v } new 
VertexPartition(map.keySet, map._values, map.keySet.getBitSet) } - def apply[VD: ClassTag](iter: Iterator[(Vid, VD)], mergeFunc: (VD, VD) => VD) + def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)], mergeFunc: (VD, VD) => VD) : VertexPartition[VD] = { - val map = new PrimitiveKeyOpenHashMap[Vid, VD] + val map = new PrimitiveKeyOpenHashMap[VertexID, VD] iter.foreach { case (k, v) => map.setMerge(k, v, mergeFunc) } @@ -44,15 +44,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( def size: Int = mask.cardinality() /** Return the vertex attribute for the given vertex ID. */ - def apply(vid: Vid): VD = values(index.getPos(vid)) + def apply(vid: VertexID): VD = values(index.getPos(vid)) - def isDefined(vid: Vid): Boolean = { + def isDefined(vid: VertexID): Boolean = { val pos = index.getPos(vid) pos >= 0 && mask.get(pos) } /** Look up vid in activeSet, throwing an exception if it is None. */ - def isActive(vid: Vid): Boolean = { + def isActive(vid: VertexID): Boolean = { activeSet.get.contains(vid) } @@ -72,7 +72,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * each of the entries in the original VertexRDD. The resulting * VertexPartition retains the same index. */ - def map[VD2: ClassTag](f: (Vid, VD) => VD2): VertexPartition[VD2] = { + def map[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexPartition[VD2] = { // Construct a view of the map transformation val newValues = new Array[VD2](capacity) var i = mask.nextSetBit(0) @@ -92,7 +92,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * RDD can be easily joined with the original vertex-set. Furthermore, the filter only * modifies the bitmap index and so no new values are allocated. */ - def filter(pred: (Vid, VD) => Boolean): VertexPartition[VD] = { + def filter(pred: (VertexID, VD) => Boolean): VertexPartition[VD] = { // Allocate the array to store the results into val newMask = new BitSet(capacity) // Iterate over the active bits in the old mask and evaluate the predicate @@ -130,7 +130,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( /** Left outer join another VertexPartition. */ def leftJoin[VD2: ClassTag, VD3: ClassTag] (other: VertexPartition[VD2]) - (f: (Vid, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { + (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { if (index != other.index) { logWarning("Joining two VertexPartitions with different indexes is slow.") leftJoin(createUsingIndex(other.iterator))(f) @@ -149,14 +149,14 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( /** Left outer join another iterator of messages. */ def leftJoin[VD2: ClassTag, VD3: ClassTag] - (other: Iterator[(Vid, VD2)]) - (f: (Vid, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { + (other: Iterator[(VertexID, VD2)]) + (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { leftJoin(createUsingIndex(other))(f) } /** Inner join another VertexPartition. */ def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U]) - (f: (Vid, VD, U) => VD2): VertexPartition[VD2] = { + (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = { if (index != other.index) { logWarning("Joining two VertexPartitions with different indexes is slow.") innerJoin(createUsingIndex(other.iterator))(f) @@ -176,15 +176,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * Inner join an iterator of messages. 
*/ def innerJoin[U: ClassTag, VD2: ClassTag] - (iter: Iterator[Product2[Vid, U]]) - (f: (Vid, VD, U) => VD2): VertexPartition[VD2] = { + (iter: Iterator[Product2[VertexID, U]]) + (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = { innerJoin(createUsingIndex(iter))(f) } /** * Similar effect as aggregateUsingIndex((a, b) => a) */ - def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[Vid, VD2]]) + def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexID, VD2]]) : VertexPartition[VD2] = { val newMask = new BitSet(capacity) val newValues = new Array[VD2](capacity) @@ -202,7 +202,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in * the partition, hidden by the bitmask. */ - def innerJoinKeepLeft(iter: Iterator[Product2[Vid, VD]]): VertexPartition[VD] = { + def innerJoinKeepLeft(iter: Iterator[Product2[VertexID, VD]]): VertexPartition[VD] = { val newMask = new BitSet(capacity) val newValues = new Array[VD](capacity) System.arraycopy(values, 0, newValues, 0, newValues.length) @@ -217,8 +217,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( } def aggregateUsingIndex[VD2: ClassTag]( - iter: Iterator[Product2[Vid, VD2]], reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = - { + iter: Iterator[Product2[VertexID, VD2]], + reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = { val newMask = new BitSet(capacity) val newValues = new Array[VD2](capacity) iter.foreach { product => @@ -237,7 +237,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( new VertexPartition[VD2](index, newValues, newMask) } - def replaceActives(iter: Iterator[Vid]): VertexPartition[VD] = { + def replaceActives(iter: Iterator[VertexID]): VertexPartition[VD] = { val newActiveSet = new VertexSet iter.foreach(newActiveSet.add(_)) new VertexPartition(index, values, mask, Some(newActiveSet)) @@ -247,7 +247,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( * Construct a new VertexPartition whose index contains only the vertices in the mask. */ def reindex(): VertexPartition[VD] = { - val hashMap = new PrimitiveKeyOpenHashMap[Vid, VD] + val hashMap = new PrimitiveKeyOpenHashMap[VertexID, VD] val arbitraryMerge = (a: VD, b: VD) => a for ((k, v) <- this.iterator) { hashMap.setMerge(k, v, arbitraryMerge) @@ -255,7 +255,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet) } - def iterator: Iterator[(Vid, VD)] = mask.iterator.map(ind => (index.getValue(ind), values(ind))) + def iterator: Iterator[(VertexID, VD)] = + mask.iterator.map(ind => (index.getValue(ind), values(ind))) - def vidIterator: Iterator[Vid] = mask.iterator.map(ind => index.getValue(ind)) + def vidIterator: Iterator[VertexID] = mask.iterator.map(ind => index.getValue(ind)) } diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala index 655ae53bf8bc53f681ca04d2f1f3bf75f488dfe5..823d47c359b099f5390f6e98031d804220d6eea8 100644 --- a/graph/src/main/scala/org/apache/spark/graph/package.scala +++ b/graph/src/main/scala/org/apache/spark/graph/package.scala @@ -5,15 +5,15 @@ import org.apache.spark.util.collection.OpenHashSet package object graph { - type Vid = Long + type VertexID = Long // TODO: Consider using Char. 
type Pid = Int - type VertexSet = OpenHashSet[Vid] + type VertexSet = OpenHashSet[VertexID] // type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap - type VertexIdToIndexMap = OpenHashSet[Vid] + type VertexIdToIndexMap = OpenHashSet[VertexID] /** * Return the default null-like value for a data type T. diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala index d61f358bb08bd39ef8b8f9e07003d41905859134..51f45cb8922b35557e2fff101df9825ba990eb17 100644 --- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala +++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala @@ -70,7 +70,7 @@ object GraphGenerators { val sigma = 1.3 //val vertsAndEdges = (0 until numVertices).flatMap { src => { - val vertices: RDD[(Vid, Int)] = sc.parallelize(0 until numVertices).map{ + val vertices: RDD[(VertexID, Int)] = sc.parallelize(0 until numVertices).map{ src => (src, sampleLogNormal(mu, sigma, numVertices)) } @@ -92,11 +92,11 @@ object GraphGenerators { } - def generateRandomEdges(src: Int, numEdges: Int, maxVid: Int): Array[Edge[Int]] = { + def generateRandomEdges(src: Int, numEdges: Int, maxVertexID: Int): Array[Edge[Int]] = { val rand = new Random() var dsts: Set[Int] = Set() while (dsts.size < numEdges) { - val nextDst = rand.nextInt(maxVid) + val nextDst = rand.nextInt(maxVertexID) if (nextDst != src) { dsts += nextDst } @@ -251,9 +251,9 @@ object GraphGenerators { */ def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = { // Convert row column address into vertex ids (row major order) - def sub2ind(r: Int, c: Int): Vid = r * cols + c + def sub2ind(r: Int, c: Int): VertexID = r * cols + c - val vertices: RDD[(Vid, (Int,Int))] = + val vertices: RDD[(VertexID, (Int,Int))] = sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) ) val edges: RDD[Edge[Double]] = vertices.flatMap{ case (vid, (r,c)) => @@ -273,7 +273,7 @@ object GraphGenerators { * being the center vertex. 
*/ def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = { - val edges: RDD[(Vid, Vid)] = sc.parallelize(1 until nverts).map(vid => (vid, 0)) + val edges: RDD[(VertexID, VertexID)] = sc.parallelize(1 until nverts).map(vid => (vid, 0)) Graph.fromEdgeTuples(edges, 1) } // end of starGraph diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala index 9e9213631ff63022041a3f3adc291f8ce3782460..132e6be24a04a0adb2c272a2b9447896b32897c7 100644 --- a/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala @@ -11,7 +11,8 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { test("aggregateNeighbors") { withSpark { sc => val n = 3 - val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 1) + val star = + Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID))), 1) val indegrees = star.aggregateNeighbors( (vid, edge) => Some(1), @@ -26,21 +27,22 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { assert(outdegrees.collect().toSet === Set((0, n))) val noVertexValues = star.aggregateNeighbors[Int]( - (vid: Vid, edge: EdgeTriplet[Int, Int]) => None, + (vid: VertexID, edge: EdgeTriplet[Int, Int]) => None, (a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"), EdgeDirection.In) - assert(noVertexValues.collect().toSet === Set.empty[(Vid, Int)]) + assert(noVertexValues.collect().toSet === Set.empty[(VertexID, Int)]) } } test("joinVertices") { withSpark { sc => - val vertices = sc.parallelize(Seq[(Vid, String)]((1, "one"), (2, "two"), (3, "three")), 2) + val vertices = + sc.parallelize(Seq[(VertexID, String)]((1, "one"), (2, "two"), (3, "three")), 2) val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo")))) val g: Graph[String, String] = Graph(vertices, edges) - val tbl = sc.parallelize(Seq[(Vid, Int)]((1, 10), (2, 20))) - val g1 = g.joinVertices(tbl) { (vid: Vid, attr: String, u: Int) => attr + u } + val tbl = sc.parallelize(Seq[(VertexID, Int)]((1, 10), (2, 20))) + val g1 = g.joinVertices(tbl) { (vid: VertexID, attr: String, u: Int) => attr + u } val v = g1.vertices.collect().toSet assert(v === Set((1, "one10"), (2, "two20"), (3, "three"))) @@ -67,7 +69,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { test ("filter") { withSpark { sc => val n = 5 - val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x))) + val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x))) val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x))) val graph: Graph[Int, Int] = Graph(vertices, edges) val filteredGraph = graph.filter( @@ -75,7 +77,7 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { val degrees: VertexRDD[Int] = graph.outDegrees graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)} }, - vpred = (vid: Vid, deg:Int) => deg > 0 + vpred = (vid: VertexID, deg:Int) => deg > 0 ) val v = filteredGraph.vertices.collect().toSet diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala index e6c19dbc4035867ac0f967cf7b4347fddafaf803..41f3a8311d5a8bf1c9b3aa3ae56e865a888e2beb 100644 --- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala @@ -9,7 +9,7 @@ import org.apache.spark.rdd._ class GraphSuite extends FunSuite with LocalSparkContext { def 
starGraph(sc: SparkContext, n: Int): Graph[String, Int] = { - Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v") + Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v") } test("Graph.fromEdgeTuples") { @@ -39,7 +39,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { withSpark { sc => val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L) val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) } - val vertices: RDD[(Vid, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true))) + val vertices: RDD[(VertexID, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true))) val graph = Graph(vertices, edges, false) assert( graph.edges.count() === rawEdges.size ) // Vertices not explicitly provided but referenced by edges should be created automatically @@ -56,7 +56,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { val n = 5 val star = starGraph(sc, n) assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet === - (1 to n).map(x => (0: Vid, x: Vid, "v", "v")).toSet) + (1 to n).map(x => (0: VertexID, x: VertexID, "v", "v")).toSet) } } @@ -92,7 +92,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { val p = 100 val verts = 1 to n val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x => - verts.filter(y => y % x == 0).map(y => (x: Vid, y: Vid))), p), 0) + verts.filter(y => y % x == 0).map(y => (x: VertexID, y: VertexID))), p), 0) assert(graph.edges.partitions.length === p) val partitionedGraph = graph.partitionBy(EdgePartition2D) assert(graph.edges.partitions.length === p) @@ -118,10 +118,10 @@ class GraphSuite extends FunSuite with LocalSparkContext { val star = starGraph(sc, n) // mapVertices preserving type val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2") - assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: Vid, "v2")).toSet) + assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, "v2")).toSet) // mapVertices changing type val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length) - assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: Vid, 1)).toSet) + assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, 1)).toSet) } } @@ -150,7 +150,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { withSpark { sc => val n = 5 val star = starGraph(sc, n) - assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: Vid, 1)).toSet) + assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexID, 1)).toSet) } } @@ -173,7 +173,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { test("mask") { withSpark { sc => val n = 5 - val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x))) + val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x))) val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x))) val graph: Graph[Int, Int] = Graph(vertices, edges) @@ -199,7 +199,8 @@ class GraphSuite extends FunSuite with LocalSparkContext { val n = 5 val star = starGraph(sc, n) val doubleStar = Graph.fromEdgeTuples( - sc.parallelize((1 to n).flatMap(x => List((0: Vid, x: Vid), (0: Vid, x: Vid))), 1), "v") + sc.parallelize((1 to n).flatMap(x => + List((0: VertexID, x: VertexID), (0: VertexID, x: VertexID))), 1), "v") val star2 = doubleStar.groupEdges { (a, b) => a} assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) === 
star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int])) @@ -218,7 +219,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet) // activeSetOpt - val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: Vid, y: Vid) + val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexID, y: VertexID) val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0) val vids = complete.mapVertices((vid, attr) => vid).cache() val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 } @@ -229,11 +230,11 @@ class GraphSuite extends FunSuite with LocalSparkContext { } Iterator((et.srcId, 1)) }, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet - assert(numEvenNeighbors === (1 to n).map(x => (x: Vid, n / 2)).toSet) + assert(numEvenNeighbors === (1 to n).map(x => (x: VertexID, n / 2)).toSet) // outerJoinVertices followed by mapReduceTriplets(activeSetOpt) - val ring = Graph.fromEdgeTuples(sc.parallelize((0 until n).map(x => (x: Vid, (x+1) % n: Vid)), 3), 0) - .mapVertices((vid, attr) => vid).cache() + val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexID, (x+1) % n: VertexID)), 3) + val ring = Graph.fromEdgeTuples(ringEdges, 0) .mapVertices((vid, attr) => vid).cache() val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_) val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) } val numOddNeighbors = changedGraph.mapReduceTriplets(et => { @@ -243,7 +244,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { } Iterator((et.dstId, 1)) }, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet - assert(numOddNeighbors === (2 to n by 2).map(x => (x: Vid, 1)).toSet) + assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexID, 1)).toSet) } } @@ -258,7 +259,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets( et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)), (a: Int, b: Int) => a + b).collect.toSet - assert(neighborDegreeSums === Set((0: Vid, n)) ++ (1 to n).map(x => (x: Vid, 0))) + assert(neighborDegreeSums === Set((0: VertexID, n)) ++ (1 to n).map(x => (x: VertexID, 0))) // outerJoinVertices preserving type val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString } val newReverseStar = diff --git a/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala b/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala index 44182e85eeafec463c8693cca2c46b9550ccfbcc..de7e3872cad6f9aed5b6f617d523526039f557a2 100644 --- a/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala @@ -10,7 +10,8 @@ class PregelSuite extends FunSuite with LocalSparkContext { test("1 iteration") { withSpark { sc => val n = 5 - val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v") + val star = + Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v") val result = Pregel(star, 0)( (vid, attr, msg) => attr, et => Iterator.empty, @@ -23,11 +24,12 @@ class PregelSuite extends FunSuite with LocalSparkContext { withSpark { sc => val n = 5 val chain = Graph.fromEdgeTuples( - sc.parallelize((1 until n).map(x => (x: Vid, x + 1: Vid)), 3), + sc.parallelize((1 until n).map(x => (x: VertexID, x + 1: VertexID)), 3), 0).cache() - 
assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: Vid, 0)).toSet) + assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexID, 0)).toSet) val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 } - assert(chainWithSeed.vertices.collect.toSet === Set((1: Vid, 1)) ++ (2 to n).map(x => (x: Vid, 0)).toSet) + assert(chainWithSeed.vertices.collect.toSet === + Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet) val result = Pregel(chainWithSeed, 0)( (vid, attr, msg) => math.max(msg, attr), et => Iterator((et.dstId, et.srcAttr)), diff --git a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala index 4014cbe440d8de02451fef4eb34a6e10543b4914..2864ffd1ca31edaabcbc55a2a6b7aff196d8a723 100644 --- a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala @@ -82,7 +82,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext { test("IntAggMsgSerializer") { val conf = new SparkConf(false) - val outMsg = (4: Vid, 5) + val outMsg = (4: VertexID, 5) val bout = new ByteArrayOutputStream val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) @@ -90,8 +90,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext { bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin) - val inMsg1: (Vid, Int) = inStrm.readObject() - val inMsg2: (Vid, Int) = inStrm.readObject() + val inMsg1: (VertexID, Int) = inStrm.readObject() + val inMsg2: (VertexID, Int) = inStrm.readObject() assert(outMsg === inMsg1) assert(outMsg === inMsg2) @@ -102,7 +102,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext { test("LongAggMsgSerializer") { val conf = new SparkConf(false) - val outMsg = (4: Vid, 1L << 32) + val outMsg = (4: VertexID, 1L << 32) val bout = new ByteArrayOutputStream val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) @@ -110,8 +110,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext { bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin) - val inMsg1: (Vid, Long) = inStrm.readObject() - val inMsg2: (Vid, Long) = inStrm.readObject() + val inMsg1: (VertexID, Long) = inStrm.readObject() + val inMsg2: (VertexID, Long) = inStrm.readObject() assert(outMsg === inMsg1) assert(outMsg === inMsg2) @@ -122,7 +122,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext { test("DoubleAggMsgSerializer") { val conf = new SparkConf(false) - val outMsg = (4: Vid, 5.0) + val outMsg = (4: VertexID, 5.0) val bout = new ByteArrayOutputStream val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout) outStrm.writeObject(outMsg) @@ -130,8 +130,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext { bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin) - val inMsg1: (Vid, Double) = inStrm.readObject() - val inMsg2: (Vid, Double) = inStrm.readObject() + val inMsg1: (VertexID, Double) = inStrm.readObject() + val inMsg2: (VertexID, Double) = inStrm.readObject() assert(outMsg === inMsg1) assert(outMsg === inMsg2) diff --git 
a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala index f951fd7a82741fbb651d8fad8f39d2e0c489e426..fd0beee2f640848364911015818ba62574a41038 100644 --- a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala @@ -62,7 +62,7 @@ class EdgePartitionSuite extends FunSuite { test("innerJoin") { def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = { val builder = new EdgePartitionBuilder[A] - for ((src, dst, attr) <- xs) { builder.add(src: Vid, dst: Vid, attr) } + for ((src, dst, attr) <- xs) { builder.add(src: VertexID, dst: VertexID, attr) } builder.toEdgePartition } val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
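For context, a minimal usage sketch of how call sites read once the `Vid` alias is replaced by `VertexID` (defined as `type VertexID = Long` in package.scala above). This is an illustrative example only, assuming a local SparkContext and the patched org.apache.spark.graph package on the classpath; the object name and app name are hypothetical.

{{{
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graph._          // brings the VertexID = Long type alias into scope
import org.apache.spark.rdd.RDD

object VertexIDSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("VertexIDSketch"))

    // Edge tuples are now typed with VertexID rather than Vid.
    val rawEdges: RDD[(VertexID, VertexID)] =
      sc.parallelize((1L to 5L).map(x => (0L: VertexID, x: VertexID)))

    // Same Graph.fromEdgeTuples signature as before; only the alias name changed.
    val star = Graph.fromEdgeTuples(rawEdges, defaultValue = 1)

    // User-supplied functions likewise take VertexID as the id parameter.
    val labelled = star.mapVertices((vid: VertexID, attr: Int) => vid.toString)
    println(labelled.vertices.count())   // 6: the hub vertex 0 plus leaves 1..5

    sc.stop()
  }
}
}}}

Since VertexID remains a plain alias for Long, existing code that passes Long literals or Long-valued RDDs continues to compile unchanged; only explicit references to the old Vid name need updating.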