Skip to content
Snippets Groups Projects
Commit 9535f404 authored by Ankur Dave's avatar Ankur Dave Committed by Reynold Xin
Browse files

Add landmark-based Shortest Path algorithm to graphx.lib

This is a modified version of apache/spark#10.

Author: Ankur Dave <ankurdave@gmail.com>
Author: Andres Perez <andres@tresata.com>

Closes #933 from ankurdave/shortestpaths and squashes the following commits:

03a103c [Ankur Dave] Style fixes
7a1ff48 [Ankur Dave] Improve ShortestPaths documentation
d75c8fc [Ankur Dave] Remove unnecessary VD type param, and pass through ED
d983fb4 [Ankur Dave] Fix style errors
60ed8e6 [Andres Perez] Add Shortest-path computations to graphx.lib with unit tests.
parent d17d2214
No related branches found
No related tags found
No related merge requests found
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.graphx.lib
import org.apache.spark.graphx._
import scala.reflect.ClassTag
/**
* Computes shortest paths to the given set of landmark vertices, returning a graph where each
* vertex attribute is a map containing the shortest-path distance to each reachable landmark.
*/
object ShortestPaths {
/** Stores a map from the vertex id of a landmark to the distance to that landmark. */
type SPMap = Map[VertexId, Int]
private def makeMap(x: (VertexId, Int)*) = Map(x: _*)
private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap =
(spmap1.keySet ++ spmap2.keySet).map {
k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
}.toMap
/**
* Computes shortest paths to the given set of landmark vertices.
*
* @tparam ED the edge attribute type (not used in the computation)
*
* @param graph the graph for which to compute the shortest paths
* @param landmarks the list of landmark vertex ids. Shortest paths will be computed to each
* landmark.
*
* @return a graph where each vertex attribute is a map containing the shortest-path distance to
* each reachable landmark vertex.
*/
def run[ED: ClassTag](graph: Graph[_, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
val spGraph = graph.mapVertices { (vid, attr) =>
if (landmarks.contains(vid)) makeMap(vid -> 0) else makeMap()
}
val initialMessage = makeMap()
def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
addMaps(attr, msg)
}
def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
val newAttr = incrementMap(edge.srcAttr)
if (edge.dstAttr != addMaps(newAttr, edge.dstAttr)) Iterator((edge.dstId, newAttr))
else Iterator.empty
}
Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.graphx.lib
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd._
class ShortestPathsSuite extends FunSuite with LocalSparkContext {
test("Shortest Path Computations") {
withSpark { sc =>
val shortestPaths = Set(
(1, Map(1 -> 0, 4 -> 2)), (2, Map(1 -> 1, 4 -> 2)), (3, Map(1 -> 2, 4 -> 1)),
(4, Map(1 -> 2, 4 -> 0)), (5, Map(1 -> 1, 4 -> 1)), (6, Map(1 -> 3, 4 -> 1)))
val edgeSeq = Seq((1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6)).flatMap {
case e => Seq(e, e.swap)
}
val edges = sc.parallelize(edgeSeq).map { case (v1, v2) => (v1.toLong, v2.toLong) }
val graph = Graph.fromEdgeTuples(edges, 1)
val landmarks = Seq(1, 4).map(_.toLong)
val results = ShortestPaths.run(graph, landmarks).vertices.collect.map {
case (v, spMap) => (v, spMap.mapValues(_.get))
}
assert(results.toSet === shortestPaths)
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment