diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala index 6c396c3dbeb322cb150c7a059c775447b57a0c66..85463052bc361c3b128dfc9e4bb9684172b7fde1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala @@ -1,18 +1,18 @@ package org.apache.spark.graphx - /** * A single directed edge consisting of a source id, target id, * and the data associated with the edge. * * @tparam ED type of the edge attribute + * + * @param srcId The vertex id of the source vertex + * @param dstId The vertex id of the target vertex + * @param attr The attribute associated with the edge */ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] ( - /** The vertex id of the source vertex */ var srcId: VertexID = 0, - /** The vertex id of the target vertex. */ var dstId: VertexID = 0, - /** The attribute associated with the edge. */ var attr: ED = null.asInstanceOf[ED]) extends Serializable { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 66d5180020a8ccac38419a6887c88cee09b2fbc1..447ef555ca530e4c021d05dab1a1e4eee7d59095 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -209,12 +209,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * This function iterates until there are no remaining messages, or * for `maxIterations` iterations. * - * @tparam VD the vertex data type - * @tparam ED the edge data type * @tparam A the Pregel message type * - * @param graph the input graph. - * * @param initialMsg the message each vertex will receive at the on * the first iteration * diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index d4d71627e1d98ff5e8c837a56825f667e9f6dc08..ee95ead3ada9b337a5141662cb1c41286fd173de 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -15,6 +15,7 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap * @param index a clustered index on source vertex id * @tparam ED the edge attribute type. */ +private[graphx] class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag]( val srcIds: Array[VertexID], val dstIds: Array[VertexID], diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index fbc29409b59a9272356c887da4eef464b3bf7033..9d072f933503c7874818eb5408bc28725fd822f4 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -7,6 +7,7 @@ import org.apache.spark.graphx._ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap import org.apache.spark.util.collection.PrimitiveVector +private[graphx] class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) { var edges = new PrimitiveVector[Edge[ED]](size) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 97ca642f9b65b062454300791b370e736f2936e9..348490c1860f9778b8997bd3e7f6ddc955e2b8ed 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -15,7 +15,7 @@ import org.apache.spark.util.ClosureCleaner /** - * A Graph RDD that supports computation on graphs. + * A graph that supports computation on graphs. * * Graphs are represented using two classes of data: vertex-partitioned and * edge-partitioned. `vertices` contains vertex attributes, which are vertex-partitioned. `edges` diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala index ad5daf8f6ae320db03e91ef3060ebb13a6729e0e..05508ff716eb10f89bf77d7578c0f81d69ac52f6 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala @@ -7,6 +7,7 @@ import org.apache.spark.graphx.{PartitionID, VertexID} import org.apache.spark.rdd.{ShuffledRDD, RDD} +private[graphx] class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T]( @transient var partition: PartitionID, var vid: VertexID, @@ -26,6 +27,7 @@ class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T]( * @param partition index of the target partition. * @param data value to send */ +private[graphx] class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T]( @transient var partition: PartitionID, var data: T) @@ -39,6 +41,7 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef } +private[graphx] class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) { def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = { val rdd = new ShuffledRDD[PartitionID, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner) @@ -56,6 +59,7 @@ class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T } +private[graphx] class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) { /** @@ -68,6 +72,7 @@ class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) { } +private[graphx] object MsgRDDFunctions { implicit def rdd2PartitionRDDFunctions[T: ClassTag](rdd: RDD[MessageToPartition[T]]) = { new MsgRDDFunctions(rdd) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala index 0e2f5a9dd93cdbc1c3b21bb211f7e6d1ac943482..4ebe0b02671d931fdbf78b9f10cf068d91625135 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala @@ -138,7 +138,7 @@ class ReplicatedVertexView[VD: ClassTag]( } } -object ReplicatedVertexView { +private object ReplicatedVertexView { protected def buildBuffer[VD: ClassTag]( pid2vidIter: Iterator[Array[Array[VertexID]]], vertexPartIter: Iterator[VertexPartition[VD]]) = { @@ -187,6 +187,7 @@ object ReplicatedVertexView { } } +private[graphx] class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexID], val attrs: Array[VD]) extends Serializable { def iterator: Iterator[(VertexID, VD)] = diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala index 3bd8b24133244db108bd42e829c8d55d0cae3194..f342fd743790342361e34d8ca55ee5380345a86c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala @@ -12,6 +12,7 @@ import org.apache.spark.util.collection.PrimitiveVector * may be used multiple times in ReplicatedVertexView -- once to ship the vertex attributes and * (possibly) once to ship the active-set information. */ +private[impl] class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { val bothAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(true, true) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala index 1c3c87f08dcb954d3dd6b28c4db2e8acf159723d..cbd6318f33cdc2d8d3de77ce06f895ac83b5c244 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala @@ -7,6 +7,7 @@ import org.apache.spark.SparkConf import org.apache.spark.graphx._ import org.apache.spark.serializer._ +private[graphx] class VertexIDMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { @@ -27,6 +28,7 @@ class VertexIDMsgSerializer(conf: SparkConf) extends Serializer { } /** A special shuffle serializer for VertexBroadcastMessage[Int]. */ +private[graphx] class IntVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { @@ -50,6 +52,7 @@ class IntVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { } /** A special shuffle serializer for VertexBroadcastMessage[Long]. */ +private[graphx] class LongVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { @@ -73,6 +76,7 @@ class LongVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { } /** A special shuffle serializer for VertexBroadcastMessage[Double]. */ +private[graphx] class DoubleVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { @@ -96,6 +100,7 @@ class DoubleVertexBroadcastMsgSerializer(conf: SparkConf) extends Serializer { } /** A special shuffle serializer for AggregationMessage[Int]. */ +private[graphx] class IntAggMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { @@ -119,6 +124,7 @@ class IntAggMsgSerializer(conf: SparkConf) extends Serializer { } /** A special shuffle serializer for AggregationMessage[Long]. */ +private[graphx] class LongAggMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { @@ -142,6 +148,7 @@ class LongAggMsgSerializer(conf: SparkConf) extends Serializer { } /** A special shuffle serializer for AggregationMessage[Double]. */ +private[graphx] class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { @@ -168,6 +175,7 @@ class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer { // Helper classes to shorten the implementation of those special serializers. //////////////////////////////////////////////////////////////////////////////// +private[graphx] abstract class ShuffleSerializationStream(s: OutputStream) extends SerializationStream { // The implementation should override this one. def writeObject[T](t: T): SerializationStream @@ -281,6 +289,7 @@ abstract class ShuffleSerializationStream(s: OutputStream) extends Serialization override def close(): Unit = s.close() } +private[graphx] abstract class ShuffleDeserializationStream(s: InputStream) extends DeserializationStream { // The implementation should override this one. def readObject[T](): T @@ -371,7 +380,7 @@ abstract class ShuffleDeserializationStream(s: InputStream) extends Deserializat override def close(): Unit = s.close() } -sealed trait ShuffleSerializerInstance extends SerializerInstance { +private[graphx] sealed trait ShuffleSerializerInstance extends SerializerInstance { override def serialize[T](t: T): ByteBuffer = throw new UnsupportedOperationException diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala index a6bbf63888ba8762d8923441bd7dcd39d3e9aadd..cfc3281b6407e817f26c1573c49b2a48c1fde774 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/package.scala @@ -3,5 +3,5 @@ package org.apache.spark.graphx import org.apache.spark.util.collection.OpenHashSet package object impl { - type VertexIdToIndexMap = OpenHashSet[VertexID] + private[graphx] type VertexIdToIndexMap = OpenHashSet[VertexID] }