Commit 2e069ca6 authored by Xiangrui Meng

[SPARK-3001][MLLIB] Improve Spearman's correlation

The current implementation sorts each column as a separate RDD[Double]. This patch replaces the per-column sorts with a single global sort over ((columnIndex, value), rowUid) records, followed by a pass that assigns average ranks to ties and regroups the ranks by row.
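For context: Spearman's correlation is Pearson's correlation computed on the per-column ranks, with tied values receiving the average of their 0-indexed sort positions, e.g. ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5] (the example from the old getRanks scaladoc). Below is a minimal local sketch of that rank rule using plain Scala collections rather than RDDs; the `ranks` helper is illustrative only, not part of this patch:

```scala
// Illustrative only: tie-averaged ranks for a single column, 0-indexed positions.
def ranks(xs: Seq[Double]): Seq[Double] = {
  // pair each value with its position in the sorted order
  val sortedPos = xs.zipWithIndex.sortBy(_._1).zipWithIndex // ((value, origIdx), sortedPos)
  // average the sorted positions over groups of equal values
  val avgRank: Map[Double, Double] = sortedPos
    .groupBy { case ((v, _), _) => v }
    .map { case (v, group) => v -> group.map(_._2).sum.toDouble / group.size }
  xs.map(avgRank)
}

// ranks(Seq(2.0, 1.0, 0.0, 2.0)) == Seq(2.5, 1.0, 0.0, 2.5)
```

The patch computes exactly these ranks, but for all columns at once: one distributed sort keyed by (columnIndex, value) replaces the n per-column sorts.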

Benchmark on a 32-node cluster (m = number of rows, n = number of columns):

m | n | prev | this
---|---|------|-----
1000000 | 50 | 55s | 9s
10000000 | 50 | 97s | 76s
1000000 | 100 | 119s | 15s

Author: Xiangrui Meng <meng@databricks.com>

Closes #1917 from mengxr/spearman and squashes the following commits:

4d5d262 [Xiangrui Meng] remove unused import
85c48de [Xiangrui Meng] minor updates
a048d0c [Xiangrui Meng] remove cache and set a limit to cachedIds
b98bb18 [Xiangrui Meng] add comments
0846e07 [Xiangrui Meng] first version
parent 5d25c0b7
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala

@@ -19,10 +19,10 @@ package org.apache.spark.mllib.stat.correlation
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{Logging, HashPartitioner}
+import org.apache.spark.Logging
 import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
-import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
+import org.apache.spark.rdd.RDD
 
 /**
  * Compute Spearman's correlation for two RDDs of the type RDD[Double] or the correlation matrix
@@ -43,87 +43,51 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
   /**
    * Compute Spearman's correlation matrix S, for the input matrix, where S(i, j) is the
    * correlation between column i and j.
-   *
-   * Input RDD[Vector] should be cached or checkpointed if possible since it would be split into
-   * numCol RDD[Double]s, each of which sorted, and the joined back into a single RDD[Vector].
    */
   override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
-    val indexed = X.zipWithUniqueId()
-
-    val numCols = X.first.size
-    if (numCols > 50) {
-      logWarning("Computing the Spearman correlation matrix can be slow for large RDDs with more"
-        + " than 50 columns.")
-    }
-    val ranks = new Array[RDD[(Long, Double)]](numCols)
-
-    // Note: we use a for loop here instead of a while loop with a single index variable
-    // to avoid race condition caused by closure serialization
-    for (k <- 0 until numCols) {
-      val column = indexed.map { case (vector, index) => (vector(k), index) }
-      ranks(k) = getRanks(column)
+    // ((columnIndex, value), rowUid)
+    val colBased = X.zipWithUniqueId().flatMap { case (vec, uid) =>
+      vec.toArray.view.zipWithIndex.map { case (v, j) =>
+        ((j, v), uid)
+      }
     }
-
-    val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
-    PearsonCorrelation.computeCorrelationMatrix(ranksMat)
-  }
-
-  /**
-   * Compute the ranks for elements in the input RDD, using the average method for ties.
-   *
-   * With the average method, elements with the same value receive the same rank that's computed
-   * by taking the average of their positions in the sorted list.
-   * e.g. ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]
-   * Note that positions here are 0-indexed, instead of the 1-indexed as in the definition for
-   * ranks in the standard definition for Spearman's correlation. This does not affect the final
-   * results and is slightly more performant.
-   *
-   * @param indexed RDD[(Double, Long)] containing pairs of the format (originalValue, uniqueId)
-   * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, rank), where uniqueId is
-   *         copied from the input RDD.
-   */
-  private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] = {
-    // Get elements' positions in the sorted list for computing average rank for duplicate values
-    val sorted = indexed.sortByKey().zipWithIndex()
-    val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
-      // add an extra element to signify the end of the list so that flatMap can flush the last
-      // batch of duplicates
-      val end = -1L
-      val padded = iter ++ Iterator[((Double, Long), Long)](((Double.NaN, end), end))
-      val firstEntry = padded.next()
-      var lastVal = firstEntry._1._1
-      var firstRank = firstEntry._2.toDouble
-      val idBuffer = ArrayBuffer(firstEntry._1._2)
-      padded.flatMap { case ((v, id), rank) =>
-        if (v == lastVal && id != end) {
-          idBuffer += id
-          Iterator.empty
-        } else {
-          val entries = if (idBuffer.size == 1) {
-            Iterator((idBuffer(0), firstRank))
-          } else {
-            val averageRank = firstRank + (idBuffer.size - 1.0) / 2.0
-            idBuffer.map(id => (id, averageRank))
-          }
-          lastVal = v
-          firstRank = rank
-          idBuffer.clear()
-          idBuffer += id
-          entries
+    // global sort by (columnIndex, value)
+    val sorted = colBased.sortByKey()
+    // assign global ranks (using average ranks for tied values)
+    val globalRanks = sorted.zipWithIndex().mapPartitions { iter =>
+      var preCol = -1
+      var preVal = Double.NaN
+      var startRank = -1.0
+      var cachedUids = ArrayBuffer.empty[Long]
+      val flush: () => Iterable[(Long, (Int, Double))] = () => {
+        val averageRank = startRank + (cachedUids.size - 1) / 2.0
+        val output = cachedUids.map { uid =>
+          (uid, (preCol, averageRank))
        }
+        cachedUids.clear()
+        output
      }
+      iter.flatMap { case (((j, v), uid), rank) =>
+        // If we see a new value or cachedUids is too big, we flush ids with their average rank.
+        if (j != preCol || v != preVal || cachedUids.size >= 10000000) {
+          val output = flush()
+          preCol = j
+          preVal = v
+          startRank = rank
+          cachedUids += uid
+          output
+        } else {
+          cachedUids += uid
+          Iterator.empty
+        }
+      } ++ flush()
     }
-    ranks
-  }
-
-  private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], input: RDD[Vector]): RDD[Vector] = {
-    val partitioner = new HashPartitioner(input.partitions.size)
-    val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
-    cogrouped.map {
-      case (_, values: Array[Iterable[_]]) =>
-        val doubles = values.asInstanceOf[Array[Iterable[Double]]]
-        new DenseVector(doubles.flatten.toArray)
-    }
+    // Replace values in the input matrix by their ranks compared with values in the same column.
+    // Note that shifting all ranks in a column by a constant value doesn't affect result.
+    val groupedRanks = globalRanks.groupByKey().map { case (uid, iter) =>
+      // sort by column index and then convert values to a vector
+      Vectors.dense(iter.toSeq.sortBy(_._1).map(_._2).toArray)
+    }
+    PearsonCorrelation.computeCorrelationMatrix(groupedRanks)
  }
 }
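For reference, the code path this patch optimizes is reached through the public `Statistics.corr` API. A small usage sketch follows; the `SpearmanExample` object name and the local[2] master are illustrative, not part of the commit:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

object SpearmanExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("spearman-demo").setMaster("local[2]"))
    // Three rows, two columns; column 1 is a monotone (but nonlinear) function of
    // column 0, so their Spearman correlation is exactly 1.0 while Pearson's is not.
    val X = sc.parallelize(Seq(
      Vectors.dense(1.0, 10.0),
      Vectors.dense(2.0, 40.0),
      Vectors.dense(3.0, 90.0)))
    // The "spearman" method dispatches to SpearmanCorrelation.computeCorrelationMatrix.
    val corr = Statistics.corr(X, "spearman")
    println(corr)
    sc.stop()
  }
}
```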