Skip to content
Snippets Groups Projects
Commit 19122af7 authored by Ankur Dave's avatar Ankur Dave
Browse files

Update ShortestPath to work with controllable partitioning

parent 45ec9db8
No related branches found
No related tags found
No related merge requests found
......@@ -5,7 +5,6 @@ import spark.SparkContext._
import scala.math.min
/*
object ShortestPath {
def main(args: Array[String]) {
if (args.length < 4) {
......@@ -26,7 +25,7 @@ object ShortestPath {
.filter(!_.matches("^\\s*#.*"))
.map(line => line.split("\t")))
val vertices: RDD[(String, Either[SPVertex, SPMessage])] =
val vertices: RDD[(String, SPVertex)] =
(lines.groupBy(line => line(0))
.map {
case (vertexId, lines) => {
......@@ -35,18 +34,16 @@ object ShortestPath {
new SPEdge(targetId, edgeValue.toInt)
}
(vertexId, Left[SPVertex, SPMessage](new SPVertex(vertexId, Int.MaxValue, outEdges, true)))
(vertexId, new SPVertex(vertexId, Int.MaxValue, outEdges, true))
}
})
val messages: RDD[(String, Either[SPVertex, SPMessage])] =
val messages: RDD[(String, SPMessage)] =
(lines.filter(_.length == 2)
.map {
case Array(vertexId, messageValue) =>
(vertexId, Right[SPVertex, SPMessage](new SPMessage(vertexId, messageValue.toInt)))
(vertexId, new SPMessage(vertexId, messageValue.toInt))
})
val graph: RDD[(String, Either[SPVertex, SPMessage])] = vertices ++ messages
System.err.println("Read "+vertices.count()+" vertices and "+
messages.count()+" messages.")
......@@ -55,7 +52,7 @@ object ShortestPath {
def messageCombiner(minSoFar: Int, message: SPMessage): Int =
min(minSoFar, message.value)
val result = Pregel.run(sc, graph, numSplits, messageCombiner, () => Int.MaxValue, min _) {
val result = Pregel.run(sc, vertices, messages, numSplits, messageCombiner, () => Int.MaxValue, min _) {
(self: SPVertex, messageMinValue: Int, superstep: Int) =>
val newValue = min(self.value, messageMinValue)
......@@ -83,4 +80,3 @@ object ShortestPath {
@serializable class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex
@serializable class SPEdge(val targetId: String, val value: Int) extends Edge
@serializable class SPMessage(val targetId: String, val value: Int) extends Message
*/
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment