Skip to content
Snippets Groups Projects
Commit 5a21e3e7 authored by NamelessAnalyst's avatar NamelessAnalyst Committed by Ankur Dave
Browse files

SPARK-3716 [GraphX] Update Analytics.scala for partitionStrategy assignment

Previously, when the val partitionStrategy was created it called a function in the Analytics object which was a copy of the PartitionStrategy.fromString() method. This function has been removed, and the assignment of partitionStrategy now uses the PartitionStrategy.fromString method instead. In this way, it better matches the declarations of edge/vertex StorageLevel variables.

Author: NamelessAnalyst <NamelessAnalyst@users.noreply.github.com>

Closes #2569 from NamelessAnalyst/branch-1.1 and squashes the following commits:

c24ff51 [NamelessAnalyst] Update Analytics.scala
parent 18ef22ab
No related branches found
No related tags found
No related merge requests found
......@@ -45,17 +45,6 @@ object Analytics extends Logging {
}
val options = mutable.Map(optionsList: _*)
def pickPartitioner(v: String): PartitionStrategy = {
// TODO: Use reflection rather than listing all the partitioning strategies here.
v match {
case "RandomVertexCut" => RandomVertexCut
case "EdgePartition1D" => EdgePartition1D
case "EdgePartition2D" => EdgePartition2D
case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v)
}
}
val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
......@@ -66,7 +55,7 @@ object Analytics extends Logging {
sys.exit(1)
}
val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
.map(pickPartitioner(_))
.map(PartitionStrategy.fromString(_))
val edgeStorageLevel = options.remove("edgeStorageLevel")
.map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
val vertexStorageLevel = options.remove("vertexStorageLevel")
......@@ -106,7 +95,7 @@ object Analytics extends Logging {
if (!outFname.isEmpty) {
logWarning("Saving pageranks of pages to " + outFname)
pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
pr.map { case (id, r) => id + "\t" + r }.saveAsTextFile(outFname)
}
sc.stop()
......@@ -128,7 +117,7 @@ object Analytics extends Logging {
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
val cc = ConnectedComponents.run(graph)
println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
println("Components: " + cc.vertices.map { case (vid, data) => data }.distinct())
sc.stop()
case "triangles" =>
......@@ -146,7 +135,7 @@ object Analytics extends Logging {
minEdgePartitions = numEPart,
edgeStorageLevel = edgeStorageLevel,
vertexStorageLevel = vertexStorageLevel)
// TriangleCount requires the graph to be partitioned
// TriangleCount requires the graph to be partitioned
.partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache()
val triangles = TriangleCount.run(graph)
println("Triangles: " + triangles.vertices.map {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment