From c80586d9e820d19fc328b3e4c6f1c1439f5583a7 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong <f.driesprong@catawiki.nl>
Date: Thu, 14 Apr 2016 17:32:20 -0700
Subject: [PATCH] [SPARK-12869] Implemented an improved version of the
 toIndexedRowMatrix

Hi guys,

I've implemented an improved version of the `toIndexedRowMatrix` function on the `BlockMatrix`: instead of going through an intermediate `CoordinateMatrix`, the rows are now assembled directly from the row iterators of the blocks. I needed this for a project and would like to share it with the rest of the community. For dense matrices it can improve performance by up to 19x; benchmarks are available here:
https://github.com/Fokko/BlockMatrixToIndexedRowMatrix
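
For reference, here is a minimal sketch of the conversion this patch speeds up (not part of the patch itself; it assumes a live `SparkContext` named `sc`, e.g. from `spark-shell`):

```scala
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, IndexedRowMatrix}

// A small 2x4 matrix stored as two dense 2x2 blocks on a 1x2 block grid;
// the keys are the (blockRow, blockCol) coordinates of each block.
val blocks = sc.parallelize(Seq(
  ((0, 0), Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))),
  ((0, 1), Matrices.dense(2, 2, Array(5.0, 6.0, 7.0, 8.0)))))
val blockMat = new BlockMatrix(blocks, 2, 2)

// The conversion this patch optimizes.
val rowMat: IndexedRowMatrix = blockMat.toIndexedRowMatrix()
rowMat.rows.collect().foreach(println)
```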

If there are any questions or suggestions, please let me know. Keep up the good work! Cheers.

Author: Fokko Driesprong <f.driesprong@catawiki.nl>
Author: Fokko Driesprong <fokko@driesprongen.nl>

Closes #10839 from Fokko/master.
---
 .../linalg/distributed/BlockMatrix.scala      | 34 +++++++++++++++----
 .../linalg/distributed/BlockMatrixSuite.scala | 31 +++++++++++++++--
 2 files changed, 57 insertions(+), 8 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 89c332ae38..580d7a98fb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -19,12 +19,12 @@ package org.apache.spark.mllib.linalg.distributed
 
 import scala.collection.mutable.ArrayBuffer
 
-import breeze.linalg.{DenseMatrix => BDM, Matrix => BM}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM, SparseVector => BSV, Vector => BV}
 
 import org.apache.spark.{Partitioner, SparkException}
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
-import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix, SparseMatrix}
+import org.apache.spark.mllib.linalg._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -264,13 +264,35 @@ class BlockMatrix @Since("1.3.0") (
     new CoordinateMatrix(entryRDD, numRows(), numCols())
   }
 
+
   /** Converts to IndexedRowMatrix. The number of columns must be within the integer range. */
   @Since("1.3.0")
   def toIndexedRowMatrix(): IndexedRowMatrix = {
-    require(numCols() < Int.MaxValue, "The number of columns must be within the integer range. " +
-      s"numCols: ${numCols()}")
-    // TODO: This implementation may be optimized
-    toCoordinateMatrix().toIndexedRowMatrix()
+    require(numCols() < Int.MaxValue,
+      s"The number of columns should be less than Int.MaxValue (${numCols()}).")
+    val cols = numCols().toInt
+
+    val rows = blocks.flatMap { case ((blockRowIdx, blockColIdx), mat) =>
+      mat.rowIter.zipWithIndex.map {
+        case (vector, rowIdx) =>
+          blockRowIdx.toLong * rowsPerBlock + rowIdx -> ((blockColIdx, vector.toBreeze))
+      }
+    }.groupByKey().map { case (rowIdx, vectors) =>
+      val numberNonZeroPerRow = vectors.map(_._2.activeSize).sum.toDouble / cols.toDouble
+
+      val wholeVector = if (numberNonZeroPerRow <= 0.1) { // Sparse at 1/10th nnz
+        BSV.zeros[Double](cols)
+      } else {
+        BDV.zeros[Double](cols)
+      }
+
+      vectors.foreach { case (blockColIdx: Int, vec: BV[Double]) =>
+        val offset = colsPerBlock * blockColIdx
+        wholeVector(offset until Math.min(cols, offset + colsPerBlock)) := vec
+      }
+      new IndexedRow(rowIdx, Vectors.fromBreeze(wholeVector))
+    }
+    new IndexedRowMatrix(rows)
   }
 
   /** Collect the distributed matrix on the driver as a `DenseMatrix`. */
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index f737d2c51a..f37eaf225a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -19,10 +19,10 @@ package org.apache.spark.mllib.linalg.distributed
 
 import java.{util => ju}
 
-import breeze.linalg.{DenseMatrix => BDM}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
 
 import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix, SparseMatrix}
+import org.apache.spark.mllib.linalg.{DenseMatrix, DenseVector, Matrices, Matrix, SparseMatrix, SparseVector, Vectors}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 
@@ -134,6 +134,33 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(rowMat.numRows() === m)
     assert(rowMat.numCols() === n)
     assert(rowMat.toBreeze() === gridBasedMat.toBreeze())
+
+    val rows = 1
+    val cols = 10
+
+    val matDense = new DenseMatrix(rows, cols,
+      Array(1.0, 1.0, 3.0, 2.0, 5.0, 6.0, 7.0, 1.0, 2.0, 3.0))
+    val matSparse = new SparseMatrix(rows, cols,
+      Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1), Array(0), Array(1.0))
+
+    val vectors: Seq[((Int, Int), Matrix)] = Seq(
+      ((0, 0), matDense),
+      ((1, 0), matSparse))
+
+    val rdd = sc.parallelize(vectors)
+    val B = new BlockMatrix(rdd, rows, cols)
+
+    val C = B.toIndexedRowMatrix().rows.collect().sortBy(_.index)
+
+    (C(0).vector.toBreeze, C(1).vector.toBreeze) match {
+      case (denseVector: BDV[Double], sparseVector: BSV[Double]) =>
+        assert(denseVector.length === sparseVector.length)
+
+        assert(matDense.toArray === denseVector.toArray)
+        assert(matSparse.toArray === sparseVector.toArray)
+      case _ =>
+        throw new RuntimeException("toIndexedRowMatrix returned vectors of an unexpected type")
+    }
   }
 
   test("toBreeze and toLocalMatrix") {
-- 
GitLab