Skip to content
Snippets Groups Projects
Commit d4d9ece1 authored by Ankur Dave's avatar Ankur Dave
Browse files

Remove Graph.statistics and GraphImpl.printLineage

parent ee8931d2
No related branches found
No related tags found
No related merge requests found
...@@ -90,11 +90,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] { ...@@ -90,11 +90,6 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
*/ */
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
/**
* Computes statistics describing the graph representation.
*/
def statistics: Map[String, Any]
/** /**
* Transforms each vertex attribute in the graph using the map function. * Transforms each vertex attribute in the graph using the map function.
* *
...@@ -254,7 +249,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] { ...@@ -254,7 +249,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
/** /**
* Computes statistics about the neighboring edges and vertices of each vertex. The user supplied * Aggregates values from the neighboring edges and vertices of each vertex. The user supplied
* `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be * `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be
* "sent" to either vertex in the edge. The `reduceFunc` is then used to combine the output of * "sent" to either vertex in the edge. The `reduceFunc` is then used to combine the output of
* the map phase destined to each vertex. * the map phase destined to each vertex.
......
...@@ -83,71 +83,6 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( ...@@ -83,71 +83,6 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
GraphImpl(vertices, newEdges) GraphImpl(vertices, newEdges)
} }
override def statistics: Map[String, Any] = {
// Get the total number of vertices after replication, used to compute the replication ratio.
def numReplicatedVertices(vid2pids: RDD[Array[Array[VertexID]]]): Double = {
vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble
}
val numVertices = this.ops.numVertices
val numEdges = this.ops.numEdges
val replicationRatioBoth = numReplicatedVertices(routingTable.bothAttrs) / numVertices
val replicationRatioSrcOnly = numReplicatedVertices(routingTable.srcAttrOnly) / numVertices
val replicationRatioDstOnly = numReplicatedVertices(routingTable.dstAttrOnly) / numVertices
// One entry for each partition, indicate the total number of edges on that partition.
val loadArray = edges.partitionsRDD.map(_._2.size).collect().map(_.toDouble / numEdges)
val minLoad = loadArray.min
val maxLoad = loadArray.max
Map(
"Num Vertices" -> numVertices,
"Num Edges" -> numEdges,
"Replication (both)" -> replicationRatioBoth,
"Replication (src only)" -> replicationRatioSrcOnly,
"Replication (dest only)" -> replicationRatioDstOnly,
"Load Array" -> loadArray,
"Min Load" -> minLoad,
"Max Load" -> maxLoad)
}
/**
* Display the lineage information for this graph.
*/
def printLineage() = {
def traverseLineage(
rdd: RDD[_],
indent: String = "",
visited: Map[Int, String] = Map.empty[Int, String]) {
if (visited.contains(rdd.id)) {
println(indent + visited(rdd.id))
println(indent)
} else {
val locs = rdd.partitions.map( p => rdd.preferredLocations(p) )
val cacheLevel = rdd.getStorageLevel
val name = rdd.id
val deps = rdd.dependencies
val partitioner = rdd.partitioner
val numparts = partitioner match { case Some(p) => p.numPartitions; case None => 0}
println(indent + name + ": " + cacheLevel.description + " (partitioner: " + partitioner +
", " + numparts +")")
println(indent + " |---> Deps: " + deps.map(d => (d, d.rdd.id) ).toString)
println(indent + " |---> PrefLoc: " + locs.map(x=> x.toString).mkString(", "))
deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited))
}
}
println("edges ------------------------------------------")
traverseLineage(edges, " ")
var visited = Map(edges.id -> "edges")
println("\n\nvertices ------------------------------------------")
traverseLineage(vertices, " ", visited)
visited += (vertices.id -> "vertices")
println("\n\nroutingTable.bothAttrs -------------------------------")
traverseLineage(routingTable.bothAttrs, " ", visited)
visited += (routingTable.bothAttrs.id -> "routingTable.bothAttrs")
println("\n\ntriplets ----------------------------------------")
traverseLineage(triplets, " ", visited)
println(visited)
} // end of printLineage
override def reverse: Graph[VD, ED] = { override def reverse: Graph[VD, ED] = {
val newETable = edges.mapEdgePartitions((pid, part) => part.reverse) val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
new GraphImpl(vertices, newETable, routingTable, replicatedVertexView) new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
......
...@@ -73,9 +73,6 @@ object PageRank extends Logging { ...@@ -73,9 +73,6 @@ object PageRank extends Logging {
.mapVertices( (id, attr) => 1.0 ) .mapVertices( (id, attr) => 1.0 )
.cache() .cache()
// Display statistics about pagerank
logInfo(pagerankGraph.statistics.toString)
// Define the three functions needed to implement PageRank in the GraphX // Define the three functions needed to implement PageRank in the GraphX
// version of Pregel // version of Pregel
def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double = def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
...@@ -121,9 +118,6 @@ object PageRank extends Logging { ...@@ -121,9 +118,6 @@ object PageRank extends Logging {
.mapVertices( (id, attr) => (0.0, 0.0) ) .mapVertices( (id, attr) => (0.0, 0.0) )
.cache() .cache()
// Display statistics about pagerank
logInfo(pagerankGraph.statistics.toString)
// Define the three functions needed to implement PageRank in the GraphX // Define the three functions needed to implement PageRank in the GraphX
// version of Pregel // version of Pregel
def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = { def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment