From 43e1bdc80c2b19533596df74fd7b97a2d7b84bb6 Mon Sep 17 00:00:00 2001
From: Ankur Dave <ankurdave@gmail.com>
Date: Thu, 9 Jan 2014 13:59:48 -0800
Subject: [PATCH] Pid -> PartitionID

---
 .../scala/org/apache/spark/graph/EdgeRDD.scala   |  8 ++++----
 .../scala/org/apache/spark/graph/Graph.scala     |  5 +++--
 .../apache/spark/graph/PartitionStrategy.scala   | 16 ++++++++--------
 .../org/apache/spark/graph/impl/GraphImpl.scala  |  8 ++++----
 .../spark/graph/impl/MessageToPartition.scala    | 14 +++++++-------
 .../spark/graph/impl/ReplicatedVertexView.scala  | 14 +++++++-------
 .../apache/spark/graph/impl/RoutingTable.scala   |  4 ++--
 .../scala/org/apache/spark/graph/package.scala   |  2 +-
 8 files changed, 36 insertions(+), 35 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
index fd93359352..78821bf568 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
@@ -9,7 +9,7 @@ import org.apache.spark.storage.StorageLevel
 
 
 class EdgeRDD[@specialized ED: ClassTag](
-    val partitionsRDD: RDD[(Pid, EdgePartition[ED])])
+    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
   extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
 
   partitionsRDD.setName("EdgeRDD")
@@ -17,7 +17,7 @@ class EdgeRDD[@specialized ED: ClassTag](
   override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
 
   /**
-   * If partitionsRDD already has a partitioner, use it. Otherwise assume that the Pids in
+   * If partitionsRDD already has a partitioner, use it. Otherwise assume that the PartitionIDs in
    * partitionsRDD correspond to the actual partitions and create a new partitioner that allows
    * co-partitioning with partitionsRDD.
    */
@@ -25,7 +25,7 @@ class EdgeRDD[@specialized ED: ClassTag](
     partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    firstParent[(Pid, EdgePartition[ED])].iterator(part, context).next._2.iterator
+    firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
   }
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -44,7 +44,7 @@ class EdgeRDD[@specialized ED: ClassTag](
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   override def cache(): EdgeRDD[ED] = persist()
 
-  def mapEdgePartitions[ED2: ClassTag](f: (Pid, EdgePartition[ED]) => EdgePartition[ED2])
+  def mapEdgePartitions[ED2: ClassTag](f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2])
     : EdgeRDD[ED2] = {
 //       iter => iter.map { case (pid, ep) => (pid, f(ep)) }
     new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index dd0799142e..86282e607e 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -169,7 +169,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * @tparam ED2 the new edge data type
    *
    */
-  def mapEdges[ED2: ClassTag](map: (Pid, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
+  def mapEdges[ED2: ClassTag](
+      map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
 
   /**
    * Construct a new graph where the value of each edge is
@@ -220,7 +221,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    *
    */
   def mapTriplets[ED2: ClassTag](
-      map: (Pid, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]):
+      map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]):
     Graph[VD, ED2]
 
   /**
diff --git a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
index c01b4b9439..bc05fb812c 100644
--- a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
@@ -2,7 +2,7 @@ package org.apache.spark.graph
 
 
 sealed trait PartitionStrategy extends Serializable {
-  def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid
+  def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
 }
 
 
@@ -51,18 +51,18 @@ sealed trait PartitionStrategy extends Serializable {
  *
  */
 case object EdgePartition2D extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
-    val ceilSqrtNumParts: Pid = math.ceil(math.sqrt(numParts)).toInt
+  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+    val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
     val mixingPrime: VertexID = 1125899906842597L
-    val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
-    val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
+    val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
+    val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
     (col * ceilSqrtNumParts + row) % numParts
   }
 }
 
 
 case object EdgePartition1D extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
+  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
     val mixingPrime: VertexID = 1125899906842597L
     (math.abs(src) * mixingPrime).toInt % numParts
   }
@@ -74,7 +74,7 @@ case object EdgePartition1D extends PartitionStrategy {
  * random vertex cut.
  */
 case object RandomVertexCut extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
+  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
     math.abs((src, dst).hashCode()) % numParts
   }
 }
@@ -86,7 +86,7 @@ case object RandomVertexCut extends PartitionStrategy {
  * will end up on the same partition.
  */
 case object CanonicalRandomVertexCut extends PartitionStrategy {
-  override def getPartition(src: VertexID, dst: VertexID, numParts: Pid): Pid = {
+  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
     val lower = math.min(src, dst)
     val higher = math.max(src, dst)
     math.abs((lower, higher).hashCode()) % numParts
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 6eb401b3b5..8f42e7d592 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -69,7 +69,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     val numPartitions = edges.partitions.size
     val edTag = classTag[ED]
     val newEdges = new EdgeRDD(edges.map { e =>
-      val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
+      val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
 
       // Should we be using 3-tuple or an optimized class
       new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
@@ -173,13 +173,13 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   }
 
   override def mapEdges[ED2: ClassTag](
-      f: (Pid, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
+      f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
     val newETable = edges.mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
     new GraphImpl(vertices, newETable , routingTable, replicatedVertexView)
   }
 
   override def mapTriplets[ED2: ClassTag](
-      f: (Pid, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
+      f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
     // Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit
     // manifest from GraphImpl (which would require serializing GraphImpl).
     val vdTag = classTag[VD]
@@ -354,7 +354,7 @@ object GraphImpl {
   }
 
   def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
-      edgePartitions: RDD[(Pid, EdgePartition[ED])],
+      edgePartitions: RDD[(PartitionID, EdgePartition[ED])],
       defaultVertexAttr: VD): GraphImpl[VD, ED] = {
     fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
   }
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
index 2d03f75a28..b2fa728482 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
@@ -3,15 +3,15 @@ package org.apache.spark.graph.impl
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.Partitioner
-import org.apache.spark.graph.{Pid, VertexID}
+import org.apache.spark.graph.{PartitionID, VertexID}
 import org.apache.spark.rdd.{ShuffledRDD, RDD}
 
 
 class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
-    @transient var partition: Pid,
+    @transient var partition: PartitionID,
     var vid: VertexID,
     var data: T)
-  extends Product2[Pid, (VertexID, T)] with Serializable {
+  extends Product2[PartitionID, (VertexID, T)] with Serializable {
 
   override def _1 = partition
 
@@ -27,9 +27,9 @@ class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
  * @param data value to send
  */
 class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T](
-    @transient var partition: Pid,
+    @transient var partition: PartitionID,
     var data: T)
-  extends Product2[Pid, T] with Serializable {
+  extends Product2[PartitionID, T] with Serializable {
 
   override def _1 = partition
 
@@ -41,7 +41,7 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef
 
 class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) {
   def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = {
-    val rdd = new ShuffledRDD[Pid, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner)
+    val rdd = new ShuffledRDD[PartitionID, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner)
 
     // Set a custom serializer if the data is of int or double type.
     if (classTag[T] == ClassTag.Int) {
@@ -62,7 +62,7 @@ class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) {
    * Return a copy of the RDD partitioned using the specified partitioner.
    */
   def partitionBy(partitioner: Partitioner): RDD[MessageToPartition[T]] = {
-    new ShuffledRDD[Pid, T, MessageToPartition[T]](self, partitioner)
+    new ShuffledRDD[PartitionID, T, MessageToPartition[T]](self, partitioner)
   }
 
 }
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
index 9d2d242ffa..7d29861db1 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
@@ -46,12 +46,12 @@ class ReplicatedVertexView[VD: ClassTag](
       }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIDMap")
   }
 
-  private lazy val bothAttrs: RDD[(Pid, VertexPartition[VD])] = create(true, true)
-  private lazy val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(true, false)
-  private lazy val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(false, true)
-  private lazy val noAttrs: RDD[(Pid, VertexPartition[VD])] = create(false, false)
+  private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
+  private lazy val srcAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(true, false)
+  private lazy val dstAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(false, true)
+  private lazy val noAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(false, false)
 
-  def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, VertexPartition[VD])] = {
+  def get(includeSrc: Boolean, includeDst: Boolean): RDD[(PartitionID, VertexPartition[VD])] = {
     (includeSrc, includeDst) match {
       case (true, true) => bothAttrs
       case (true, false) => srcAttrOnly
@@ -63,7 +63,7 @@ class ReplicatedVertexView[VD: ClassTag](
   def get(
       includeSrc: Boolean,
       includeDst: Boolean,
-      actives: VertexRDD[_]): RDD[(Pid, VertexPartition[VD])] = {
+      actives: VertexRDD[_]): RDD[(PartitionID, VertexPartition[VD])] = {
     // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and
     // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
     // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
@@ -81,7 +81,7 @@ class ReplicatedVertexView[VD: ClassTag](
   }
 
   private def create(includeSrc: Boolean, includeDst: Boolean)
-    : RDD[(Pid, VertexPartition[VD])] = {
+    : RDD[(PartitionID, VertexPartition[VD])] = {
     val vdTag = classTag[VD]
 
     // Ship vertex attributes to edge partitions according to vertexPlacement
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
index 9e6f78197e..96d9e9d7f8 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
@@ -30,8 +30,8 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
   private def createPid2Vid(
       includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = {
     // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
-    val vid2pid: RDD[(VertexID, Pid)] = edges.partitionsRDD.mapPartitions { iter =>
-      val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next()
+    val vid2pid: RDD[(VertexID, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
+      val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
       val numEdges = edgePartition.size
       val vSet = new VertexSet
       if (includeSrcAttr) {  // Add src vertices to the set.
diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala
index 823d47c359..b98a11b918 100644
--- a/graph/src/main/scala/org/apache/spark/graph/package.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/package.scala
@@ -8,7 +8,7 @@ package object graph {
   type VertexID = Long
 
   // TODO: Consider using Char.
-  type Pid = Int
+  type PartitionID = Int
 
   type VertexSet = OpenHashSet[VertexID]
 
-- 
GitLab