Skip to content
Snippets Groups Projects
Commit 323e7390 authored by Xiangrui Meng's avatar Xiangrui Meng
Browse files

Revert "[SPARK-14154][MLLIB] Simplify the implementation for Kolmogorov–Smirnov test"

This reverts commit d2a819a6.
parent dd11e401
No related branches found
No related tags found
No related merge requests found
......@@ -64,10 +64,11 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
*/
def testOneSample(data: RDD[Double], cdf: Double => Double): KolmogorovSmirnovTestResult = {
val n = data.count().toDouble
val ksStat = data.sortBy(x => x).zipWithIndex().map { case (v, i) =>
val f = cdf(v)
math.max(f - i / n, (i + 1) / n - f)
}.max()
val localData = data.sortBy(x => x).mapPartitions { part =>
val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
searchOneSampleCandidates(partDiffs) // candidates: local extrema
}.collect()
val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme
evalOneSampleP(ksStat, n.toLong)
}
......@@ -83,6 +84,74 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
testOneSample(data, cdf)
}
/**
* Calculate unadjusted distances between the empirical CDF and the theoretical CDF in a
* partition
* @param partData `Iterator[Double]` 1 partition of a sorted RDD
* @param n `Double` the total size of the RDD
* @param cdf `Double => Double` a function the calculates the theoretical CDF of a value
* @return `Iterator[(Double, Double)] `Unadjusted (ie. off by a constant) potential extrema
* in a partition. The first element corresponds to the (empirical CDF - 1/N) - CDF,
* the second element corresponds to empirical CDF - CDF. We can then search the resulting
* iterator for the minimum of the first and the maximum of the second element, and provide
* this as a partition's candidate extrema
*/
private def oneSampleDifferences(partData: Iterator[Double], n: Double, cdf: Double => Double)
: Iterator[(Double, Double)] = {
// zip data with index (within that partition)
// calculate local (unadjusted) empirical CDF and subtract CDF
partData.zipWithIndex.map { case (v, ix) =>
// dp and dl are later adjusted by constant, when global info is available
val dp = (ix + 1) / n
val dl = ix / n
val cdfVal = cdf(v)
(dl - cdfVal, dp - cdfVal)
}
}
/**
* Search the unadjusted differences in a partition and return the
* two extrema (furthest below and furthest above CDF), along with a count of elements in that
* partition
* @param partDiffs `Iterator[(Double, Double)]` the unadjusted differences between empirical CDF
* and CDFin a partition, which come as a tuple of
* (empirical CDF - 1/N - CDF, empirical CDF - CDF)
* @return `Iterator[(Double, Double, Double)]` the local extrema and a count of elements
*/
private def searchOneSampleCandidates(partDiffs: Iterator[(Double, Double)])
: Iterator[(Double, Double, Double)] = {
val initAcc = (Double.MaxValue, Double.MinValue, 0.0)
val pResults = partDiffs.foldLeft(initAcc) { case ((pMin, pMax, pCt), (dl, dp)) =>
(math.min(pMin, dl), math.max(pMax, dp), pCt + 1)
}
val results = if (pResults == initAcc) Array[(Double, Double, Double)]() else Array(pResults)
results.iterator
}
/**
* Find the global maximum distance between empirical CDF and CDF (i.e. the KS statistic) after
* adjusting local extrema estimates from individual partitions with the amount of elements in
* preceding partitions
* @param localData `Array[(Double, Double, Double)]` A local array containing the collected
* results of `searchOneSampleCandidates` across all partitions
* @param n `Double`The size of the RDD
* @return The one-sample Kolmogorov Smirnov Statistic
*/
private def searchOneSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
: Double = {
val initAcc = (Double.MinValue, 0.0)
// adjust differences based on the number of elements preceding it, which should provide
// the correct distance between empirical CDF and CDF
val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
val adjConst = prevCt / n
val dist1 = math.abs(minCand + adjConst)
val dist2 = math.abs(maxCand + adjConst)
val maxVal = Array(prevMax, dist1, dist2).max
(maxVal, prevCt + ct)
}
results._1
}
/**
* A convenience function that allows running the KS test for 1 set of sample data against
* a named distribution
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment