Skip to content
Snippets Groups Projects
Commit f40a0898 authored by Ankur Dave's avatar Ankur Dave
Browse files

Rename bagel to spark.bagel and Pregel to Bagel

parent c1104058
No related branches found
No related tags found
No related merge requests found
package bagel
package spark.bagel
import spark._
import spark.SparkContext._
import scala.collection.mutable.ArrayBuffer
object Pregel extends Logging {
object Bagel extends Logging {
def run[V <: Vertex : Manifest, M <: Message : Manifest, C : Manifest, A : Manifest](
sc: SparkContext,
verts: RDD[(String, V)],
......@@ -88,7 +88,7 @@ object Pregel extends Logging {
/**
* Converts a compute function that doesn't take an aggregator to
* one that does, so it can be passed to Pregel.run.
* one that does, so it can be passed to Bagel.run.
*/
implicit def addAggregatorArg[
V <: Vertex : Manifest, M <: Message : Manifest, C
......@@ -128,7 +128,7 @@ class NullAggregator[V] extends Aggregator[V, Option[Nothing]] {
}
/**
* Represents a Pregel vertex.
* Represents a Bagel vertex.
*
* Subclasses may store state along with each vertex and must be
* annotated with @serializable.
......@@ -139,7 +139,7 @@ trait Vertex {
}
/**
* Represents a Pregel message to a target vertex.
* Represents a Bagel message to a target vertex.
*
* Subclasses may contain a payload to deliver to the target vertex
* and must be annotated with @serializable.
......
package bagel.examples
package spark.bagel.examples
import spark._
import spark.SparkContext._
import scala.math.min
import bagel._
import bagel.Pregel._
import spark.bagel._
import spark.bagel.Bagel._
object ShortestPath {
def main(args: Array[String]) {
......@@ -68,7 +68,7 @@ object ShortestPath {
(new SPVertex(self.id, newValue, self.outEdges, false), outbox)
}
val result = Pregel.run(sc, vertices, messages)(combiner = MinCombiner, numSplits = numSplits)(compute)
val result = Bagel.run(sc, vertices, messages)(combiner = MinCombiner, numSplits = numSplits)(compute)
// Print the result
System.err.println("Shortest path from "+startVertex+" to all vertices:")
......
package bagel.examples
package spark.bagel.examples
import spark._
import spark.SparkContext._
import bagel._
import bagel.Pregel._
import spark.bagel._
import spark.bagel.Bagel._
import scala.collection.mutable.ArrayBuffer
import scala.xml.{XML,NodeSeq}
......@@ -63,9 +63,9 @@ object WikipediaPageRank {
val messages = sc.parallelize(List[(String, PRMessage)]())
val result =
if (noCombiner) {
Pregel.run(sc, vertices, messages)(numSplits = numSplits)(PRNoCombiner.compute(numVertices, epsilon))
Bagel.run(sc, vertices, messages)(numSplits = numSplits)(PRNoCombiner.compute(numVertices, epsilon))
} else {
Pregel.run(sc, vertices, messages)(combiner = PRCombiner, numSplits = numSplits)(PRCombiner.compute(numVertices, epsilon))
Bagel.run(sc, vertices, messages)(combiner = PRCombiner, numSplits = numSplits)(PRCombiner.compute(numVertices, epsilon))
}
// Print the result
......
package bagel
package spark.bagel
import org.scalatest.{FunSuite, Assertions}
import org.scalatest.prop.Checkers
......@@ -10,7 +10,7 @@ import scala.collection.mutable.ArrayBuffer
import spark._
import bagel.Pregel._
import spark.bagel.Bagel._
@serializable class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex
@serializable class TestMessage(val targetId: String) extends Message
......@@ -22,7 +22,7 @@ class BagelSuite extends FunSuite with Assertions {
val msgs = sc.parallelize(Array[(String, TestMessage)]())
val numSupersteps = 5
val result =
Pregel.run(sc, verts, msgs)()(addAggregatorArg {
Bagel.run(sc, verts, msgs)()(addAggregatorArg {
(self: TestVertex, msgs: Option[ArrayBuffer[TestMessage]], superstep: Int) =>
(new TestVertex(self.id, superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
})
......@@ -36,7 +36,7 @@ class BagelSuite extends FunSuite with Assertions {
val msgs = sc.parallelize(Array("a" -> new TestMessage("a")))
val numSupersteps = 5
val result =
Pregel.run(sc, verts, msgs)()(addAggregatorArg {
Bagel.run(sc, verts, msgs)()(addAggregatorArg {
(self: TestVertex, msgs: Option[ArrayBuffer[TestMessage]], superstep: Int) =>
val msgsOut =
msgs match {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment