From c0ba284300e494354f5bb205a10a12ac7daa2b5e Mon Sep 17 00:00:00 2001 From: "wm624@hotmail.com" <wm624@hotmail.com> Date: Thu, 26 Jan 2017 21:01:59 -0800 Subject: [PATCH] [SPARK-18821][SPARKR] Bisecting k-means wrapper in SparkR ## What changes were proposed in this pull request? Add R wrapper for bisecting Kmeans. As JIRA is down, I will update title to link with corresponding JIRA later. ## How was this patch tested? Add new unit tests. Author: wm624@hotmail.com <wm624@hotmail.com> Closes #16566 from wangmiao1981/bk. --- R/pkg/NAMESPACE | 3 +- R/pkg/R/generics.R | 5 + R/pkg/R/mllib_clustering.R | 149 ++++++++++++++++++ R/pkg/R/mllib_utils.R | 10 +- .../tests/testthat/test_mllib_clustering.R | 40 +++++ .../spark/ml/r/BisectingKMeansWrapper.scala | 143 +++++++++++++++++ .../org/apache/spark/ml/r/RWrappers.scala | 2 + 7 files changed, 347 insertions(+), 5 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 0cd9cb89d5..caa1c3b91b 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -47,7 +47,8 @@ exportMethods("glm", "spark.kstest", "spark.logit", "spark.randomForest", - "spark.gbt") + "spark.gbt", + "spark.bisectingKmeans") # Job group lifecycle management methods export("setJobGroup", diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 499c7b279e..433c16640c 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1338,6 +1338,11 @@ setGeneric("rbind", signature = "...") #' @export setGeneric("spark.als", function(data, ...) { standardGeneric("spark.als") }) +#' @rdname spark.bisectingKmeans +#' @export +setGeneric("spark.bisectingKmeans", + function(data, formula, ...) 
{ standardGeneric("spark.bisectingKmeans") }) + #' @rdname spark.gaussianMixture #' @export setGeneric("spark.gaussianMixture", diff --git a/R/pkg/R/mllib_clustering.R b/R/pkg/R/mllib_clustering.R index fa40f9d0bf..05bbab680d 100644 --- a/R/pkg/R/mllib_clustering.R +++ b/R/pkg/R/mllib_clustering.R @@ -17,6 +17,13 @@ # mllib_clustering.R: Provides methods for MLlib clustering algorithms integration +#' S4 class that represents a BisectingKMeansModel +#' +#' @param jobj a Java object reference to the backing Scala BisectingKMeansModel +#' @export +#' @note BisectingKMeansModel since 2.2.0 +setClass("BisectingKMeansModel", representation(jobj = "jobj")) + #' S4 class that represents a GaussianMixtureModel #' #' @param jobj a Java object reference to the backing Scala GaussianMixtureModel @@ -38,6 +45,148 @@ setClass("KMeansModel", representation(jobj = "jobj")) #' @note LDAModel since 2.1.0 setClass("LDAModel", representation(jobj = "jobj")) +#' Bisecting K-Means Clustering Model +#' +#' Fits a bisecting k-means clustering model against a Spark DataFrame. +#' Users can call \code{summary} to print a summary of the fitted model, \code{predict} to make +#' predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models. +#' +#' @param data a SparkDataFrame for training. +#' @param formula a symbolic description of the model to be fitted. Currently only a few formula +#' operators are supported, including '~', '.', ':', '+', and '-'. +#' Note that the response variable of formula is empty in spark.bisectingKmeans. +#' @param k the desired number of leaf clusters. Must be > 1. +#' The actual number could be smaller if there are no divisible leaf clusters. +#' @param maxIter maximum iteration number. +#' @param seed the random seed. +#' @param minDivisibleClusterSize The minimum number of points (if greater than or equal to 1.0) +#' or the minimum proportion of points (if less than 1.0) of a divisible cluster. +#' Note that it is an expert parameter. 
The default value should be good enough +#' for most cases. +#' @param ... additional argument(s) passed to the method. +#' @return \code{spark.bisectingKmeans} returns a fitted bisecting k-means model. +#' @rdname spark.bisectingKmeans +#' @aliases spark.bisectingKmeans,SparkDataFrame,formula-method +#' @name spark.bisectingKmeans +#' @export +#' @examples +#' \dontrun{ +#' sparkR.session() +#' df <- createDataFrame(iris) +#' model <- spark.bisectingKmeans(df, Sepal_Length ~ Sepal_Width, k = 4) +#' summary(model) +#' +#' # get fitted result from a bisecting k-means model +#' fitted.model <- fitted(model, "centers") +#' showDF(fitted.model) +#' +#' # fitted values on training data +#' fitted <- predict(model, df) +#' head(select(fitted, "Sepal_Length", "prediction")) +#' +#' # save fitted model to input path +#' path <- "path/to/model" +#' write.ml(model, path) +#' +#' # can also read back the saved model and print +#' savedModel <- read.ml(path) +#' summary(savedModel) +#' } +#' @note spark.bisectingKmeans since 2.2.0 +#' @seealso \link{predict}, \link{read.ml}, \link{write.ml} +setMethod("spark.bisectingKmeans", signature(data = "SparkDataFrame", formula = "formula"), + function(data, formula, k = 4, maxIter = 20, seed = NULL, minDivisibleClusterSize = 1.0) { + formula <- paste0(deparse(formula), collapse = "") + if (!is.null(seed)) { + seed <- as.character(as.integer(seed)) + } + jobj <- callJStatic("org.apache.spark.ml.r.BisectingKMeansWrapper", "fit", + data@sdf, formula, as.integer(k), as.integer(maxIter), + seed, as.numeric(minDivisibleClusterSize)) + new("BisectingKMeansModel", jobj = jobj) + }) + +# Get the summary of a bisecting k-means model + +#' @param object a fitted bisecting k-means model. +#' @return \code{summary} returns summary information of the fitted model, which is a list. 
+#' The list includes the model's \code{k} (number of cluster centers), +#' \code{coefficients} (model cluster centers), +#' \code{size} (number of data points in each cluster), \code{cluster} +#' (cluster assignments of the transformed data; cluster is NULL if is.loaded is TRUE), +#' and \code{is.loaded} (whether the model is loaded from a saved file). +#' @rdname spark.bisectingKmeans +#' @export +#' @note summary(BisectingKMeansModel) since 2.2.0 +setMethod("summary", signature(object = "BisectingKMeansModel"), + function(object) { + jobj <- object@jobj + is.loaded <- callJMethod(jobj, "isLoaded") + features <- callJMethod(jobj, "features") + coefficients <- callJMethod(jobj, "coefficients") + k <- callJMethod(jobj, "k") + size <- callJMethod(jobj, "size") + coefficients <- t(matrix(coefficients, ncol = k)) + colnames(coefficients) <- unlist(features) + rownames(coefficients) <- 1:k + cluster <- if (is.loaded) { + NULL + } else { + dataFrame(callJMethod(jobj, "cluster")) + } + list(k = k, coefficients = coefficients, size = size, + cluster = cluster, is.loaded = is.loaded) + }) + +# Predicted values based on a bisecting k-means model + +#' @param newData a SparkDataFrame for testing. +#' @return \code{predict} returns the predicted values based on a bisecting k-means model. +#' @rdname spark.bisectingKmeans +#' @export +#' @note predict(BisectingKMeansModel) since 2.2.0 +setMethod("predict", signature(object = "BisectingKMeansModel"), + function(object, newData) { + predict_internal(object, newData) + }) + +#' Get fitted result from a bisecting k-means model +#' +#' Get fitted result from a bisecting k-means model. +#' Note: A saved-loaded model does not support this method. +#' +#' @param method type of fitted results, \code{"centers"} for cluster centers +#' or \code{"classes"} for assigned classes. +#' @return \code{fitted} returns a SparkDataFrame containing fitted values. 
+#' @rdname spark.bisectingKmeans +#' @export +#' @note fitted since 2.2.0 +setMethod("fitted", signature(object = "BisectingKMeansModel"), + function(object, method = c("centers", "classes")) { + method <- match.arg(method) + jobj <- object@jobj + is.loaded <- callJMethod(jobj, "isLoaded") + if (is.loaded) { + stop("Saved-loaded bisecting k-means model does not support 'fitted' method") + } else { + dataFrame(callJMethod(jobj, "fitted", method)) + } + }) + +# Save fitted MLlib model to the input path + +#' @param path the directory where the model is saved. +#' @param overwrite overwrites or not if the output path already exists. Default is FALSE +#' which means throw exception if the output path exists. +#' +#' @rdname spark.bisectingKmeans +#' @export +#' @note write.ml(BisectingKMeansModel, character) since 2.2.0 +setMethod("write.ml", signature(object = "BisectingKMeansModel", path = "character"), + function(object, path, overwrite = FALSE) { + write_internal(object, path, overwrite) + }) + #' Multivariate Gaussian Mixture Model (GMM) #' #' Fits multivariate gaussian mixture model against a Spark DataFrame, similarly to R's diff --git a/R/pkg/R/mllib_utils.R b/R/pkg/R/mllib_utils.R index 720ee41c58..29c4473923 100644 --- a/R/pkg/R/mllib_utils.R +++ b/R/pkg/R/mllib_utils.R @@ -32,8 +32,8 @@ #' @rdname write.ml #' @name write.ml #' @export -#' @seealso \link{spark.glm}, \link{glm}, -#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.gbt}, \link{spark.isoreg}, +#' @seealso \link{spark.als}, \link{spark.bisectingKmeans}, \link{spark.gaussianMixture}, +#' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg}, #' @seealso \link{spark.kmeans}, #' @seealso \link{spark.lda}, \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes}, #' @seealso \link{spark.randomForest}, \link{spark.survreg}, @@ -47,8 +47,8 @@ NULL #' @rdname predict #' @name predict #' @export -#' @seealso \link{spark.glm}, \link{glm}, -#' @seealso 
\link{spark.als}, \link{spark.gaussianMixture}, \link{spark.gbt}, \link{spark.isoreg}, +#' @seealso \link{spark.als}, \link{spark.bisectingKmeans}, \link{spark.gaussianMixture}, +#' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg}, #' @seealso \link{spark.kmeans}, #' @seealso \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes}, #' @seealso \link{spark.randomForest}, \link{spark.survreg} @@ -113,6 +113,8 @@ read.ml <- function(path) { new("GBTRegressionModel", jobj = jobj) } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GBTClassifierWrapper")) { new("GBTClassificationModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.BisectingKMeansWrapper")) { + new("BisectingKMeansModel", jobj = jobj) } else { stop("Unsupported model: ", jobj) } diff --git a/R/pkg/inst/tests/testthat/test_mllib_clustering.R b/R/pkg/inst/tests/testthat/test_mllib_clustering.R index 9de8362cde..aad834bb64 100644 --- a/R/pkg/inst/tests/testthat/test_mllib_clustering.R +++ b/R/pkg/inst/tests/testthat/test_mllib_clustering.R @@ -27,6 +27,46 @@ absoluteSparkPath <- function(x) { file.path(sparkHome, x) } +test_that("spark.bisectingKmeans", { + newIris <- iris + newIris$Species <- NULL + training <- suppressWarnings(createDataFrame(newIris)) + + take(training, 1) + + model <- spark.bisectingKmeans(data = training, ~ .) 
+ sample <- take(select(predict(model, training), "prediction"), 1) + expect_equal(typeof(sample$prediction), "integer") + expect_equal(sample$prediction, 1) + + # Test fitted works on Bisecting KMeans + fitted.model <- fitted(model) + expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction), + c(0, 1, 2, 3)) + + # Test summary works on Bisecting KMeans + summary.model <- summary(model) + cluster <- summary.model$cluster + k <- summary.model$k + expect_equal(k, 4) + expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), + c(0, 1, 2, 3)) + + # Test model save/load + modelPath <- tempfile(pattern = "spark-bisectingkmeans", fileext = ".tmp") + write.ml(model, modelPath) + expect_error(write.ml(model, modelPath)) + write.ml(model, modelPath, overwrite = TRUE) + model2 <- read.ml(modelPath) + summary2 <- summary(model2) + expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size))) + expect_equal(summary.model$coefficients, summary2$coefficients) + expect_true(!summary.model$is.loaded) + expect_true(summary2$is.loaded) + + unlink(modelPath) +}) + test_that("spark.gaussianMixture", { # R code to reproduce the result. # nolint start diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala new file mode 100644 index 0000000000..71712c1c5e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/r/BisectingKMeansWrapper.scala @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.r + +import org.apache.hadoop.fs.Path +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.ml.{Pipeline, PipelineModel} +import org.apache.spark.ml.attribute.AttributeGroup +import org.apache.spark.ml.clustering.{BisectingKMeans, BisectingKMeansModel} +import org.apache.spark.ml.feature.RFormula +import org.apache.spark.ml.util._ +import org.apache.spark.sql.{DataFrame, Dataset} + +private[r] class BisectingKMeansWrapper private ( + val pipeline: PipelineModel, + val features: Array[String], + val size: Array[Long], + val isLoaded: Boolean = false) extends MLWritable { + private val bisectingKmeansModel: BisectingKMeansModel = + pipeline.stages.last.asInstanceOf[BisectingKMeansModel] + + lazy val coefficients: Array[Double] = bisectingKmeansModel.clusterCenters.flatMap(_.toArray) + + lazy val k: Int = bisectingKmeansModel.getK + + // If the model is loaded from a saved model, cluster is NULL. 
It is checked on R side + lazy val cluster: DataFrame = bisectingKmeansModel.summary.cluster + + def fitted(method: String): DataFrame = { + if (method == "centers") { + bisectingKmeansModel.summary.predictions.drop(bisectingKmeansModel.getFeaturesCol) + } else if (method == "classes") { + bisectingKmeansModel.summary.cluster + } else { + throw new UnsupportedOperationException( + s"Method (centers or classes) required but $method found.") + } + } + + def transform(dataset: Dataset[_]): DataFrame = { + pipeline.transform(dataset).drop(bisectingKmeansModel.getFeaturesCol) + } + + override def write: MLWriter = new BisectingKMeansWrapper.BisectingKMeansWrapperWriter(this) +} + +private[r] object BisectingKMeansWrapper extends MLReadable[BisectingKMeansWrapper] { + + def fit( + data: DataFrame, + formula: String, + k: Int, + maxIter: Int, + seed: String, + minDivisibleClusterSize: Double + ): BisectingKMeansWrapper = { + + val rFormula = new RFormula() + .setFormula(formula) + .setFeaturesCol("features") + RWrapperUtils.checkDataColumns(rFormula, data) + val rFormulaModel = rFormula.fit(data) + + // get feature names from output schema + val schema = rFormulaModel.transform(data).schema + val featureAttrs = AttributeGroup.fromStructField(schema(rFormulaModel.getFeaturesCol)) + .attributes.get + val features = featureAttrs.map(_.name.get) + + val bisectingKmeans = new BisectingKMeans() + .setK(k) + .setMaxIter(maxIter) + .setMinDivisibleClusterSize(minDivisibleClusterSize) + .setFeaturesCol(rFormula.getFeaturesCol) + + if (seed != null && seed.length > 0) bisectingKmeans.setSeed(seed.toInt) + + val pipeline = new Pipeline() + .setStages(Array(rFormulaModel, bisectingKmeans)) + .fit(data) + + val bisectingKmeansModel: BisectingKMeansModel = + pipeline.stages.last.asInstanceOf[BisectingKMeansModel] + val size: Array[Long] = bisectingKmeansModel.summary.clusterSizes + + new BisectingKMeansWrapper(pipeline, features, size) + } + + override def read: 
MLReader[BisectingKMeansWrapper] = new BisectingKMeansWrapperReader + + override def load(path: String): BisectingKMeansWrapper = super.load(path) + + class BisectingKMeansWrapperWriter(instance: BisectingKMeansWrapper) extends MLWriter { + + override protected def saveImpl(path: String): Unit = { + val rMetadataPath = new Path(path, "rMetadata").toString + val pipelinePath = new Path(path, "pipeline").toString + + val rMetadata = ("class" -> instance.getClass.getName) ~ + ("features" -> instance.features.toSeq) ~ + ("size" -> instance.size.toSeq) + val rMetadataJson: String = compact(render(rMetadata)) + + sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + instance.pipeline.save(pipelinePath) + } + } + + class BisectingKMeansWrapperReader extends MLReader[BisectingKMeansWrapper] { + + override def load(path: String): BisectingKMeansWrapper = { + implicit val format = DefaultFormats + val rMetadataPath = new Path(path, "rMetadata").toString + val pipelinePath = new Path(path, "pipeline").toString + val pipeline = PipelineModel.load(pipelinePath) + + val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadata = parse(rMetadataStr) + val features = (rMetadata \ "features").extract[Array[String]] + val size = (rMetadata \ "size").extract[Array[Long]] + new BisectingKMeansWrapper(pipeline, features, size, isLoaded = true) + } + } + +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala index b59fe29234..c44179281b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala @@ -64,6 +64,8 @@ private[r] object RWrappers extends MLReader[Object] { GBTRegressorWrapper.load(path) case "org.apache.spark.ml.r.GBTClassifierWrapper" => GBTClassifierWrapper.load(path) + case "org.apache.spark.ml.r.BisectingKMeansWrapper" => + BisectingKMeansWrapper.load(path) case _ => throw new 
SparkException(s"SparkR read.ml does not support load $className") } -- GitLab