From d2a819a6363190b946986ebf6f8001d520098c3b Mon Sep 17 00:00:00 2001 From: Yuhao Yang <hhbyyh@gmail.com> Date: Tue, 29 Mar 2016 09:16:50 -0700 Subject: [PATCH] =?UTF-8?q?[SPARK-14154][MLLIB]=20Simplify=20the=20impleme?= =?UTF-8?q?ntation=20for=20Kolmogorov=E2=80=93Smirnov=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What changes were proposed in this pull request? jira: https://issues.apache.org/jira/browse/SPARK-14154 I just read the code for KolmogorovSmirnovTest and find it could be much simplified following the original definition. Send a PR for discussion ## How was this patch tested? unit test Author: Yuhao Yang <hhbyyh@gmail.com> Closes #11954 from hhbyyh/ksoptimize. --- .../stat/test/KolmogorovSmirnovTest.scala | 77 +------------------ 1 file changed, 4 insertions(+), 73 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala index baf9e5e7d1..0ec8975fed 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala @@ -64,11 +64,10 @@ private[stat] object KolmogorovSmirnovTest extends Logging { */ def testOneSample(data: RDD[Double], cdf: Double => Double): KolmogorovSmirnovTestResult = { val n = data.count().toDouble - val localData = data.sortBy(x => x).mapPartitions { part => - val partDiffs = oneSampleDifferences(part, n, cdf) // local distances - searchOneSampleCandidates(partDiffs) // candidates: local extrema - }.collect() - val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme + val ksStat = data.sortBy(x => x).zipWithIndex().map { case (v, i) => + val f = cdf(v) + math.max(f - i / n, (i + 1) / n - f) + }.max() evalOneSampleP(ksStat, n.toLong) } @@ -84,74 +83,6 @@ private[stat] object KolmogorovSmirnovTest extends Logging { testOneSample(data, cdf) } - /** - * Calculate unadjusted distances between the empirical CDF and the theoretical CDF in a - * partition - * @param partData `Iterator[Double]` 1 partition of a sorted RDD - * @param n `Double` the total size of the RDD - * @param cdf `Double => Double` a function the calculates the theoretical CDF of a value - * @return `Iterator[(Double, Double)] `Unadjusted (ie. off by a constant) potential extrema - * in a partition. The first element corresponds to the (empirical CDF - 1/N) - CDF, - * the second element corresponds to empirical CDF - CDF. We can then search the resulting - * iterator for the minimum of the first and the maximum of the second element, and provide - * this as a partition's candidate extrema - */ - private def oneSampleDifferences(partData: Iterator[Double], n: Double, cdf: Double => Double) - : Iterator[(Double, Double)] = { - // zip data with index (within that partition) - // calculate local (unadjusted) empirical CDF and subtract CDF - partData.zipWithIndex.map { case (v, ix) => - // dp and dl are later adjusted by constant, when global info is available - val dp = (ix + 1) / n - val dl = ix / n - val cdfVal = cdf(v) - (dl - cdfVal, dp - cdfVal) - } - } - - /** - * Search the unadjusted differences in a partition and return the - * two extrema (furthest below and furthest above CDF), along with a count of elements in that - * partition - * @param partDiffs `Iterator[(Double, Double)]` the unadjusted differences between empirical CDF - * and CDFin a partition, which come as a tuple of - * (empirical CDF - 1/N - CDF, empirical CDF - CDF) - * @return `Iterator[(Double, Double, Double)]` the local extrema and a count of elements - */ - private def searchOneSampleCandidates(partDiffs: Iterator[(Double, Double)]) - : Iterator[(Double, Double, Double)] = { - val initAcc = (Double.MaxValue, Double.MinValue, 0.0) - val pResults = partDiffs.foldLeft(initAcc) { case ((pMin, pMax, pCt), (dl, dp)) => - (math.min(pMin, dl), math.max(pMax, dp), pCt + 1) - } - val results = if (pResults == initAcc) Array[(Double, Double, Double)]() else Array(pResults) - results.iterator - } - - /** - * Find the global maximum distance between empirical CDF and CDF (i.e. the KS statistic) after - * adjusting local extrema estimates from individual partitions with the amount of elements in - * preceding partitions - * @param localData `Array[(Double, Double, Double)]` A local array containing the collected - * results of `searchOneSampleCandidates` across all partitions - * @param n `Double`The size of the RDD - * @return The one-sample Kolmogorov Smirnov Statistic - */ - private def searchOneSampleStatistic(localData: Array[(Double, Double, Double)], n: Double) - : Double = { - val initAcc = (Double.MinValue, 0.0) - // adjust differences based on the number of elements preceding it, which should provide - // the correct distance between empirical CDF and CDF - val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) => - val adjConst = prevCt / n - val dist1 = math.abs(minCand + adjConst) - val dist2 = math.abs(maxCand + adjConst) - val maxVal = Array(prevMax, dist1, dist2).max - (maxVal, prevCt + ct) - } - results._1 - } - /** * A convenience function that allows running the KS test for 1 set of sample data against * a named distribution -- GitLab