From 825ab1e4526059a77e3278769797c4d065f48bd3 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh <viirya@appier.com>
Date: Wed, 22 Jul 2015 23:29:26 -0700
Subject: [PATCH] [SPARK-7254] [MLLIB] Run PowerIterationClustering directly on
 graph

JIRA: https://issues.apache.org/jira/browse/SPARK-7254

Author: Liang-Chi Hsieh <viirya@appier.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #6054 from viirya/pic_on_graph and squashes the following commits:

8b87b81 [Liang-Chi Hsieh] Fix scala style.
a22fb8b [Liang-Chi Hsieh] For comment.
ef565a0 [Liang-Chi Hsieh] Fix indentation.
d249aa1 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into pic_on_graph
82d7351 [Liang-Chi Hsieh] Run PowerIterationClustering directly on graph.
---
 .../clustering/PowerIterationClustering.scala | 46 ++++++++++++++++++
 .../PowerIterationClusteringSuite.scala       | 48 +++++++++++++++++++
 2 files changed, 94 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index e7a243f854..407e43a024 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -153,6 +153,27 @@ class PowerIterationClustering private[clustering] (
     this
   }
 
+  /**
+   * Run the PIC algorithm on Graph.
+   *
+   * @param graph an affinity matrix represented as graph, which is the matrix A in the PIC paper.
+   *              The similarity s,,ij,, represented as the edge between vertices (i, j) must
+   *              be nonnegative. This is a symmetric matrix and hence s,,ij,, = s,,ji,,. For
+   *              any (i, j) with nonzero similarity, there should be either (i, j, s,,ij,,)
+   *              or (j, i, s,,ji,,) in the input. Tuples with i = j are ignored, because we
+   *              assume s,,ij,, = 0.0.
+   *
+   * @return a [[PowerIterationClusteringModel]] that contains the clustering result
+   */
+  def run(graph: Graph[Double, Double]): PowerIterationClusteringModel = {
+    val w = normalize(graph)
+    val w0 = initMode match {
+      case "random" => randomInit(w)
+      case "degree" => initDegreeVector(w)
+    }
+    pic(w0)
+  }
+
   /**
    * Run the PIC algorithm.
    *
@@ -212,6 +233,31 @@ object PowerIterationClustering extends Logging {
   @Experimental
   case class Assignment(id: Long, cluster: Int)
 
+  /**
+   * Normalizes the affinity graph (A) and returns the normalized affinity matrix (W).
+   */
+  private[clustering]
+  def normalize(graph: Graph[Double, Double]): Graph[Double, Double] = {
+    val vD = graph.aggregateMessages[Double](
+      sendMsg = ctx => {
+        val i = ctx.srcId
+        val j = ctx.dstId
+        val s = ctx.attr
+        if (s < 0.0) {
+          throw new SparkException("Similarity must be nonnegative but found s($i, $j) = $s.")
+        }
+        if (s > 0.0) {
+          ctx.sendToSrc(s)
+        }
+      },
+      mergeMsg = _ + _,
+      TripletFields.EdgeOnly)
+    GraphImpl.fromExistingRDDs(vD, graph.edges)
+      .mapTriplets(
+        e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
+        TripletFields.Src)
+  }
+
   /**
    * Normalizes the affinity matrix (A) by row sums and returns the normalized affinity matrix (W).
    */
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
index 19e65f1b53..1890005121 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
@@ -68,6 +68,54 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon
     assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
   }
 
+  test("power iteration clustering on graph") {
+    /*
+     We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
+     edge (3, 4).
+
+     15-14 -13 -12
+     |           |
+     4 . 3 - 2  11
+     |   | x |   |
+     5   0 - 1  10
+     |           |
+     6 - 7 - 8 - 9
+     */
+
+    val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
+      (1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
+      (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
+      (10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
+
+    val edges = similarities.flatMap { case (i, j, s) =>
+      if (i != j) {
+        Seq(Edge(i, j, s), Edge(j, i, s))
+      } else {
+        None
+      }
+    }
+    val graph = Graph.fromEdges(sc.parallelize(edges, 2), 0.0)
+
+    val model = new PowerIterationClustering()
+      .setK(2)
+      .run(graph)
+    val predictions = Array.fill(2)(mutable.Set.empty[Long])
+    model.assignments.collect().foreach { a =>
+      predictions(a.cluster) += a.id
+    }
+    assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+
+    val model2 = new PowerIterationClustering()
+      .setK(2)
+      .setInitializationMode("degree")
+      .run(sc.parallelize(similarities, 2))
+    val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
+    model2.assignments.collect().foreach { a =>
+      predictions2(a.cluster) += a.id
+    }
+    assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+  }
+
   test("normalize and powerIter") {
     /*
      Test normalize() with the following graph:
-- 
GitLab