diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 7653ca7bccec9c7b430a6237331c1d1ad427542c..499c7b279ea9d8c6e6c2a3d36d9561150cb9f45a 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1373,7 +1373,7 @@ setGeneric("spark.logit", function(data, formula, ...) { standardGeneric("spark.
 
 #' @rdname spark.mlp
 #' @export
-setGeneric("spark.mlp", function(data, ...) { standardGeneric("spark.mlp") })
+setGeneric("spark.mlp", function(data, formula, ...) { standardGeneric("spark.mlp") })
 
 #' @rdname spark.naiveBayes
 #' @export
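The generics.R change above widens the S4 generic so that `spark.mlp` methods can dispatch on both the data class and a formula. A minimal standalone sketch of the same dispatch pattern, using toy names rather than Spark code, which also shows why `deparse()` output is collapsed before being handed to the JVM side:

library(methods)

# Toy generic mirroring the (data, formula) dispatch used by spark.mlp.
setGeneric("fitModel", function(data, formula, ...) { standardGeneric("fitModel") })

setMethod("fitModel", signature(data = "data.frame", formula = "formula"),
          function(data, formula, ...) {
            # deparse() can return several strings for a long formula,
            # so the pieces are collapsed into a single string before use.
            paste(deparse(formula), collapse = "")
          })

fitModel(iris, Species ~ Sepal.Length + Sepal.Width)
# [1] "Species ~ Sepal.Length + Sepal.Width"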
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index 1065b4b37d7f3f34f4d9a5c7e9ab4f88b8ecbcac..265e64e7466faca4c37917fa7dd3c05c03850e69 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -525,7 +525,7 @@ setMethod("write.ml", signature(object = "LDAModel", path = "character"),
 #' @note spark.isoreg since 2.1.0
 setMethod("spark.isoreg", signature(data = "SparkDataFrame", formula = "formula"),
           function(data, formula, isotonic = TRUE, featureIndex = 0, weightCol = NULL) {
-            formula <- paste0(deparse(formula), collapse = "")
+            formula <- paste(deparse(formula), collapse = "")
 
             if (is.null(weightCol)) {
               weightCol <- ""
@@ -775,7 +775,7 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
                    tol = 1E-6, fitIntercept = TRUE, family = "auto", standardization = TRUE,
                    thresholds = 0.5, weightCol = NULL, aggregationDepth = 2,
                    probabilityCol = "probability") {
-            formula <- paste0(deparse(formula), collapse = "")
+            formula <- paste(deparse(formula), collapse = "")
 
             if (is.null(weightCol)) {
               weightCol <- ""
@@ -858,6 +858,8 @@ setMethod("summary", signature(object = "LogisticRegressionModel"),
 #'   Multilayer Perceptron}
 #'
 #' @param data a \code{SparkDataFrame} of observations and labels for model fitting.
+#' @param formula a symbolic description of the model to be fitted. Currently only a few formula
+#'        operators are supported, including '~', '.', ':', '+', and '-'.
 #' @param blockSize blockSize parameter.
 #' @param layers integer vector containing the number of nodes for each layer
 #' @param solver solver parameter, supported options: "gd" (minibatch gradient descent) or "l-bfgs".
@@ -870,7 +872,7 @@ setMethod("summary", signature(object = "LogisticRegressionModel"),
 #' @param ... additional arguments passed to the method.
 #' @return \code{spark.mlp} returns a fitted Multilayer Perceptron Classification Model.
 #' @rdname spark.mlp
-#' @aliases spark.mlp,SparkDataFrame-method
+#' @aliases spark.mlp,SparkDataFrame,formula-method
 #' @name spark.mlp
 #' @seealso \link{read.ml}
 #' @export
@@ -879,7 +881,7 @@ setMethod("summary", signature(object = "LogisticRegressionModel"),
 #' df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
 #'
 #' # fit a Multilayer Perceptron Classification Model
-#' model <- spark.mlp(df, blockSize = 128, layers = c(4, 3), solver = "l-bfgs",
+#' model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 3), solver = "l-bfgs",
 #'                    maxIter = 100, tol = 0.5, stepSize = 1, seed = 1,
 #'                    initialWeights = c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
 #'
@@ -896,9 +898,10 @@ setMethod("summary", signature(object = "LogisticRegressionModel"),
 #' summary(savedModel)
 #' }
 #' @note spark.mlp since 2.1.0
-setMethod("spark.mlp", signature(data = "SparkDataFrame"),
-          function(data, layers, blockSize = 128, solver = "l-bfgs", maxIter = 100,
+setMethod("spark.mlp", signature(data = "SparkDataFrame", formula = "formula"),
+          function(data, formula, layers, blockSize = 128, solver = "l-bfgs", maxIter = 100,
                    tol = 1E-6, stepSize = 0.03, seed = NULL, initialWeights = NULL) {
+            formula <- paste(deparse(formula), collapse = "")
            if (is.null(layers)) {
               stop ("layers must be a integer vector with length > 1.")
             }
@@ -913,7 +916,7 @@ setMethod("spark.mlp", signature(data = "SparkDataFrame"),
               initialWeights <- as.array(as.numeric(na.omit(initialWeights)))
             }
             jobj <- callJStatic("org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper",
-                                "fit", data@sdf, as.integer(blockSize), as.array(layers),
+                                "fit", data@sdf, formula, as.integer(blockSize), as.array(layers),
                                 as.character(solver), as.integer(maxIter), as.numeric(tol),
                                 as.numeric(stepSize), seed, initialWeights)
             new("MultilayerPerceptronClassificationModel", jobj = jobj)
@@ -936,9 +939,10 @@ setMethod("predict", signature(object = "MultilayerPerceptronClassificationModel
 
 # Returns the summary of a Multilayer Perceptron Classification Model produced by \code{spark.mlp}
 #' @param object a Multilayer Perceptron Classification Model fitted by \code{spark.mlp}
-#' @return \code{summary} returns a list containing \code{labelCount}, \code{layers}, and
-#'         \code{weights}. For \code{weights}, it is a numeric vector with length equal to
-#'         the expected given the architecture (i.e., for 8-10-2 network, 100 connection weights).
+#' @return \code{summary} returns a list containing \code{numOfInputs}, \code{numOfOutputs},
+#'         \code{layers}, and \code{weights}. \code{weights} is a numeric vector with length
+#'         equal to the number expected given the architecture (e.g., for an 8-10-2 network,
+#'         112 connection weights).
 #' @rdname spark.mlp
 #' @export
 #' @aliases summary,MultilayerPerceptronClassificationModel-method
@@ -946,10 +950,12 @@ setMethod("predict", signature(object = "MultilayerPerceptronClassificationModel
 setMethod("summary", signature(object = "MultilayerPerceptronClassificationModel"),
           function(object) {
             jobj <- object@jobj
-            labelCount <- callJMethod(jobj, "labelCount")
             layers <- unlist(callJMethod(jobj, "layers"))
+            numOfInputs <- head(layers, n = 1)
+            numOfOutputs <- tail(layers, n = 1)
             weights <- callJMethod(jobj, "weights")
-            list(labelCount = labelCount, layers = layers, weights = weights)
+            list(numOfInputs = numOfInputs, numOfOutputs = numOfOutputs,
+                 layers = layers, weights = weights)
           })
 
 #' Naive Bayes Models
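The corrected `@return` note above counts bias terms as well as connections: a fully connected step from a layer of n_i nodes to one of n_(i+1) nodes carries (n_i + 1) * n_(i+1) weights. A quick check of the 8-10-2 figure:

layers <- c(8, 10, 2)
sum((head(layers, -1) + 1) * tail(layers, -1))
# [1] 112    i.e. (8 + 1) * 10 + (10 + 1) * 2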
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index 07df4b6d6f844fdac069aac67e9e5f81df5457ed..2a97a51cfa20581d76d82e346a8dc0b85ebc77ee 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -371,12 +371,13 @@ test_that("spark.kmeans", {
 test_that("spark.mlp", {
   df <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
                 source = "libsvm")
-  model <- spark.mlp(df, blockSize = 128, layers = c(4, 5, 4, 3), solver = "l-bfgs", maxIter = 100,
-                     tol = 0.5, stepSize = 1, seed = 1)
+  model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 5, 4, 3),
+                     solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1)
 
   # Test summary method
   summary <- summary(model)
-  expect_equal(summary$labelCount, 3)
+  expect_equal(summary$numOfInputs, 4)
+  expect_equal(summary$numOfOutputs, 3)
   expect_equal(summary$layers, c(4, 5, 4, 3))
   expect_equal(length(summary$weights), 64)
   expect_equal(head(summary$weights, 5), list(-0.878743, 0.2154151, -1.16304, -0.6583214, 1.009825),
@@ -385,7 +386,7 @@ test_that("spark.mlp", {
   # Test predict method
   mlpTestDF <- df
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 6), c(0, 1, 1, 1, 1, 1))
+  expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0"))
 
   # Test model save/load
   modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
@@ -395,46 +396,68 @@ test_that("spark.mlp", {
   model2 <- read.ml(modelPath)
   summary2 <- summary(model2)
 
-  expect_equal(summary2$labelCount, 3)
+  expect_equal(summary2$numOfInputs, 4)
+  expect_equal(summary2$numOfOutputs, 3)
   expect_equal(summary2$layers, c(4, 5, 4, 3))
   expect_equal(length(summary2$weights), 64)
 
   unlink(modelPath)
 
   # Test default parameter
-  model <- spark.mlp(df, layers = c(4, 5, 4, 3))
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3))
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 10), c(1, 1, 1, 1, 0, 1, 2, 2, 1, 0))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
 
   # Test illegal parameter
-  expect_error(spark.mlp(df, layers = NULL), "layers must be a integer vector with length > 1.")
-  expect_error(spark.mlp(df, layers = c()), "layers must be a integer vector with length > 1.")
-  expect_error(spark.mlp(df, layers = c(3)), "layers must be a integer vector with length > 1.")
+  expect_error(spark.mlp(df, label ~ features, layers = NULL),
+               "layers must be a integer vector with length > 1.")
+  expect_error(spark.mlp(df, label ~ features, layers = c()),
+               "layers must be a integer vector with length > 1.")
+  expect_error(spark.mlp(df, label ~ features, layers = c(3)),
+               "layers must be a integer vector with length > 1.")
 
   # Test random seed
   # default seed
-  model <- spark.mlp(df, layers = c(4, 5, 4, 3), maxIter = 10)
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10)
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 2, 2, 1, 2, 0, 1))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
   # seed equals 10
-  model <- spark.mlp(df, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10)
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10)
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 2, 1, 2, 2, 1, 0, 0, 1))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
 
   # test initialWeights
-  model <- spark.mlp(df, layers = c(4, 3), maxIter = 2, initialWeights =
+  model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
     c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
 
-  model <- spark.mlp(df, layers = c(4, 3), maxIter = 2, initialWeights =
+  model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
     c(0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 5.0, 5.0, 5.0, 5.0, 9.0, 9.0, 9.0, 9.0, 9.0))
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
 
-  model <- spark.mlp(df, layers = c(4, 3), maxIter = 2)
+  model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2)
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 0, 2, 1, 0, 0, 1))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "0.0", "2.0", "1.0", "0.0"))
+
+  # Test formula works well
+  df <- suppressWarnings(createDataFrame(iris))
+  model <- spark.mlp(df, Species ~ Sepal_Length + Sepal_Width + Petal_Length + Petal_Width,
+                     layers = c(4, 3))
+  summary <- summary(model)
+  expect_equal(summary$numOfInputs, 4)
+  expect_equal(summary$numOfOutputs, 3)
+  expect_equal(summary$layers, c(4, 3))
+  expect_equal(length(summary$weights), 15)
+  expect_equal(head(summary$weights, 5), list(-1.1957257, -5.2693685, 7.4489734, -6.3751413,
+               -10.2376130), tolerance = 1e-6)
 })
 
 test_that("spark.naiveBayes", {
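The test expectations above switch from numeric to character predictions because the fitted pipeline now ends with an IndexToString stage (see the Scala wrapper changes below) that maps predicted indices back to the original label strings. A sketch of the new behavior, assuming an active SparkR session (sparkR.session() started; createDataFrame converts iris's dotted column names to underscores):

df <- suppressWarnings(createDataFrame(iris))
model <- spark.mlp(df, Species ~ ., layers = c(4, 5, 3), maxIter = 10, seed = 1)
preds <- collect(select(predict(model, df), "prediction"))
class(preds$prediction)  # "character": labels such as "versicolor", not indices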
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
index 2193eb80e9fdd82785c45d9861a0638401fc5e0c..d34de30931143e6de2443c59f3b0eb13cf430412 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
@@ -24,19 +24,29 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.ml.{Pipeline, PipelineModel}
 import org.apache.spark.ml.classification.{MultilayerPerceptronClassificationModel, MultilayerPerceptronClassifier}
+import org.apache.spark.ml.feature.{IndexToString, RFormula}
 import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.r.RWrapperUtils._
 import org.apache.spark.ml.util.{MLReadable, MLReader, MLWritable, MLWriter}
 import org.apache.spark.sql.{DataFrame, Dataset}
 
 private[r] class MultilayerPerceptronClassifierWrapper private (
-    val pipeline: PipelineModel,
-    val labelCount: Long,
-    val layers: Array[Int],
-    val weights: Array[Double]
+    val pipeline: PipelineModel
   ) extends MLWritable {
 
+  import MultilayerPerceptronClassifierWrapper._
+
+  val mlpModel: MultilayerPerceptronClassificationModel =
+    pipeline.stages(1).asInstanceOf[MultilayerPerceptronClassificationModel]
+
+  val weights: Array[Double] = mlpModel.weights.toArray
+  val layers: Array[Int] = mlpModel.layers
+
   def transform(dataset: Dataset[_]): DataFrame = {
     pipeline.transform(dataset)
+      .drop(mlpModel.getFeaturesCol)
+      .drop(mlpModel.getLabelCol)
+      .drop(PREDICTED_LABEL_INDEX_COL)
   }
 
   /**
@@ -49,10 +59,12 @@ private[r] class MultilayerPerceptronClassifierWrapper private (
 private[r] object MultilayerPerceptronClassifierWrapper
   extends MLReadable[MultilayerPerceptronClassifierWrapper] {
 
+  val PREDICTED_LABEL_INDEX_COL = "pred_label_idx"
   val PREDICTED_LABEL_COL = "prediction"
 
   def fit(
       data: DataFrame,
+      formula: String,
       blockSize: Int,
       layers: Array[Int],
       solver: String,
@@ -62,8 +74,13 @@ private[r] object MultilayerPerceptronClassifierWrapper
       seed: String,
       initialWeights: Array[Double]
     ): MultilayerPerceptronClassifierWrapper = {
+    val rFormula = new RFormula()
+      .setFormula(formula)
+      .setForceIndexLabel(true)
+    checkDataColumns(rFormula, data)
+    val rFormulaModel = rFormula.fit(data)
     // get labels and feature names from output schema
-    val schema = data.schema
+    val (_, labels) = getFeaturesAndLabels(rFormulaModel, data)
 
     // assemble and fit the pipeline
     val mlp = new MultilayerPerceptronClassifier()
@@ -73,25 +90,25 @@ private[r] object MultilayerPerceptronClassifierWrapper
       .setMaxIter(maxIter)
       .setTol(tol)
       .setStepSize(stepSize)
-      .setPredictionCol(PREDICTED_LABEL_COL)
+      .setFeaturesCol(rFormula.getFeaturesCol)
+      .setLabelCol(rFormula.getLabelCol)
+      .setPredictionCol(PREDICTED_LABEL_INDEX_COL)
     if (seed != null && seed.length > 0) mlp.setSeed(seed.toInt)
     if (initialWeights != null) {
       require(initialWeights.length > 0)
       mlp.setInitialWeights(Vectors.dense(initialWeights))
     }
+    val idxToStr = new IndexToString()
+      .setInputCol(PREDICTED_LABEL_INDEX_COL)
+      .setOutputCol(PREDICTED_LABEL_COL)
+      .setLabels(labels)
+
     val pipeline = new Pipeline()
-      .setStages(Array(mlp))
+      .setStages(Array(rFormulaModel, mlp, idxToStr))
       .fit(data)
 
-    val multilayerPerceptronClassificationModel: MultilayerPerceptronClassificationModel =
-      pipeline.stages.head.asInstanceOf[MultilayerPerceptronClassificationModel]
-
-    val weights = multilayerPerceptronClassificationModel.weights.toArray
-    val layersFromPipeline = multilayerPerceptronClassificationModel.layers
-    val labelCount = data.select("label").distinct().count()
-
-    new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layersFromPipeline, weights)
+    new MultilayerPerceptronClassifierWrapper(pipeline)
   }
 
   /**
@@ -107,17 +124,10 @@ private[r] object MultilayerPerceptronClassifierWrapper
 
     override def load(path: String): MultilayerPerceptronClassifierWrapper = {
       implicit val format = DefaultFormats
-      val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
-      val rMetadata = parse(rMetadataStr)
-      val labelCount = (rMetadata \ "labelCount").extract[Long]
-      val layers = (rMetadata \ "layers").extract[Array[Int]]
-      val weights = (rMetadata \ "weights").extract[Array[Double]]
-
       val pipeline = PipelineModel.load(pipelinePath)
-      new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layers, weights)
+      new MultilayerPerceptronClassifierWrapper(pipeline)
     }
   }
@@ -128,10 +138,7 @@ private[r] object MultilayerPerceptronClassifierWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadata = ("class" -> instance.getClass.getName) ~
-        ("labelCount" -> instance.labelCount) ~
-        ("layers" -> instance.layers.toSeq) ~
-        ("weights" -> instance.weights.toArray.toSeq)
+      val rMetadata = "class" -> instance.getClass.getName
       val rMetadataJson: String = compact(render(rMetadata))
       sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
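Because `layers` and `weights` are now re-derived from the MLP stage of the persisted PipelineModel (`pipeline.stages(1)`) instead of being stored in rMetadata, a save/load round trip should summarize identically; the updated tests above check exactly this. A sketch, reusing the `model` fitted in the earlier snippet:

modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
write.ml(model, modelPath)
model2 <- read.ml(modelPath)
# layers/weights come from the reloaded pipeline itself, not saved metadata
stopifnot(identical(summary(model)$layers, summary(model2)$layers))
unlink(modelPath)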