Commit 60db25bd authored by Dan Crankshaw

Fixed merge conflicts.

parents 384befb2 1a06f707
Showing 1124 additions and 636 deletions
......@@ -40,6 +40,7 @@ if [ -f "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*-dep
CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/graph/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/classes"
DEPS_ASSEMBLY_JAR=`ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*-deps.jar`
......
......@@ -157,6 +157,16 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
/** Return the value at the specified position. */
def getValue(pos: Int): T = _data(pos)
def iterator() = new Iterator[T] {
var pos = nextPos(0)
override def hasNext: Boolean = pos != INVALID_POS
override def next(): T = {
val tmp = getValue(pos)
pos = nextPos(pos+1)
tmp
}
}
/** Return the value at the specified position. */
def getValueSafe(pos: Int): T = {
assert(_bitset.get(pos))
......
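For orientation, a minimal usage sketch of the iterator added above (assuming this is the OpenHashSet from org.apache.spark.util.collection used elsewhere in this diff; not part of the commit):

  val set = new OpenHashSet[Long]()
  set.add(10L)
  set.add(20L)
  // Walks occupied slots via nextPos(...) until INVALID_POS is returned.
  set.iterator().foreach(v => println(v))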
......@@ -18,7 +18,7 @@ object Analytics extends Logging {
* Run PageRank for a fixed number of iterations returning a graph
* with vertex attributes containing the PageRank and edge
* attributes the normalized edge weight.
*
* The following PageRank fixed point is computed for each vertex.
*
* {{{
......@@ -35,7 +35,7 @@ object Analytics extends Logging {
* where `alpha` is the random reset probability (typically 0.15),
* `inNbrs[i]` is the set of neighbors which link to `i` and
* `outDeg[j]` is the out degree of vertex `j`.
*
* Note that this is not the "normalized" PageRank and as a
* consequence pages that have no inlinks will have a PageRank of
* alpha.
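* (For reference, the fixed point sketched in the elided `{{{ }}}` block is
* `PR[i] = alpha + (1 - alpha) * sum(PR[j] / outDeg[j] for j in inNbrs[i])`.)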
......@@ -52,7 +52,7 @@ object Analytics extends Logging {
*
*/
def pagerank[VD: Manifest, ED: Manifest](
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15):
Graph[Double, Double] = {
/**
......@@ -76,13 +76,13 @@ object Analytics extends Logging {
// version of Pregel
def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
resetProb + (1.0 - resetProb) * msgSum
def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] =
Some(edge.srcAttr * edge.attr)
def sendMessage(edge: EdgeTriplet[Double, Double]) =
Array((edge.dstId, edge.srcAttr * edge.attr))
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank
val initialMessage = 0.0
// Execute pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter)(
vertexProgram, sendMessage, messageCombiner)
}
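A hedged usage sketch of this fixed-iteration entry point (the loader call is borrowed from the driver in `main` below; the path and iteration count are illustrative):

  val graph = GraphLoader.textFile(sc, "hdfs://links", a => 1.0F).cache()
  val ranks: Graph[Double, Double] = Analytics.pagerank(graph, numIter = 10)
  // Highest (un-normalized) PageRank in the graph.
  println(ranks.vertices.map { case (vid, pr) => pr }.reduce((a, b) => math.max(a, b)))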
......@@ -107,7 +107,7 @@ object Analytics extends Logging {
* where `alpha` is the random reset probability (typically 0.15),
* `inNbrs[i]` is the set of neighbors which link to `i` and
* `outDeg[j]` is the out degree of vertex `j`.
*
* Note that this is not the "normalized" PageRank and as a
* consequence pages that have no inlinks will have a PageRank of
* alpha.
......@@ -124,11 +124,11 @@ object Analytics extends Logging {
* PageRank and each edge containing the normalized weight.
*/
def deltaPagerank[VD: Manifest, ED: Manifest](
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15):
Graph[Double, Double] = {
/**
* Initialize the pagerankGraph with each edge attribute
* having weight 1/outDegree and each vertex with attribute 1.0.
*/
val pagerankGraph: Graph[(Double, Double), Double] = graph
......@@ -136,7 +136,7 @@ object Analytics extends Logging {
.outerJoinVertices(graph.outDegrees){
(vid, vdata, deg) => deg.getOrElse(0)
}
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to (initialPR, delta = 0)
.mapVertices( (id, attr) => (0.0, 0.0) )
......@@ -151,16 +151,16 @@ object Analytics extends Logging {
val newPR = oldPR + (1.0 - resetProb) * msgSum
(newPR, newPR - oldPR)
}
def sendMessage(id: Vid, edge: EdgeTriplet[(Double, Double), Double]): Option[Double] = {
def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
if (edge.srcAttr._2 > tol) {
Some(edge.srcAttr._2 * edge.attr)
} else { None }
}
Array((edge.dstId, edge.srcAttr._2 * edge.attr))
} else { Array.empty[(Vid, Double)] }
}
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank
val initialMessage = resetProb / (1.0 - resetProb)
// Execute a dynamic version of Pregel.
Pregel(pagerankGraph, initialMessage)(
vertexProgram, sendMessage, messageCombiner)
.mapVertices( (vid, attr) => attr._1 )
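And the tolerance-based variant, as exercised by AnalyticsSuite further down (the tol and resetProb values are illustrative):

  val dynamicRanks: Graph[Double, Double] =
    Analytics.deltaPagerank(graph, tol = 0.0001, resetProb = 0.15)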
......@@ -182,26 +182,28 @@ object Analytics extends Logging {
* @return a graph with vertex attributes containing the smallest
* vertex in each connected component
*/
def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]):
Graph[Vid, ED] = {
val ccGraph = graph.mapVertices { case (vid, _) => vid }
def sendMessage(id: Vid, edge: EdgeTriplet[Vid, ED]): Option[Vid] = {
val thisAttr = edge.vertexAttr(id)
val otherAttr = edge.otherVertexAttr(id)
if(thisAttr < otherAttr) { Some(thisAttr) }
else { None }
def sendMessage(edge: EdgeTriplet[Vid, ED]) = {
if (edge.srcAttr < edge.dstAttr) {
Array((edge.dstId, edge.srcAttr))
} else if (edge.srcAttr > edge.dstAttr) {
Array((edge.srcId, edge.dstAttr))
} else {
Array.empty[(Vid, Vid)]
}
}
val initialMessage = Long.MaxValue
Pregel(ccGraph, initialMessage, EdgeDirection.Both)(
Pregel(ccGraph, initialMessage)(
(id, attr, msg) => math.min(attr, msg),
sendMessage,
(a,b) => math.min(a,b)
)
} // end of connectedComponents
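A hedged usage sketch, mirroring the ConnectedComponents driver below:

  val cc = Analytics.connectedComponents(graph)
  // Number of components = number of distinct component labels.
  println(cc.vertices.map { case (vid, comp) => comp }.distinct.count)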
def main(args: Array[String]) = {
val host = args(0)
......@@ -213,7 +215,7 @@ object Analytics extends Logging {
case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
}
}
def setLogLevels(level: org.apache.log4j.Level, loggers: TraversableOnce[String]) = {
loggers.map{
loggerName =>
......@@ -265,7 +267,7 @@ object Analytics extends Logging {
val sc = new SparkContext(host, "PageRank(" + fname + ")")
val graph = GraphLoader.textFile(sc, fname, a => 1.0F,
minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
val startTime = System.currentTimeMillis
......@@ -314,7 +316,7 @@ object Analytics extends Logging {
val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
//val graph = GraphLoader.textFile(sc, fname, a => 1.0F)
val graph = GraphLoader.textFile(sc, fname, a => 1.0F,
minEdgePartitions = numEPart, minVertexPartitions = numVPart).cache()
val cc = Analytics.connectedComponents(graph)
//val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter)
......
package org.apache.spark.graph
import org.apache.spark.rdd.RDD
import org.apache.spark.util.ClosureCleaner
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
/**
* The Graph abstractly represents a graph with arbitrary objects
......@@ -16,21 +14,21 @@ import org.apache.spark.Logging
* operations return new graphs.
*
* @see GraphOps for additional graph member functions.
*
* @note The majority of the graph operations are implemented in
* `GraphOps`. All the convenience operations are defined in the
* `GraphOps` class which may be shared across multiple graph
* implementations.
*
* @tparam VD the vertex attribute type
* @tparam ED the edge attribute type
*/
abstract class Graph[VD: ClassManifest, ED: ClassManifest] extends Logging {
/**
* Get the vertices and their data.
*
* @note vertex ids are unique.
* @return An RDD containing the vertices in this graph
*
* @see Vertex for the vertex type.
......@@ -74,6 +72,11 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] extends Logging {
*/
val triplets: RDD[EdgeTriplet[VD, ED]]
def persist(newLevel: StorageLevel): Graph[VD, ED]
/**
* Return a graph that is cached when first created. This is used to
* pin a graph in memory enabling multiple queries to reuse the same
......@@ -104,7 +107,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] extends Logging {
* @tparam VD2 the new vertex data type
*
* @example We might use this operation to change the vertex values
* from one type to another to initialize an algorithm.
* {{{
* val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file")
* val root = 42
......@@ -194,7 +197,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] extends Logging {
* @return the subgraph containing only the vertices and edges that
* satisfy the predicates.
*/
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
......@@ -259,12 +262,12 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] extends Logging {
* @param reduceFunc the user defined reduce function which should
* be commutative and associative and is used to combine the output
* of the map phase.
*
* @example We can use this function to compute the inDegree of each
* vertex
* {{{
* val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
* val inDeg: RDD[(Vid, Int)] =
* mapReduceTriplets[Int](et => Array((et.dst.id, 1)), _ + _)
* }}}
*
......@@ -273,12 +276,12 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] extends Logging {
* Graph API that enables neighborhood-level computation. For
* example this function can be used to count neighbors satisfying a
* predicate or implement PageRank.
*
*/
def mapReduceTriplets[A: ClassManifest](
mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
reduceFunc: (A, A) => A)
: VertexSetRDD[A]
/**
......@@ -300,11 +303,11 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] extends Logging {
* @example This function is used to update the vertices with new
* values based on external data. For example we could add the out
* degree to each vertex record
*
* {{{
* val rawGraph: Graph[(),()] = Graph.textFile("webgraph")
* val outDeg: RDD[(Vid, Int)] = rawGraph.outDegrees()
* val graph = rawGraph.outerJoinVertices(outDeg) {
* (vid, data, optDeg) => optDeg.getOrElse(0)
* }
* }}}
......@@ -341,7 +344,7 @@ object Graph {
* (i.e., the undirected degree).
*
* @param rawEdges the RDD containing the set of edges in the graph
*
* @return a graph with edge attributes containing the count of
* duplicate edges and vertex attributes containing the total degree
* of each vertex.
......@@ -372,10 +375,10 @@ object Graph {
rawEdges.map { case (s, t) => Edge(s, t, 1) }
}
// Determine unique vertices
/** @todo Should this reduceByKey operation be indexed? */
val vertices: RDD[(Vid, Int)] =
edges.flatMap{ case Edge(s, t, cnt) => Array((s, 1), (t, 1)) }.reduceByKey(_ + _)
// Return graph
GraphImpl(vertices, edges, 0)
}
......@@ -396,7 +399,7 @@ object Graph {
*
*/
def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid,VD)],
edges: RDD[Edge[ED]]): Graph[VD, ED] = {
val defaultAttr: VD = null.asInstanceOf[VD]
Graph(vertices, edges, defaultAttr, (a:VD,b:VD) => a)
......@@ -420,7 +423,7 @@ object Graph {
*
*/
def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid,VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD,
mergeFunc: (VD, VD) => VD): Graph[VD, ED] = {
......
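A minimal sketch (toy data, assuming a SparkContext `sc`) of this two-RDD constructor with an explicit default attribute and merge function:

  val vertices: RDD[(Vid, String)] = sc.parallelize(Seq((1L, "a"), (2L, "b")))
  val edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 1)))
  // Vertices referenced by edges but absent from `vertices` get "missing";
  // duplicate vertex records are merged by keeping the first attribute.
  val g: Graph[String, Int] = Graph(vertices, edges, "missing", (a: String, b: String) => a)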
......@@ -2,7 +2,7 @@ package org.apache.spark.graph
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.graph.impl.{EdgePartition, MessageToPartition}
import org.apache.spark.graph.impl._
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.util.collection.BitSet
......@@ -12,6 +12,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[Edge[Object]])
kryo.register(classOf[MutableTuple2[Object, Object]])
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[VertexBroadcastMsg[Object]])
kryo.register(classOf[AggregationMsg[Object]])
kryo.register(classOf[(Vid, Object)])
kryo.register(classOf[EdgePartition[Object]])
kryo.register(classOf[BitSet])
......
......@@ -11,21 +11,21 @@ import org.apache.spark.util.ClosureCleaner
* the graph type and is implicitly constructed for each Graph object.
* All operations in `GraphOps` are expressed in terms of the
* efficient GraphX API.
*
* @tparam VD the vertex attribute type
* @tparam ED the edge attribute type
*
*/
class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
/**
* Compute the number of edges in the graph.
*/
lazy val numEdges: Long = graph.edges.count()
/**
* Compute the number of vertices in the graph.
*/
lazy val numVertices: Long = graph.vertices.count()
......@@ -39,7 +39,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
/**
* Compute the out-degree of each vertex in the Graph returning an RDD.
* @note Vertices with no out edges are not returned in the resulting RDD.
*/
lazy val outDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Out)
......@@ -60,7 +60,13 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
* neighboring vertex attributes.
*/
private def degreesRDD(edgeDirection: EdgeDirection): VertexSetRDD[Int] = {
graph.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection)
if (edgeDirection == EdgeDirection.In) {
graph.mapReduceTriplets(et => Array((et.dstId,1)), _+_)
} else if (edgeDirection == EdgeDirection.Out) {
graph.mapReduceTriplets(et => Array((et.srcId,1)), _+_)
} else { // EdgeDirection.Both
graph.mapReduceTriplets(et => Array((et.srcId,1), (et.dstId,1)), _+_)
}
}
......@@ -89,7 +95,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
*
* @example We can use this function to compute the average follower
* age for each user
*
* {{{
* val graph: Graph[Int,Int] = loadGraph()
* val averageFollowerAge: RDD[(Int, Int)] =
......@@ -113,15 +119,15 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)
// Define a new map function over edge triplets
val mf = (et: EdgeTriplet[VD,ED]) => {
// Compute the message to the dst vertex
val dst =
if (dir == EdgeDirection.In || dir == EdgeDirection.Both) {
mapFunc(et.dstId, et)
} else { Option.empty[A] }
// Compute the message to the source vertex
val src =
if (dir == EdgeDirection.Out || dir == EdgeDirection.Both) {
mapFunc(et.srcId, et)
} else { Option.empty[A] }
......@@ -130,7 +136,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
case (None, None) => Array.empty[(Vid, A)]
case (Some(srcA),None) => Array((et.srcId, srcA))
case (None, Some(dstA)) => Array((et.dstId, dstA))
case (Some(srcA), Some(dstA)) =>
Array((et.srcId, srcA), (et.dstId, dstA))
}
}
......@@ -141,24 +147,20 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
/**
* Return the Ids of the neighboring vertices.
*
* @param edgeDirection the direction along which to collect
* neighboring vertices
*
* @return the vertex set of neighboring ids for each vertex.
*/
def collectNeighborIds(edgeDirection: EdgeDirection) :
VertexSetRDD[Array[Vid]] = {
val nbrs = graph.aggregateNeighbors[Array[Vid]](
(vid, edge) => Some(Array(edge.otherVertexId(vid))),
(a, b) => a ++ b,
edgeDirection)
graph.vertices.leftZipJoin(nbrs).mapValues{
case (_, Some(nbrs)) => nbrs
case (_, None) => Array.empty[Vid]
}
graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[Vid]) }
} // end of collectNeighborIds
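A hedged usage sketch (vertices with no neighbors in the chosen direction get an empty array):

  val inNbrs: VertexSetRDD[Array[Vid]] = graph.collectNeighborIds(EdgeDirection.In)
  inNbrs.foreach { case (vid, nbrs) => println(vid + ": " + nbrs.mkString(",")) }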
......@@ -175,18 +177,15 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
* @return the vertex set of neighboring vertex attributes for each
* vertex.
*/
def collectNeighbors(edgeDirection: EdgeDirection) :
VertexSetRDD[ Array[(Vid, VD)] ] = {
val nbrs = graph.aggregateNeighbors[Array[(Vid,VD)]](
(vid, edge) =>
Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )),
(a, b) => a ++ b,
edgeDirection)
graph.vertices.leftZipJoin(nbrs).mapValues{
case (_, Some(nbrs)) => nbrs
case (_, None) => Array.empty[(Vid, VD)]
}
graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[(Vid, VD)]) }
} // end of collectNeighbor
......
......@@ -35,8 +35,8 @@ import org.apache.spark.Logging
* def sendMessage(edge: EdgeTriplet[Double, Double]) =
*   Array((edge.dstId, edge.srcAttr * edge.attr))
* def messageCombiner(a: Double, b: Double): Double = a + b
* val initialMessage = 0.0
* // Execute pregel for a fixed number of iterations.
* Pregel(pagerankGraph, initialMessage, numIter)(
* vertexProgram, sendMessage, messageCombiner)
* }}}
......@@ -65,7 +65,7 @@ object Pregel extends Logging {
* @tparam ED the edge data type
* @tparam A the Pregel message type
*
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive on
* the first iteration.
......@@ -94,80 +94,19 @@ object Pregel extends Logging {
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
(graph: Graph[VD, ED], initialMsg: A, numIter: Int)(
vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
sendMsg: EdgeTriplet[VD, ED] => Array[(Vid,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
apply(graph, initialMsg, numIter, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
} // end of Apply
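The hunk above changes the caller-facing contract; a hedged before/after sketch:

  // Old (removed): invoked per (vertex, edge), returning an optional message
  //   def sendMsg(id: Vid, edge: EdgeTriplet[VD, ED]): Option[A]
  // New: invoked once per triplet, returning every message it wants to send
  def sendMsg(edge: EdgeTriplet[Double, Double]): Array[(Vid, Double)] =
    Array((edge.dstId, edge.srcAttr * edge.attr))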
/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
* This function iterates a fixed number (`numIter`) of iterations.
*
* @tparam VD the vertex data type
* @tparam ED the edge data type
* @tparam A the Pregel message type
*
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive on
* the first iteration.
*
* @param numIter the number of iterations to run this computation.
*
* @param sendDir the edge direction along which the `sendMsg`
* function is invoked.
*
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to each
* edge in the direction `sendDir` adjacent to vertices that
* received messages in the current iteration.
*
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
*
* @return the resulting graph at the end of the computation
*
*/
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
(graph: Graph[VD, ED], initialMsg: A, numIter: Int, sendDir: EdgeDirection)(
vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
def mapF(vid: Vid, edge: EdgeTriplet[VD,ED]) = sendMsg(edge.otherVertexId(vid), edge)
// Receive the first set of messages
var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg))
var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg)).cache
var i = 0
while (i < numIter) {
// compute the messages
val messages = g.aggregateNeighbors(mapF, mergeMsg, sendDir.reverse)
val messages = g.mapReduceTriplets(sendMsg, mergeMsg)
// receive the messages
g = g.joinVertices(messages)(vprog)
g = g.joinVertices(messages)(vprog).cache
// count the iteration
i += 1
}
......@@ -196,7 +135,7 @@ object Pregel extends Logging {
* @tparam ED the edge data type
* @tparam A the Pregel message type
*
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive on
* the first iteration.
......@@ -225,66 +164,7 @@ object Pregel extends Logging {
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
(graph: Graph[VD, ED], initialMsg: A)(
vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
apply(graph, initialMsg, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
} // end of apply
/**
* Execute a Pregel-like iterative vertex-parallel abstraction. The
* user-defined vertex-program `vprog` is executed in parallel on
* each vertex receiving any inbound messages and computing a new
* value for the vertex. The `sendMsg` function is then invoked on
* all out-edges and is used to compute an optional message to the
* destination vertex. The `mergeMsg` function is a commutative
* associative function used to combine messages destined to the
* same vertex.
*
* On the first iteration all vertices receive the `initialMsg` and
* on subsequent iterations if a vertex does not receive a message
* then the vertex-program is not invoked.
*
* This function iterates until there are no remaining messages.
*
* @tparam VD the vertex data type
* @tparam ED the edge data type
* @tparam A the Pregel message type
*
* @param graph the input graph.
*
* @param initialMsg the message each vertex will receive on
* the first iteration.
*
* @param numIter the number of iterations to run this computation.
*
* @param sendDir the edge direction along which the `sendMsg`
* function is invoked.
*
* @param vprog the user-defined vertex program which runs on each
* vertex and receives the inbound message and computes a new vertex
* value. On the first iteration the vertex program is invoked on
* all vertices and is passed the default message. On subsequent
* iterations the vertex program is only invoked on those vertices
* that receive messages.
*
* @param sendMsg a user supplied function that is applied to each
* edge in the direction `sendDir` adjacent to vertices that
* received messages in the current iteration.
*
* @param mergeMsg a user supplied function that takes two incoming
* messages of type A and merges them into a single message of type
* A. ''This function must be commutative and associative and
* ideally the size of A should not increase.''
*
* @return the resulting graph at the end of the computation
*
*/
def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
(graph: Graph[VD, ED], initialMsg: A, sendDir: EdgeDirection)(
vprog: (Vid, VD, A) => VD,
sendMsg: (Vid, EdgeTriplet[VD, ED]) => Option[A],
sendMsg: EdgeTriplet[VD, ED] => Array[(Vid,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
......@@ -295,7 +175,7 @@ object Pregel extends Logging {
}
}
def sendMsgFun(vid: Vid, edge: EdgeTriplet[(VD,Boolean), ED]): Option[A] = {
def sendMsgFun(edge: EdgeTriplet[(VD,Boolean), ED]): Array[(Vid, A)] = {
if(edge.srcAttr._2) {
val et = new EdgeTriplet[VD, ED]
et.srcId = edge.srcId
......@@ -303,22 +183,22 @@ object Pregel extends Logging {
et.dstId = edge.dstId
et.dstAttr = edge.dstAttr._1
et.attr = edge.attr
sendMsg(edge.otherVertexId(vid), et)
} else { None }
sendMsg(et)
} else { Array.empty[(Vid,A)] }
}
var g = graph.mapVertices( (vid, vdata) => (vprog(vid, vdata, initialMsg), true) )
// compute the messages
var messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, sendDir.reverse).cache
var messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache
var activeMessages = messages.count
// Loop
// Loop
while (activeMessages > 0) {
// receive the messages
g = g.outerJoinVertices(messages)(vprogFun)
val oldMessages = messages
// compute the messages
messages = g.aggregateNeighbors(sendMsgFun, mergeMsg, sendDir.reverse).cache
messages = g.mapReduceTriplets(sendMsgFun, mergeMsg).cache
activeMessages = messages.count
// after counting we can unpersist the old messages
oldMessages.unpersist(blocking=false)
......
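For completeness, a hedged sketch of calling this dynamic variant (a toy min-label program with the same shape as connectedComponents above; types are illustrative):

  // Assumes graph: Graph[_, Double]; terminates when no edge emits a message.
  val labels = Pregel(graph.mapVertices((vid, attr) => vid), Long.MaxValue)(
    (id, attr, msg) => math.min(attr, msg),
    (edge: EdgeTriplet[Vid, Double]) =>
      if (edge.srcAttr < edge.dstAttr) Array((edge.dstId, edge.srcAttr))
      else Array.empty[(Vid, Vid)],
    (a, b) => math.min(a, b))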
package org.apache.spark.graph.impl
import scala.collection.mutable.ArrayBuilder
import org.apache.spark.graph._
......@@ -8,47 +7,46 @@ import org.apache.spark.graph._
* A partition of edges in 3 large columnar arrays.
*/
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest](
val srcIds: Array[Vid],
val dstIds: Array[Vid],
val data: Array[ED]
){
// private var _data: Array[ED] = _
// private var _dataBuilder = ArrayBuilder.make[ED]
// var srcIds = new VertexArrayList
// var dstIds = new VertexArrayList
val srcIds: Array[Vid],
val dstIds: Array[Vid],
val data: Array[ED])
{
def reverse: EdgePartition[ED] = new EdgePartition(dstIds, srcIds, data)
def map[ED2: ClassManifest](f: Edge[ED] => ED2): EdgePartition[ED2] = {
val newData = new Array[ED2](data.size)
val edge = new Edge[ED]()
for(i <- 0 until data.size){
val size = data.size
var i = 0
while (i < size) {
edge.srcId = srcIds(i)
edge.dstId = dstIds(i)
edge.attr = data(i)
newData(i) = f(edge)
i += 1
}
new EdgePartition(srcIds, dstIds, newData)
}
def foreach(f: Edge[ED] => Unit) {
val edge = new Edge[ED]
for(i <- 0 until data.size){
edge.srcId = srcIds(i)
edge.dstId = dstIds(i)
val size = data.size
var i = 0
while (i < size) {
edge.srcId = srcIds(i)
edge.dstId = dstIds(i)
edge.attr = data(i)
f(edge)
i += 1
}
}
def size: Int = srcIds.size
def iterator = new Iterator[Edge[ED]] {
private val edge = new Edge[ED]
private var pos = 0
private[this] val edge = new Edge[ED]
private[this] var pos = 0
override def hasNext: Boolean = pos < EdgePartition.this.size
......@@ -61,5 +59,3 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
}
}
}
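A minimal sketch (hypothetical data) of the columnar layout in use:

  val part = new EdgePartition[Int](Array(1L, 2L), Array(2L, 3L), Array(10, 20))
  val doubled = part.map(e => e.attr * 2)   // one pass over the three arrays
  doubled.iterator.foreach(e => println(e.srcId + " -> " + e.dstId + ": " + e.attr))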
package org.apache.spark.graph.impl
import org.apache.spark.Partitioner
import org.apache.spark.graph.Pid
import org.apache.spark.graph.{Pid, Vid}
import org.apache.spark.rdd.{ShuffledRDD, RDD}
class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
@transient var partition: Pid,
var vid: Vid,
var data: T)
extends Product2[Pid, (Vid, T)] {
override def _1 = partition
override def _2 = (vid, data)
override def canEqual(that: Any): Boolean = that.isInstanceOf[VertexBroadcastMsg[_]]
}
class AggregationMsg[@specialized(Int, Long, Double, Boolean) T](var vid: Vid, var data: T)
extends Product2[Vid, T] {
override def _1 = vid
override def _2 = data
override def canEqual(that: Any): Boolean = that.isInstanceOf[AggregationMsg[_]]
}
/**
* A message used to send a specific value to a partition.
* @param partition index of the target partition.
......@@ -22,15 +47,42 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef
override def canEqual(that: Any): Boolean = that.isInstanceOf[MessageToPartition[_]]
}
/**
* Companion object for MessageToPartition.
*/
object MessageToPartition {
def apply[T](partition: Pid, value: T) = new MessageToPartition(partition, value)
class VertexBroadcastMsgRDDFunctions[T: ClassManifest](self: RDD[VertexBroadcastMsg[T]]) {
def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = {
val rdd = new ShuffledRDD[Pid, (Vid, T), VertexBroadcastMsg[T]](self, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classManifest[T] == ClassManifest.Int) {
rdd.setSerializer(classOf[IntVertexBroadcastMsgSerializer].getName)
} else if (classManifest[T] == ClassManifest.Long) {
rdd.setSerializer(classOf[LongVertexBroadcastMsgSerializer].getName)
} else if (classManifest[T] == ClassManifest.Double) {
rdd.setSerializer(classOf[DoubleVertexBroadcastMsgSerializer].getName)
}
rdd
}
}
class MessageToPartitionRDDFunctions[T: ClassManifest](self: RDD[MessageToPartition[T]]) {
class AggregationMessageRDDFunctions[T: ClassManifest](self: RDD[AggregationMsg[T]]) {
def partitionBy(partitioner: Partitioner): RDD[AggregationMsg[T]] = {
val rdd = new ShuffledRDD[Vid, T, AggregationMsg[T]](self, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classManifest[T] == ClassManifest.Int) {
rdd.setSerializer(classOf[IntAggMsgSerializer].getName)
} else if (classManifest[T] == ClassManifest.Long) {
rdd.setSerializer(classOf[LongAggMsgSerializer].getName)
} else if (classManifest[T] == ClassManifest.Double) {
rdd.setSerializer(classOf[DoubleAggMsgSerializer].getName)
}
rdd
}
}
class MsgRDDFunctions[T: ClassManifest](self: RDD[MessageToPartition[T]]) {
/**
* Return a copy of the RDD partitioned using the specified partitioner.
......@@ -42,8 +94,16 @@ class MessageToPartitionRDDFunctions[T: ClassManifest](self: RDD[MessageToPartit
}
object MessageToPartitionRDDFunctions {
object MsgRDDFunctions {
implicit def rdd2PartitionRDDFunctions[T: ClassManifest](rdd: RDD[MessageToPartition[T]]) = {
new MessageToPartitionRDDFunctions(rdd)
new MsgRDDFunctions(rdd)
}
implicit def rdd2vertexMessageRDDFunctions[T: ClassManifest](rdd: RDD[VertexBroadcastMsg[T]]) = {
new VertexBroadcastMsgRDDFunctions(rdd)
}
implicit def rdd2aggMessageRDDFunctions[T: ClassManifest](rdd: RDD[AggregationMsg[T]]) = {
new AggregationMessageRDDFunctions(rdd)
}
}
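A hedged usage sketch: the implicits above let message RDDs pick up the primitive-specialized shuffle serializers defined in the next file (compare the SerializerSuite shuffle tests near the bottom of this diff):

  import org.apache.spark.graph.impl.MsgRDDFunctions._
  val msgs = sc.parallelize(0 until 100).map(i => new AggregationMsg[Int](i, i))
  msgs.partitionBy(new org.apache.spark.HashPartitioner(4)).collect()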
package org.apache.spark.graph.impl
import java.io.{EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer
import org.apache.spark.serializer._
/** A special shuffle serializer for VertexBroadcastMessage[Int]. */
class IntVertexBroadcastMsgSerializer extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Int]]
writeLong(msg.vid)
writeInt(msg.data)
this
}
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T](): T = {
new VertexBroadcastMsg[Int](0, readLong(), readInt()).asInstanceOf[T]
}
}
}
}
/** A special shuffle serializer for VertexBroadcastMessage[Long]. */
class LongVertexBroadcastMsgSerializer extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Long]]
writeLong(msg.vid)
writeLong(msg.data)
this
}
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T](): T = {
val a = readLong()
val b = readLong()
new VertexBroadcastMsg[Long](0, a, b).asInstanceOf[T]
}
}
}
}
/** A special shuffle serializer for VertexBroadcastMessage[Double]. */
class DoubleVertexBroadcastMsgSerializer extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[VertexBroadcastMsg[Double]]
writeLong(msg.vid)
writeDouble(msg.data)
this
}
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
def readObject[T](): T = {
val a = readLong()
val b = readDouble()
new VertexBroadcastMsg[Double](0, a, b).asInstanceOf[T]
}
}
}
}
/** A special shuffle serializer for AggregationMessage[Int]. */
class IntAggMsgSerializer extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[AggregationMsg[Int]]
writeLong(msg.vid)
writeInt(msg.data)
this
}
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T](): T = {
val a = readLong()
val b = readInt()
new AggregationMsg[Int](a, b).asInstanceOf[T]
}
}
}
}
/** A special shuffle serializer for AggregationMessage[Long]. */
class LongAggMsgSerializer extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[AggregationMsg[Long]]
writeLong(msg.vid)
writeLong(msg.data)
this
}
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T](): T = {
val a = readLong()
val b = readLong()
new AggregationMsg[Long](a, b).asInstanceOf[T]
}
}
}
}
/** A special shuffle serializer for AggregationMessage[Double]. */
class DoubleAggMsgSerializer extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[AggregationMsg[Double]]
writeLong(msg.vid)
writeDouble(msg.data)
this
}
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
def readObject[T](): T = {
val a = readLong()
val b = readDouble()
new AggregationMsg[Double](a, b).asInstanceOf[T]
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
// Helper classes to shorten the implementation of those special serializers.
////////////////////////////////////////////////////////////////////////////////
sealed abstract class ShuffleSerializationStream(s: OutputStream) extends SerializationStream {
// The implementation should override this one.
def writeObject[T](t: T): SerializationStream
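// Note: the primitive writers below emit big-endian bytes one at a time;
// ShuffleDeserializationStream.readInt/readLong mirror this layout.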
def writeInt(v: Int) {
s.write(v >> 24)
s.write(v >> 16)
s.write(v >> 8)
s.write(v)
}
def writeLong(v: Long) {
s.write((v >>> 56).toInt)
s.write((v >>> 48).toInt)
s.write((v >>> 40).toInt)
s.write((v >>> 32).toInt)
s.write((v >>> 24).toInt)
s.write((v >>> 16).toInt)
s.write((v >>> 8).toInt)
s.write(v.toInt)
}
def writeDouble(v: Double) {
writeLong(java.lang.Double.doubleToLongBits(v))
}
override def flush(): Unit = s.flush()
override def close(): Unit = s.close()
}
sealed abstract class ShuffleDeserializationStream(s: InputStream) extends DeserializationStream {
// The implementation should override this one.
def readObject[T](): T
def readInt(): Int = {
val first = s.read()
if (first < 0) throw new EOFException
(first & 0xFF) << 24 | (s.read() & 0xFF) << 16 | (s.read() & 0xFF) << 8 | (s.read() & 0xFF)
}
def readLong(): Long = {
val first = s.read()
if (first < 0) throw new EOFException()
(first.toLong << 56) |
(s.read() & 0xFF).toLong << 48 |
(s.read() & 0xFF).toLong << 40 |
(s.read() & 0xFF).toLong << 32 |
(s.read() & 0xFF).toLong << 24 |
(s.read() & 0xFF) << 16 |
(s.read() & 0xFF) << 8 |
(s.read() & 0xFF)
}
def readDouble(): Double = java.lang.Double.longBitsToDouble(readLong())
override def close(): Unit = s.close()
}
sealed trait ShuffleSerializerInstance extends SerializerInstance {
override def serialize[T](t: T): ByteBuffer = throw new UnsupportedOperationException
override def deserialize[T](bytes: ByteBuffer): T = throw new UnsupportedOperationException
override def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T =
throw new UnsupportedOperationException
// The implementation should override the following two.
override def serializeStream(s: OutputStream): SerializationStream
override def deserializeStream(s: InputStream): DeserializationStream
}
......@@ -8,10 +8,9 @@ package object graph {
type Vid = Long
type Pid = Int
type VertexHashMap[T] = it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap[T]
type VertexSet = it.unimi.dsi.fastutil.longs.LongOpenHashSet
type VertexSet = OpenHashSet[Vid]
type VertexArrayList = it.unimi.dsi.fastutil.longs.LongArrayList
// type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap
type VertexIdToIndexMap = OpenHashSet[Vid]
......
......@@ -4,6 +4,7 @@ import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.graph.LocalSparkContext._
......@@ -58,8 +59,9 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
val prGraph1 = Analytics.pagerank(starGraph, 1, resetProb)
val prGraph2 = Analytics.pagerank(starGraph, 2, resetProb)
val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices)
.map{ case (vid, (pr1, pr2)) => if (pr1 != pr2) { 1 } else { 0 } }.sum
val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices) { (vid, pr1, pr2) =>
if (pr1 != pr2) { 1 } else { 0 }
}.map { case (vid, test) => test }.sum
assert(notMatching === 0)
prGraph2.vertices.foreach(println(_))
val errors = prGraph2.vertices.map{ case (vid, pr) =>
......@@ -70,10 +72,12 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
assert(errors.sum === 0)
val prGraph3 = Analytics.deltaPagerank(starGraph, 0, resetProb)
val errors2 = prGraph2.vertices.leftJoin(prGraph3.vertices).map{
case (_, (pr1, Some(pr2))) if(pr1 == pr2) => 0
case _ => 1
}.sum
val errors2 = prGraph2.vertices.leftJoin(prGraph3.vertices){ (vid, pr1, pr2Opt) =>
pr2Opt match {
case Some(pr2) if(pr1 == pr2) => 0
case _ => 1
}
}.map { case (vid, test) => test }.sum
assert(errors2 === 0)
}
} // end of test Star PageRank
......@@ -86,19 +90,17 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
val resetProb = 0.15
val prGraph1 = Analytics.pagerank(gridGraph, 50, resetProb).cache()
val prGraph2 = Analytics.deltaPagerank(gridGraph, 0.0001, resetProb).cache()
val error = prGraph1.vertices.zipJoin(prGraph2.vertices).map {
case (id, (a, b)) => (a - b) * (a - b)
}.sum
prGraph1.vertices.zipJoin(prGraph2.vertices)
.map{ case (id, (a,b)) => (id, (a,b, a-b))}.foreach(println(_))
val error = prGraph1.vertices.zipJoin(prGraph2.vertices) { case (id, a, b) => (a - b) * (a - b) }
.map { case (id, error) => error }.sum
prGraph1.vertices.zipJoin(prGraph2.vertices) { (id, a, b) => (a, b, a-b) }.foreach(println(_))
println(error)
assert(error < 1.0e-5)
val pr3 = sc.parallelize(GridPageRank(10,10, 50, resetProb))
val error2 = prGraph1.vertices.leftJoin(pr3).map {
case (id, (a, Some(b))) => (a - b) * (a - b)
case _ => 0
}.sum
prGraph1.vertices.leftJoin(pr3).foreach(println( _ ))
val pr3: RDD[(Vid, Double)] = sc.parallelize(GridPageRank(10,10, 50, resetProb))
val error2 = prGraph1.vertices.leftJoin(pr3) { (id, a, bOpt) =>
val b: Double = bOpt.get
(a - b) * (a - b)
}.map { case (id, error) => error }.sum
prGraph1.vertices.leftJoin(pr3) { (id, a, b) => (a, b) }.foreach( println(_) )
println(error2)
assert(error2 < 1.0e-5)
}
......
......@@ -78,13 +78,13 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val a = sc.parallelize((0 to 100).map(x => (x.toLong, x.toLong)), 5)
val b = VertexSetRDD(a).mapValues(x => -x)
assert(b.count === 101)
assert(b.leftJoin(a).mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
assert(b.leftJoin(a){ (id, a, bOpt) => a + bOpt.get }.map(x=> x._2).reduce(_+_) === 0)
val c = VertexSetRDD(a, b.index)
assert(b.leftJoin(c).mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
assert(b.leftJoin(c){ (id, b, cOpt) => b + cOpt.get }.map(x=> x._2).reduce(_+_) === 0)
val d = c.filter(q => ((q._2 % 2) == 0))
val e = a.filter(q => ((q._2 % 2) == 0))
assert(d.count === e.count)
assert(b.zipJoin(c).mapValues(x => x._1 + x._2).map(x => x._2).reduce(_+_) === 0)
assert(b.zipJoin(c)((id, b, c) => b + c).map(x => x._2).reduce(_+_) === 0)
}
}
......
package org.apache.spark.graph
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.graph.LocalSparkContext._
import java.io.{EOFException, ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.spark.graph.impl._
import org.apache.spark.graph.impl.MsgRDDFunctions._
import org.apache.spark._
class SerializerSuite extends FunSuite with LocalSparkContext {
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
test("TestVertexBroadcastMessageInt") {
val outMsg = new VertexBroadcastMsg[Int](3,4,5)
val bout = new ByteArrayOutputStream
val outStrm = new IntVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new IntVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Int] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Int] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
assert(outMsg.vid === inMsg2.vid)
assert(outMsg.data === inMsg1.data)
assert(outMsg.data === inMsg2.data)
intercept[EOFException] {
inStrm.readObject()
}
}
test("TestVertexBroadcastMessageLong") {
val outMsg = new VertexBroadcastMsg[Long](3,4,5)
val bout = new ByteArrayOutputStream
val outStrm = new LongVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new LongVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Long] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Long] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
assert(outMsg.vid === inMsg2.vid)
assert(outMsg.data === inMsg1.data)
assert(outMsg.data === inMsg2.data)
intercept[EOFException] {
inStrm.readObject()
}
}
test("TestVertexBroadcastMessageDouble") {
val outMsg = new VertexBroadcastMsg[Double](3,4,5.0)
val bout = new ByteArrayOutputStream
val outStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Double] = inStrm.readObject()
val inMsg2: VertexBroadcastMsg[Double] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
assert(outMsg.vid === inMsg2.vid)
assert(outMsg.data === inMsg1.data)
assert(outMsg.data === inMsg2.data)
intercept[EOFException] {
inStrm.readObject()
}
}
test("TestAggregationMessageInt") {
val outMsg = new AggregationMsg[Int](4,5)
val bout = new ByteArrayOutputStream
val outStrm = new IntAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new IntAggMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: AggregationMsg[Int] = inStrm.readObject()
val inMsg2: AggregationMsg[Int] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
assert(outMsg.vid === inMsg2.vid)
assert(outMsg.data === inMsg1.data)
assert(outMsg.data === inMsg2.data)
intercept[EOFException] {
inStrm.readObject()
}
}
test("TestAggregationMessageLong") {
val outMsg = new AggregationMsg[Long](4,5)
val bout = new ByteArrayOutputStream
val outStrm = new LongAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new LongAggMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: AggregationMsg[Long] = inStrm.readObject()
val inMsg2: AggregationMsg[Long] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
assert(outMsg.vid === inMsg2.vid)
assert(outMsg.data === inMsg1.data)
assert(outMsg.data === inMsg2.data)
intercept[EOFException] {
inStrm.readObject()
}
}
test("TestAggregationMessageDouble") {
val outMsg = new AggregationMsg[Double](4,5.0)
val bout = new ByteArrayOutputStream
val outStrm = new DoubleAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new DoubleAggMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: AggregationMsg[Double] = inStrm.readObject()
val inMsg2: AggregationMsg[Double] = inStrm.readObject()
assert(outMsg.vid === inMsg1.vid)
assert(outMsg.vid === inMsg2.vid)
assert(outMsg.data === inMsg1.data)
assert(outMsg.data === inMsg2.data)
intercept[EOFException] {
inStrm.readObject()
}
}
test("TestShuffleVertexBroadcastMsg") {
withSpark(new SparkContext("local[2]", "test")) { sc =>
val bmsgs = sc.parallelize(0 until 100, 10).map { pid =>
new VertexBroadcastMsg[Int](pid, pid, pid)
}
bmsgs.partitionBy(new HashPartitioner(3)).collect()
}
}
test("TestShuffleAggregationMsg") {
withSpark(new SparkContext("local[2]", "test")) { sc =>
val bmsgs = sc.parallelize(0 until 100, 10).map(pid => new AggregationMsg[Int](pid, pid))
bmsgs.partitionBy(new HashPartitioner(3)).collect()
}
}
}
\ No newline at end of file
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Shell script for starting the Spark Shell REPL
# Note that it will set MASTER to spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}
# if those two env vars are set in spark-env.sh but MASTER is not.
# Options:
# -c <cores> Set the number of cores for REPL to use
#
# Enter posix mode for bash
set -o posix
# Update the banner logo
export SPARK_BANNER_TEXT="Welcome to
______ __ _ __
/ ____/________ _____ / /_ | |/ /
/ / __/ ___/ __ \`/ __ \/ __ \| /
/ /_/ / / / /_/ / /_/ / / / / |
\____/_/ \__,_/ .___/_/ /_/_/|_|
/_/ Alpha Release
Powered by:
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ \`/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\
/_/ version 0.9.0
Example:
scala> val graph = GraphLoader.textFile(sc, \"hdfs://links\")
scala> graph.numVertices
scala> graph.numEdges
scala> val pageRankGraph = Analytics.pagerank(graph, 10) // 10 iterations
scala> val maxPr = pageRankGraph.vertices.map{ case (vid, pr) => pr }.max
scala> println(maxPr)
"
export SPARK_SHELL_INIT_BLOCK="import org.apache.spark.graph._;"
# Set the serializer to use Kryo for graphx objects
SPARK_JAVA_OPTS+=" -Dspark.serializer=org.apache.spark.serializer.KryoSerializer "
SPARK_JAVA_OPTS+="-Dspark.kryo.registrator=org.apache.spark.graph.GraphKryoRegistrator "
SPARK_JAVA_OPTS+="-Dspark.kryoserializer.buffer.mb=10 "
FWDIR="`dirname $0`"
for o in "$@"; do
if [ "$1" = "-c" -o "$1" = "--cores" ]; then
shift
if [ -n "$1" ]; then
OPTIONS="-Dspark.cores.max=$1"
shift
fi
fi
done
# Set MASTER from spark-env if possible
if [ -z "$MASTER" ]; then
if [ -e "$FWDIR/conf/spark-env.sh" ]; then
. "$FWDIR/conf/spark-env.sh"
fi
if [[ "x" != "x$SPARK_MASTER_IP" && "y" != "y$SPARK_MASTER_PORT" ]]; then
MASTER="spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}"
export MASTER
fi
fi
# Copy restore-TTY-on-exit functions from Scala script so spark-shell exits properly even in
# binary distribution of Spark where Scala is not installed
exit_status=127
saved_stty=""
# restore stty settings (echo in particular)
function restoreSttySettings() {
stty $saved_stty
saved_stty=""
}
function onExit() {
if [[ "$saved_stty" != "" ]]; then
restoreSttySettings
fi
exit $exit_status
}
# to reenable echo if we are interrupted before completing.
trap onExit INT
# save terminal settings
saved_stty=$(stty -g 2>/dev/null)
# clear on error so we don't later try to restore them
if [[ $? -ne 0 ]]; then
saved_stty=""
fi
$FWDIR/spark-class $OPTIONS org.apache.spark.repl.Main "$@"
# record the exit status lest it be overwritten:
# then reenable echo and propagate the code.
exit_status=$?
onExit