Skip to content
Snippets Groups Projects
Commit e5be4de7 authored by NamelessAnalyst's avatar NamelessAnalyst Committed by Ankur Dave
Browse files

SPARK-3716 [GraphX] Update Analytics.scala for partitionStrategy assignment


Previously, when the val partitionStrategy was created it called a function in the Analytics object which was a copy of the PartitionStrategy.fromString() method. This function has been removed, and the assignment of partitionStrategy now uses the PartitionStrategy.fromString method instead. In this way, it better matches the declarations of edge/vertex StorageLevel variables.

Author: NamelessAnalyst <NamelessAnalyst@users.noreply.github.com>

Closes #2569 from NamelessAnalyst/branch-1.1 and squashes the following commits:

c24ff51 [NamelessAnalyst] Update Analytics.scala

(cherry picked from commit 5a21e3e7)
Signed-off-by: default avatarAnkur Dave <ankurdave@gmail.com>
parent 18bd67c2
No related branches found
No related tags found
No related merge requests found
......@@ -46,17 +46,6 @@ object Analytics extends Logging {
}
val options = mutable.Map(optionsList: _*)
def pickPartitioner(v: String): PartitionStrategy = {
// TODO: Use reflection rather than listing all the partitioning strategies here.
v match {
case "RandomVertexCut" => RandomVertexCut
case "EdgePartition1D" => EdgePartition1D
case "EdgePartition2D" => EdgePartition2D
case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v)
}
}
val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
......@@ -67,7 +56,7 @@ object Analytics extends Logging {
sys.exit(1)
}
val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
.map(pickPartitioner(_))
.map(PartitionStrategy.fromString(_))
val edgeStorageLevel = options.remove("edgeStorageLevel")
.map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
val vertexStorageLevel = options.remove("vertexStorageLevel")
......@@ -107,7 +96,7 @@ object Analytics extends Logging {
if (!outFname.isEmpty) {
logWarning("Saving pageranks of pages to " + outFname)
pr.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
pr.map { case (id, r) => id + "\t" + r }.saveAsTextFile(outFname)
}
sc.stop()
......@@ -129,7 +118,7 @@ object Analytics extends Logging {
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
val cc = ConnectedComponents.run(graph)
println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
println("Components: " + cc.vertices.map { case (vid, data) => data }.distinct())
sc.stop()
case "triangles" =>
......@@ -147,7 +136,7 @@ object Analytics extends Logging {
minEdgePartitions = numEPart,
edgeStorageLevel = edgeStorageLevel,
vertexStorageLevel = vertexStorageLevel)
// TriangleCount requires the graph to be partitioned
// TriangleCount requires the graph to be partitioned
.partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache()
val triangles = TriangleCount.run(graph)
println("Triangles: " + triangles.vertices.map {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment