Skip to content
Snippets Groups Projects
Commit 1c8ca0eb authored by Ankur Dave's avatar Ankur Dave
Browse files

Add Bagel test suite

Note: This test suite currently fails for the same reason that the
Spark Core test suite fails: Spark currently seems to have a bug where
any test after the first one fails.
parent c5b3ea75
No related branches found
No related tags found
No related merge requests found
......@@ -75,6 +75,14 @@ object Pregel extends Logging {
run(sc, newVerts, newMsgs, createCombiner, mergeMsg, mergeCombiners, numSplits, superstep + 1)(compute)
}
}
def defaultCreateCombiner[M <: Message](msg: M): ArrayBuffer[M] = ArrayBuffer(msg)
def defaultMergeMsg[M <: Message](combiner: ArrayBuffer[M], msg: M): ArrayBuffer[M] =
combiner += msg
def defaultMergeCombiners[M <: Message](a: ArrayBuffer[M], b: ArrayBuffer[M]): ArrayBuffer[M] =
a ++= b
def defaultCompute[V <: Vertex, M <: Message](self: V, msgs: Option[ArrayBuffer[M]], superstep: Int): (V, Iterable[M]) =
(self, List())
}
/**
......
package bagel
import org.scalatest.{FunSuite, Assertions}
import org.scalatest.prop.Checkers
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
import org.scalacheck.Prop._
import scala.collection.mutable.ArrayBuffer
import spark._
@serializable class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex
@serializable class TestMessage(val targetId: String) extends Message
class BagelSuite extends FunSuite with Assertions {
test("halting by voting") {
val sc = new SparkContext("local", "test")
val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(id, true, 0))))
val msgs = sc.parallelize(Array[(String, TestMessage)]())
val numSupersteps = 5
val result =
Pregel.run(sc, verts, msgs,
Pregel.defaultCreateCombiner[TestMessage],
Pregel.defaultMergeMsg[TestMessage],
Pregel.defaultMergeCombiners[TestMessage], 1) {
(self: TestVertex, msgs: Option[ArrayBuffer[TestMessage]], superstep: Int) =>
(new TestVertex(self.id, superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
}
for (vert <- result.collect)
assert(vert.age === numSupersteps)
}
test("halting by message silence") {
val sc = new SparkContext("local", "test")
val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(id, false, 0))))
val msgs = sc.parallelize(Array("a" -> new TestMessage("a")))
val numSupersteps = 5
val result =
Pregel.run(sc, verts, msgs,
Pregel.defaultCreateCombiner[TestMessage],
Pregel.defaultMergeMsg[TestMessage],
Pregel.defaultMergeCombiners[TestMessage], 1) {
(self: TestVertex, msgs: Option[ArrayBuffer[TestMessage]], superstep: Int) =>
val msgsOut =
msgs match {
case Some(ms) if (superstep < numSupersteps - 1) =>
ms
case _ =>
new ArrayBuffer[TestMessage]()
}
(new TestVertex(self.id, self.active, self.age + 1), msgsOut)
}
for (vert <- result.collect)
assert(vert.age === numSupersteps)
}
}
......@@ -14,7 +14,7 @@ extends ParentProject(info) with IdeaProject
lazy val examples =
project("examples", "Spark Examples", new ExamplesProject(_), core)
lazy val bagel = project("bagel", "Bagel", core)
lazy val bagel = project("bagel", "Bagel", new BagelProject(_), core)
class CoreProject(info: ProjectInfo)
extends DefaultProject(info) with Eclipsify with IdeaProject with DepJar with XmlTestReport
......@@ -23,6 +23,10 @@ extends ParentProject(info) with IdeaProject
class ExamplesProject(info: ProjectInfo)
extends DefaultProject(info) with Eclipsify with IdeaProject
{}
class BagelProject(info: ProjectInfo)
extends DefaultProject(info) with DepJar with XmlTestReport
{}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment