Skip to content
Snippets Groups Projects
Commit abb2f921 authored by Junyang Qian's avatar Junyang Qian Committed by Felix Cheung
Browse files

[SPARK-17315][SPARKR] Kolmogorov-Smirnov test SparkR wrapper

## What changes were proposed in this pull request?

This PR tries to add Kolmogorov-Smirnov Test wrapper to SparkR. This wrapper implementation only supports one sample test against normal distribution.

## How was this patch tested?

R unit test.

Author: Junyang Qian <junyangq@databricks.com>

Closes #14881 from junyangq/SPARK-17315.
parent c2a1576c
No related branches found
No related tags found
No related merge requests found
......@@ -42,7 +42,8 @@ exportMethods("glm",
"spark.perplexity",
"spark.isoreg",
"spark.gaussianMixture",
"spark.als")
"spark.als",
"spark.kstest")
# Job group lifecycle management methods
export("setJobGroup",
......@@ -342,7 +343,8 @@ export("as.DataFrame",
"tables",
"uncacheTable",
"print.summary.GeneralizedLinearRegressionModel",
"read.ml")
"read.ml",
"print.summary.KSTest")
export("structField",
"structField.jobj",
......@@ -366,6 +368,7 @@ S3method(print, jobj)
S3method(print, structField)
S3method(print, structType)
S3method(print, summary.GeneralizedLinearRegressionModel)
S3method(print, summary.KSTest)
S3method(structField, character)
S3method(structField, jobj)
S3method(structType, jobj)
......
......@@ -1375,3 +1375,7 @@ setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml")
#' @rdname spark.als
#' @export
setGeneric("spark.als", function(data, ...) { standardGeneric("spark.als") })
#' @rdname spark.kstest
#' @export
setGeneric("spark.kstest", function(data, ...) { standardGeneric("spark.kstest") })
......@@ -88,6 +88,13 @@ setClass("GaussianMixtureModel", representation(jobj = "jobj"))
#' @note ALSModel since 2.1.0
setClass("ALSModel", representation(jobj = "jobj"))
#' S4 class that represents an KSTest
#'
#' @param jobj a Java object reference to the backing Scala KSTestWrapper
#' @export
#' @note KSTest since 2.1.0
setClass("KSTest", representation(jobj = "jobj"))
#' Saves the MLlib model to the input path
#'
#' Saves the MLlib model to the input path. For more information, see the specific
......@@ -1310,3 +1317,101 @@ setMethod("write.ml", signature(object = "ALSModel", path = "character"),
function(object, path, overwrite = FALSE) {
write_internal(object, path, overwrite)
})
#' (One-Sample) Kolmogorov-Smirnov Test
#'
#' @description
#' \code{spark.kstest} Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a
#' continuous distribution.
#'
#' By comparing the largest difference between the empirical cumulative
#' distribution of the sample data and the theoretical distribution we can provide a test for the
#' the null hypothesis that the sample data comes from that theoretical distribution.
#'
#' Users can call \code{summary} to obtain a summary of the test, and \code{print.summary.KSTest}
#' to print out a summary result.
#'
#' @param data a SparkDataFrame of user data.
#' @param testCol column name where the test data is from. It should be a column of double type.
#' @param nullHypothesis name of the theoretical distribution tested against. Currently only
#' \code{"norm"} for normal distribution is supported.
#' @param distParams parameters(s) of the distribution. For \code{nullHypothesis = "norm"},
#' we can provide as a vector the mean and standard deviation of
#' the distribution. If none is provided, then standard normal will be used.
#' If only one is provided, then the standard deviation will be set to be one.
#' @param ... additional argument(s) passed to the method.
#' @return \code{spark.kstest} returns a test result object.
#' @rdname spark.kstest
#' @aliases spark.kstest,SparkDataFrame-method
#' @name spark.kstest
#' @seealso \href{http://spark.apache.org/docs/latest/mllib-statistics.html#hypothesis-testing}{
#' MLlib: Hypothesis Testing}
#' @export
#' @examples
#' \dontrun{
#' data <- data.frame(test = c(0.1, 0.15, 0.2, 0.3, 0.25))
#' df <- createDataFrame(data)
#' test <- spark.ktest(df, "test", "norm", c(0, 1))
#'
#' # get a summary of the test result
#' testSummary <- summary(test)
#' testSummary
#'
#' # print out the summary in an organized way
#' print.summary.KSTest(test)
#' }
#' @note spark.kstest since 2.1.0
setMethod("spark.kstest", signature(data = "SparkDataFrame"),
function(data, testCol = "test", nullHypothesis = c("norm"), distParams = c(0, 1)) {
tryCatch(match.arg(nullHypothesis),
error = function(e) {
msg <- paste("Distribution", nullHypothesis, "is not supported.")
stop(msg)
})
if (nullHypothesis == "norm") {
distParams <- as.numeric(distParams)
mu <- ifelse(length(distParams) < 1, 0, distParams[1])
sigma <- ifelse(length(distParams) < 2, 1, distParams[2])
jobj <- callJStatic("org.apache.spark.ml.r.KSTestWrapper",
"test", data@sdf, testCol, nullHypothesis,
as.array(c(mu, sigma)))
new("KSTest", jobj = jobj)
}
})
# Get the summary of Kolmogorov-Smirnov (KS) Test.
#' @param object test result object of KSTest by \code{spark.kstest}.
#' @return \code{summary} returns a list containing the p-value, test statistic computed for the
#' test, the null hypothesis with its parameters tested against
#' and degrees of freedom of the test.
#' @rdname spark.kstest
#' @aliases summary,KSTest-method
#' @export
#' @note summary(KSTest) since 2.1.0
setMethod("summary", signature(object = "KSTest"),
function(object) {
jobj <- object@jobj
pValue <- callJMethod(jobj, "pValue")
statistic <- callJMethod(jobj, "statistic")
nullHypothesis <- callJMethod(jobj, "nullHypothesis")
distName <- callJMethod(jobj, "distName")
distParams <- unlist(callJMethod(jobj, "distParams"))
degreesOfFreedom <- callJMethod(jobj, "degreesOfFreedom")
list(p.value = pValue, statistic = statistic, nullHypothesis = nullHypothesis,
nullHypothesis.name = distName, nullHypothesis.parameters = distParams,
degreesOfFreedom = degreesOfFreedom)
})
# Prints the summary of KSTest
#' @rdname spark.kstest
#' @param x test result object of KSTest by \code{spark.kstest}.
#' @export
#' @note print.summary.KSTest since 2.1.0
print.summary.KSTest <- function(x, ...) {
jobj <- x@jobj
summaryStr <- callJMethod(jobj, "summary")
cat(summaryStr)
invisible(summaryStr)
}
......@@ -742,4 +742,38 @@ test_that("spark.als", {
unlink(modelPath)
})
test_that("spark.kstest", {
data <- data.frame(test = c(0.1, 0.15, 0.2, 0.3, 0.25, -1, -0.5))
df <- createDataFrame(data)
testResult <- spark.kstest(df, "test", "norm")
stats <- summary(testResult)
rStats <- ks.test(data$test, "pnorm", alternative = "two.sided")
expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4)
expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4)
printStr <- print.summary.KSTest(testResult)
expect_match(printStr, paste0("Kolmogorov-Smirnov test summary:\\n",
"degrees of freedom = 0 \\n",
"statistic = 0.38208[0-9]* \\n",
"pValue = 0.19849[0-9]* \\n",
".*"), perl = TRUE)
testResult <- spark.kstest(df, "test", "norm", -0.5)
stats <- summary(testResult)
rStats <- ks.test(data$test, "pnorm", -0.5, 1, alternative = "two.sided")
expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4)
expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4)
printStr <- print.summary.KSTest(testResult)
expect_match(printStr, paste0("Kolmogorov-Smirnov test summary:\\n",
"degrees of freedom = 0 \\n",
"statistic = 0.44003[0-9]* \\n",
"pValue = 0.09470[0-9]* \\n",
".*"), perl = TRUE)
})
sparkR.session.stop()
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ml.r
import org.apache.spark.mllib.stat.Statistics.kolmogorovSmirnovTest
import org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult
import org.apache.spark.sql.{DataFrame, Row}
private[r] class KSTestWrapper private (
val testResult: KolmogorovSmirnovTestResult,
val distName: String,
val distParams: Array[Double]) {
lazy val pValue = testResult.pValue
lazy val statistic = testResult.statistic
lazy val nullHypothesis = testResult.nullHypothesis
lazy val degreesOfFreedom = testResult.degreesOfFreedom
def summary: String = testResult.toString
}
private[r] object KSTestWrapper {
def test(
data: DataFrame,
featureName: String,
distName: String,
distParams: Array[Double]): KSTestWrapper = {
val rddData = data.select(featureName).rdd.map {
case Row(feature: Double) => feature
}
val ksTestResult = kolmogorovSmirnovTest(rddData, distName, distParams : _*)
new KSTestWrapper(ksTestResult, distName, distParams)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment