From 21b3d2a75f679b252e293000d706741dca33624a Mon Sep 17 00:00:00 2001
From: Sean Owen <sowen@cloudera.com>
Date: Thu, 10 Dec 2015 14:05:45 +0000
Subject: [PATCH] [SPARK-11530][MLLIB] Return eigenvalues with PCA model

Add `computePrincipalComponentsAndVariance` to also compute PCA's explained variance.

CC mengxr

Author: Sean Owen <sowen@cloudera.com>

Closes #9736 from srowen/SPARK-11530.
---
 .../org/apache/spark/ml/feature/PCA.scala     | 20 ++++++-----
 .../org/apache/spark/mllib/feature/PCA.scala  | 16 ++++++---
 .../mllib/linalg/distributed/RowMatrix.scala  | 33 +++++++++++++++----
 .../apache/spark/ml/feature/PCASuite.scala    |  7 ++--
 .../apache/spark/mllib/feature/PCASuite.scala |  3 +-
 .../linalg/distributed/RowMatrixSuite.scala   | 10 +++++-
 project/MimaExcludes.scala                    |  3 ++
 7 files changed, 67 insertions(+), 25 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index aa88cb03d2..53d33ea2b8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -73,7 +73,7 @@ class PCA (override val uid: String) extends Estimator[PCAModel] with PCAParams
     val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v}
     val pca = new feature.PCA(k = $(k))
     val pcaModel = pca.fit(input)
-    copyValues(new PCAModel(uid, pcaModel.pc).setParent(this))
+    copyValues(new PCAModel(uid, pcaModel.pc, pcaModel.explainedVariance).setParent(this))
   }
 
   override def transformSchema(schema: StructType): StructType = {
@@ -105,7 +105,8 @@ object PCA extends DefaultParamsReadable[PCA] {
 @Experimental
 class PCAModel private[ml] (
     override val uid: String,
-    val pc: DenseMatrix)
+    val pc: DenseMatrix,
+    val explainedVariance: DenseVector)
   extends Model[PCAModel] with PCAParams with MLWritable {
 
   import PCAModel._
@@ -123,7 +124,7 @@ class PCAModel private[ml] (
    */
   override def transform(dataset: DataFrame): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    val pcaModel = new feature.PCAModel($(k), pc)
+    val pcaModel = new feature.PCAModel($(k), pc, explainedVariance)
     val pcaOp = udf { pcaModel.transform _ }
     dataset.withColumn($(outputCol), pcaOp(col($(inputCol))))
   }
@@ -139,7 +140,7 @@ class PCAModel private[ml] (
   }
 
   override def copy(extra: ParamMap): PCAModel = {
-    val copied = new PCAModel(uid, pc)
+    val copied = new PCAModel(uid, pc, explainedVariance)
     copyValues(copied, extra).setParent(parent)
   }
 
@@ -152,11 +153,11 @@ object PCAModel extends MLReadable[PCAModel] {
 
   private[PCAModel] class PCAModelWriter(instance: PCAModel) extends MLWriter {
 
-    private case class Data(pc: DenseMatrix)
+    private case class Data(pc: DenseMatrix, explainedVariance: DenseVector)
 
     override protected def saveImpl(path: String): Unit = {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
-      val data = Data(instance.pc)
+      val data = Data(instance.pc, instance.explainedVariance)
       val dataPath = new Path(path, "data").toString
       sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
     }
@@ -169,10 +170,11 @@ object PCAModel extends MLReadable[PCAModel] {
     override def load(path: String): PCAModel = {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val dataPath = new Path(path, "data").toString
-      val Row(pc: DenseMatrix) = sqlContext.read.parquet(dataPath)
-        .select("pc")
+      val Row(pc: DenseMatrix, explainedVariance: DenseVector) =
+        sqlContext.read.parquet(dataPath)
+        .select("pc", "explainedVariance")
         .head()
-      val model = new PCAModel(metadata.uid, pc)
+      val model = new PCAModel(metadata.uid, pc, explainedVariance)
       DefaultParamsReader.getAndSetParams(model, metadata)
       model
     }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
index ecb3c1e6c1..24e0a98c39 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.feature
 
-import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.linalg.distributed.RowMatrix
@@ -43,7 +43,8 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
       s"source vector size is ${sources.first().size} must be greater than k=$k")
 
     val mat = new RowMatrix(sources)
-    val pc = mat.computePrincipalComponents(k) match {
+    val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(k)
+    val densePC = pc match {
       case dm: DenseMatrix =>
         dm
       case sm: SparseMatrix =>
@@ -58,7 +59,13 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
           s"SparseMatrix or DenseMatrix. Instead got: ${m.getClass}")
 
     }
-    new PCAModel(k, pc)
+    val denseExplainedVariance = explainedVariance match {
+      case dv: DenseVector =>
+        dv
+      case sv: SparseVector =>
+        sv.toDense
+    }
+    new PCAModel(k, densePC, denseExplainedVariance)
   }
 
   /**
@@ -77,7 +84,8 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
 @Since("1.4.0")
 class PCAModel private[spark] (
     @Since("1.4.0") val k: Int,
-    @Since("1.4.0") val pc: DenseMatrix) extends VectorTransformer {
+    @Since("1.4.0") val pc: DenseMatrix,
+    @Since("1.6.0") val explainedVariance: DenseVector) extends VectorTransformer {
   /**
    * Transform a vector by computed Principal Components.
    *
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 52c0f19c64..2018a67868 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -368,7 +368,8 @@ class RowMatrix @Since("1.0.0") (
   }
 
   /**
-   * Computes the top k principal components.
+   * Computes the top k principal components and a vector of proportions of
+   * variance explained by each principal component.
    * Rows correspond to observations and columns correspond to variables.
    * The principal components are stored a local matrix of size n-by-k.
    * Each column corresponds for one principal component,
@@ -379,24 +380,42 @@ class RowMatrix @Since("1.0.0") (
    * Note that this cannot be computed on matrices with more than 65535 columns.
    *
    * @param k number of top principal components.
-   * @return a matrix of size n-by-k, whose columns are principal components
+   * @return a matrix of size n-by-k, whose columns are principal components, and
+   * a vector of values which indicate how much variance each principal component
+   * explains
    */
-  @Since("1.0.0")
-  def computePrincipalComponents(k: Int): Matrix = {
+  @Since("1.6.0")
+  def computePrincipalComponentsAndExplainedVariance(k: Int): (Matrix, Vector) = {
     val n = numCols().toInt
     require(k > 0 && k <= n, s"k = $k out of range (0, n = $n]")
 
     val Cov = computeCovariance().toBreeze.asInstanceOf[BDM[Double]]
 
-    val brzSvd.SVD(u: BDM[Double], _, _) = brzSvd(Cov)
+    val brzSvd.SVD(u: BDM[Double], s: BDV[Double], _) = brzSvd(Cov)
+
+    val eigenSum = s.data.sum
+    val explainedVariance = s.data.map(_ / eigenSum)
 
     if (k == n) {
-      Matrices.dense(n, k, u.data)
+      (Matrices.dense(n, k, u.data), Vectors.dense(explainedVariance))
     } else {
-      Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k))
+      (Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k)),
+        Vectors.dense(Arrays.copyOfRange(explainedVariance, 0, k)))
     }
   }
 
+  /**
+   * Computes the top k principal components only.
+   *
+   * @param k number of top principal components.
+   * @return a matrix of size n-by-k, whose columns are principal components
+   * @see computePrincipalComponentsAndExplainedVariance
+   */
+  @Since("1.0.0")
+  def computePrincipalComponents(k: Int): Matrix = {
+    computePrincipalComponentsAndExplainedVariance(k)._1
+  }
+
   /**
    * Computes column-wise summary statistics.
    */
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
index edab21e6c3..9f6618b929 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
@@ -24,7 +24,6 @@ import org.apache.spark.mllib.linalg.distributed.RowMatrix
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
-import org.apache.spark.mllib.feature.{PCAModel => OldPCAModel}
 import org.apache.spark.sql.Row
 
 class PCASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
@@ -32,7 +31,8 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
   test("params") {
     ParamsSuite.checkParams(new PCA)
     val mat = Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0)).asInstanceOf[DenseMatrix]
-    val model = new PCAModel("pca", mat)
+    val explainedVariance = Vectors.dense(0.5, 0.5).asInstanceOf[DenseVector]
+    val model = new PCAModel("pca", mat, explainedVariance)
     ParamsSuite.checkParams(model)
   }
 
@@ -76,7 +76,8 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
 
   test("PCAModel read/write") {
     val instance = new PCAModel("myPCAModel",
-      Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0)).asInstanceOf[DenseMatrix])
+      Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0)).asInstanceOf[DenseMatrix],
+      Vectors.dense(0.5, 0.5).asInstanceOf[DenseVector])
     val newInstance = testDefaultReadWrite(instance)
     assert(newInstance.pc === instance.pc)
   }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
index e57f491913..a8d82932d3 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
@@ -37,11 +37,12 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext {
     val pca = new PCA(k).fit(dataRDD)
 
     val mat = new RowMatrix(dataRDD)
-    val pc = mat.computePrincipalComponents(k)
+    val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(k)
 
     val pca_transform = pca.transform(dataRDD).collect()
     val mat_multiply = mat.multiply(pc).rows.collect()
 
     assert(pca_transform.toSet === mat_multiply.toSet)
+    assert(pca.explainedVariance === explainedVariance)
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index 4abb98fb6f..0ff901ddc4 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.mllib.linalg.distributed
 
+import java.util.Arrays
+
 import scala.util.Random
 
 import breeze.numerics.abs
@@ -49,6 +51,7 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
     (0.0, 1.0, 0.0),
     (math.sqrt(2.0) / 2.0, 0.0, math.sqrt(2.0) / 2.0),
     (math.sqrt(2.0) / 2.0, 0.0, - math.sqrt(2.0) / 2.0))
+  val explainedVariance = BDV(4.0 / 7.0, 3.0 / 7.0, 0.0)
 
   var denseMat: RowMatrix = _
   var sparseMat: RowMatrix = _
@@ -201,10 +204,15 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   test("pca") {
     for (mat <- Seq(denseMat, sparseMat); k <- 1 to n) {
-      val pc = denseMat.computePrincipalComponents(k)
+      val (pc, expVariance) = mat.computePrincipalComponentsAndExplainedVariance(k)
       assert(pc.numRows === n)
       assert(pc.numCols === k)
       assertColumnEqualUpToSign(pc.toBreeze.asInstanceOf[BDM[Double]], principalComponents, k)
+      assert(
+        closeToZero(BDV(expVariance.toArray) -
+        BDV(Arrays.copyOfRange(explainedVariance.data, 0, k))))
+      // Check that this method returns the same answer
+      assert(pc === mat.computePrincipalComponents(k))
     }
   }
 
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 685cb419ca..edae59d882 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -57,6 +57,9 @@ object MimaExcludes {
         // MiMa does not deal properly with sealed traits
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.ml.classification.LogisticRegressionSummary.featuresCol")
+      ) ++ Seq(
+        // SPARK-11530
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.feature.PCAModel.this")
       ) ++ Seq(
         // SPARK-10381 Fix types / units in private AskPermissionToCommitOutput RPC message.
         // This class is marked as `private` but MiMa still seems to be confused by the change.
-- 
GitLab