Skip to content
Snippets Groups Projects
Commit 6ee8338b authored by Burak Yavuz's avatar Burak Yavuz Committed by Xiangrui Meng
Browse files

[SPARK-5486] Added validate method to BlockMatrix

The `validate` method will allow users to debug their `BlockMatrix`, if operations like `add` or `multiply` return unexpected results. It checks the following properties in a `BlockMatrix`:
- Are the dimensions of the `BlockMatrix` consistent with what the user entered: (`nRows`, `nCols`)
- Are the dimensions of each `MatrixBlock` consistent with what the user entered: (`rowsPerBlock`, `colsPerBlock`)
- Are there blocks with duplicate indices

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #4279 from brkyvz/SPARK-5486 and squashes the following commits:

c152a73 [Burak Yavuz] addressed code review v2
598c583 [Burak Yavuz] merged master
b55ac5c [Burak Yavuz] addressed code review v1
25f083b [Burak Yavuz] simplify implementation
0aa519a [Burak Yavuz] [SPARK-5486] Added validate method to BlockMatrix
parent 0a95085f
No related branches found
No related tags found
No related merge requests found
......@@ -21,8 +21,8 @@ import scala.collection.mutable.ArrayBuffer
import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.{Logging, Partitioner}
import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrix}
import org.apache.spark.{SparkException, Logging, Partitioner}
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
......@@ -158,11 +158,13 @@ class BlockMatrix(
private[mllib] var partitioner: GridPartitioner =
GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size)
private lazy val blockInfo = blocks.mapValues(block => (block.numRows, block.numCols)).cache()
/** Estimates the dimensions of the matrix. */
private def estimateDim(): Unit = {
val (rows, cols) = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
(blockRowIndex.toLong * rowsPerBlock + mat.numRows,
blockColIndex.toLong * colsPerBlock + mat.numCols)
val (rows, cols) = blockInfo.map { case ((blockRowIndex, blockColIndex), (m, n)) =>
(blockRowIndex.toLong * rowsPerBlock + m,
blockColIndex.toLong * colsPerBlock + n)
}.reduce { (x0, x1) =>
(math.max(x0._1, x1._1), math.max(x0._2, x1._2))
}
......@@ -172,6 +174,41 @@ class BlockMatrix(
assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
}
def validate(): Unit = {
logDebug("Validating BlockMatrix...")
// check if the matrix is larger than the claimed dimensions
estimateDim()
logDebug("BlockMatrix dimensions are okay...")
// Check if there are multiple MatrixBlocks with the same index.
blockInfo.countByKey().foreach { case (key, cnt) =>
if (cnt > 1) {
throw new SparkException(s"Found multiple MatrixBlocks with the indices $key. Please " +
"remove blocks with duplicate indices.")
}
}
logDebug("MatrixBlock indices are okay...")
// Check if each MatrixBlock (except edges) has the dimensions rowsPerBlock x colsPerBlock
// The first tuple is the index and the second tuple is the dimensions of the MatrixBlock
val dimensionMsg = s"dimensions different than rowsPerBlock: $rowsPerBlock, and " +
s"colsPerBlock: $colsPerBlock. Blocks on the right and bottom edges can have smaller " +
s"dimensions. You may use the repartition method to fix this issue."
blockInfo.foreach { case ((blockRowIndex, blockColIndex), (m, n)) =>
if ((blockRowIndex < numRowBlocks - 1 && m != rowsPerBlock) ||
(blockRowIndex == numRowBlocks - 1 && (m <= 0 || m > rowsPerBlock))) {
throw new SparkException(s"The MatrixBlock at ($blockRowIndex, $blockColIndex) has " +
dimensionMsg)
}
if ((blockColIndex < numColBlocks - 1 && n != colsPerBlock) ||
(blockColIndex == numColBlocks - 1 && (n <= 0 || n > colsPerBlock))) {
throw new SparkException(s"The MatrixBlock at ($blockRowIndex, $blockColIndex) has " +
dimensionMsg)
}
}
logDebug("MatrixBlock dimensions are okay...")
logDebug("BlockMatrix is valid!")
}
/** Caches the underlying RDD. */
def cache(): this.type = {
blocks.cache()
......
......@@ -22,6 +22,7 @@ import scala.util.Random
import breeze.linalg.{DenseMatrix => BDM}
import org.scalatest.FunSuite
import org.apache.spark.SparkException
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
import org.apache.spark.mllib.util.MLlibTestSparkContext
......@@ -147,6 +148,47 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
assert(gridBasedMat.toBreeze() === expected)
}
test("validate") {
// No error
gridBasedMat.validate()
// Wrong MatrixBlock dimensions
val blocks: Seq[((Int, Int), Matrix)] = Seq(
((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
val rdd = sc.parallelize(blocks, numPartitions)
val wrongRowPerParts = new BlockMatrix(rdd, rowPerPart + 1, colPerPart)
val wrongColPerParts = new BlockMatrix(rdd, rowPerPart, colPerPart + 1)
intercept[SparkException] {
wrongRowPerParts.validate()
}
intercept[SparkException] {
wrongColPerParts.validate()
}
// Wrong BlockMatrix dimensions
val wrongRowSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 4, 4)
intercept[AssertionError] {
wrongRowSize.validate()
}
val wrongColSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 5, 2)
intercept[AssertionError] {
wrongColSize.validate()
}
// Duplicate indices
val duplicateBlocks: Seq[((Int, Int), Matrix)] = Seq(
((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
((0, 0), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
((1, 1), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
val dupMatrix = new BlockMatrix(sc.parallelize(duplicateBlocks, numPartitions), 2, 2)
intercept[SparkException] {
dupMatrix.validate()
}
}
test("transpose") {
val expected = BDM(
(1.0, 0.0, 3.0, 0.0, 0.0),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment