From f4e0b28c859412ec8bdfdf452b6a1b2e1bee310e Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez" <joseph.e.gonzalez@gmail.com>
Date: Sat, 1 Nov 2014 01:18:07 -0700
Subject: [PATCH] [SPARK-4142][GraphX] Default numEdgePartitions

Changing the default number of edge partitions to match spark parallelism.

Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>

Closes #3006 from jegonzal/default_partitions and squashes the following commits:

a9a5c4f [Joseph E. Gonzalez] Changing the default number of edge partitions to match spark parallelism
---
 .../org/apache/spark/examples/graphx/Analytics.scala |  6 +++---
 .../scala/org/apache/spark/graphx/GraphLoader.scala  | 12 +++++++++---
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
index d70d93608a..828cffb01c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
@@ -77,7 +77,7 @@ object Analytics extends Logging {
         val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
 
         val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart,
+          numEdgePartitions = numEPart,
           edgeStorageLevel = edgeStorageLevel,
           vertexStorageLevel = vertexStorageLevel).cache()
         val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
@@ -110,7 +110,7 @@ object Analytics extends Logging {
 
         val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
         val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart,
+          numEdgePartitions = numEPart,
           edgeStorageLevel = edgeStorageLevel,
           vertexStorageLevel = vertexStorageLevel).cache()
         val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
@@ -131,7 +131,7 @@ object Analytics extends Logging {
         val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
         val graph = GraphLoader.edgeListFile(sc, fname,
           canonicalOrientation = true,
-          minEdgePartitions = numEPart,
+          numEdgePartitions = numEPart,
           edgeStorageLevel = edgeStorageLevel,
           vertexStorageLevel = vertexStorageLevel)
           // TriangleCount requires the graph to be partitioned
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index f4c79365b1..4933aecba1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -48,7 +48,8 @@ object GraphLoader extends Logging {
    * @param path the path to the file (e.g., /home/data/file or hdfs://file)
    * @param canonicalOrientation whether to orient edges in the positive
    *        direction
-   * @param minEdgePartitions the number of partitions for the edge RDD
+   * @param numEdgePartitions the number of partitions for the edge RDD
+   * Setting this value to -1 will use the default parallelism.
    * @param edgeStorageLevel the desired storage level for the edge partitions
    * @param vertexStorageLevel the desired storage level for the vertex partitions
    */
@@ -56,7 +57,7 @@ object GraphLoader extends Logging {
       sc: SparkContext,
       path: String,
       canonicalOrientation: Boolean = false,
-      minEdgePartitions: Int = 1,
+      numEdgePartitions: Int = -1,
       edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
       vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
     : Graph[Int, Int] =
@@ -64,7 +65,12 @@ object GraphLoader extends Logging {
     val startTime = System.currentTimeMillis
 
     // Parse the edge data table directly into edge partitions
-    val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
+    val lines =
+      if (numEdgePartitions > 0) {
+        sc.textFile(path, numEdgePartitions).coalesce(numEdgePartitions)
+      } else {
+        sc.textFile(path)
+      }
     val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
       val builder = new EdgePartitionBuilder[Int, Int]
       iter.foreach { line =>
-- 
GitLab