From dbb06c689c157502cb081421baecce411832aad8 Mon Sep 17 00:00:00 2001
From: Yanbo Liang <ybliang8@gmail.com>
Date: Wed, 26 Apr 2017 21:34:18 +0800
Subject: [PATCH] [MINOR][ML] Fix some PySpark & SparkR flaky tests
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What changes were proposed in this pull request?
Some PySpark & SparkR tests run with tiny dataset and tiny ```maxIter```, which means they are not converged. I don’t think checking intermediate result during iteration make sense, and these intermediate result may vulnerable and not stable, so we should switch to check the converged result. We hit this issue at #17746 when we upgrade breeze to 0.13.1.

## How was this patch tested?
Existing tests.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #17757 from yanboliang/flaky-test.
---
 .../testthat/test_mllib_classification.R      | 17 +----
 python/pyspark/ml/classification.py           | 71 ++++++++++---------
 2 files changed, 38 insertions(+), 50 deletions(-)

diff --git a/R/pkg/inst/tests/testthat/test_mllib_classification.R b/R/pkg/inst/tests/testthat/test_mllib_classification.R
index af7cbdccf5..cbc7087182 100644
--- a/R/pkg/inst/tests/testthat/test_mllib_classification.R
+++ b/R/pkg/inst/tests/testthat/test_mllib_classification.R
@@ -284,22 +284,11 @@ test_that("spark.mlp", {
                c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
 
   # test initialWeights
-  model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
+  model <- spark.mlp(df, label ~ features, layers = c(4, 3), initialWeights =
     c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
   expect_equal(head(mlpPredictions$prediction, 10),
-               c("1.0", "1.0", "2.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
-
-  model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
-    c(0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 5.0, 5.0, 5.0, 5.0, 9.0, 9.0, 9.0, 9.0, 9.0))
-  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 10),
-               c("1.0", "1.0", "2.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
-
-  model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2)
-  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 10),
-               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "0.0", "0.0", "1.0", "0.0"))
+               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
 
   # Test formula works well
   df <- suppressWarnings(createDataFrame(iris))
@@ -310,8 +299,6 @@ test_that("spark.mlp", {
   expect_equal(summary$numOfOutputs, 3)
   expect_equal(summary$layers, c(4, 3))
   expect_equal(length(summary$weights), 15)
-  expect_equal(head(summary$weights, 5), list(-0.5793153, -4.652961, 6.216155, -6.649478,
-               -10.51147), tolerance = 1e-3)
 })
 
 test_that("spark.naiveBayes", {
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 864968390a..a9756ea4af 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -185,34 +185,33 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
     >>> from pyspark.sql import Row
     >>> from pyspark.ml.linalg import Vectors
     >>> bdf = sc.parallelize([
-    ...     Row(label=1.0, weight=2.0, features=Vectors.dense(1.0)),
-    ...     Row(label=0.0, weight=2.0, features=Vectors.sparse(1, [], []))]).toDF()
-    >>> blor = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight")
+    ...     Row(label=1.0, weight=1.0, features=Vectors.dense(0.0, 5.0)),
+    ...     Row(label=0.0, weight=2.0, features=Vectors.dense(1.0, 2.0)),
+    ...     Row(label=1.0, weight=3.0, features=Vectors.dense(2.0, 1.0)),
+    ...     Row(label=0.0, weight=4.0, features=Vectors.dense(3.0, 3.0))]).toDF()
+    >>> blor = LogisticRegression(regParam=0.01, weightCol="weight")
     >>> blorModel = blor.fit(bdf)
     >>> blorModel.coefficients
-    DenseVector([5.4...])
+    DenseVector([-1.080..., -0.646...])
     >>> blorModel.intercept
-    -2.63...
-    >>> mdf = sc.parallelize([
-    ...     Row(label=1.0, weight=2.0, features=Vectors.dense(1.0)),
-    ...     Row(label=0.0, weight=2.0, features=Vectors.sparse(1, [], [])),
-    ...     Row(label=2.0, weight=2.0, features=Vectors.dense(3.0))]).toDF()
-    >>> mlor = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight",
-    ...     family="multinomial")
+    3.112...
+    >>> data_path = "data/mllib/sample_multiclass_classification_data.txt"
+    >>> mdf = spark.read.format("libsvm").load(data_path)
+    >>> mlor = LogisticRegression(regParam=0.1, elasticNetParam=1.0, family="multinomial")
     >>> mlorModel = mlor.fit(mdf)
     >>> mlorModel.coefficientMatrix
-    DenseMatrix(3, 1, [-2.3..., 0.2..., 2.1...], 1)
+    SparseMatrix(3, 4, [0, 1, 2, 3], [3, 2, 1], [1.87..., -2.75..., -0.50...], 1)
     >>> mlorModel.interceptVector
-    DenseVector([2.1..., 0.6..., -2.8...])
-    >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF()
+    DenseVector([0.04..., -0.42..., 0.37...])
+    >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 1.0))]).toDF()
     >>> result = blorModel.transform(test0).head()
     >>> result.prediction
-    0.0
+    1.0
     >>> result.probability
-    DenseVector([0.99..., 0.00...])
+    DenseVector([0.02..., 0.97...])
     >>> result.rawPrediction
-    DenseVector([8.12..., -8.12...])
-    >>> test1 = sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]).toDF()
+    DenseVector([-3.54..., 3.54...])
+    >>> test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], [1.0]))]).toDF()
     >>> blorModel.transform(test1).head().prediction
     1.0
     >>> blor.setParams("vector")
@@ -222,8 +221,8 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
     >>> lr_path = temp_path + "/lr"
     >>> blor.save(lr_path)
     >>> lr2 = LogisticRegression.load(lr_path)
-    >>> lr2.getMaxIter()
-    5
+    >>> lr2.getRegParam()
+    0.01
     >>> model_path = temp_path + "/lr_model"
     >>> blorModel.save(model_path)
     >>> model2 = LogisticRegressionModel.load(model_path)
@@ -1480,31 +1479,33 @@ class OneVsRest(Estimator, OneVsRestParams, MLReadable, MLWritable):
 
     >>> from pyspark.sql import Row
     >>> from pyspark.ml.linalg import Vectors
-    >>> df = sc.parallelize([
-    ...     Row(label=0.0, features=Vectors.dense(1.0, 0.8)),
-    ...     Row(label=1.0, features=Vectors.sparse(2, [], [])),
-    ...     Row(label=2.0, features=Vectors.dense(0.5, 0.5))]).toDF()
-    >>> lr = LogisticRegression(maxIter=5, regParam=0.01)
+    >>> data_path = "data/mllib/sample_multiclass_classification_data.txt"
+    >>> df = spark.read.format("libsvm").load(data_path)
+    >>> lr = LogisticRegression(regParam=0.01)
     >>> ovr = OneVsRest(classifier=lr)
     >>> model = ovr.fit(df)
-    >>> [x.coefficients for x in model.models]
-    [DenseVector([4.9791, 2.426]), DenseVector([-4.1198, -5.9326]), DenseVector([-3.314, 5.2423])]
+    >>> model.models[0].coefficients
+    DenseVector([0.5..., -1.0..., 3.4..., 4.2...])
+    >>> model.models[1].coefficients
+    DenseVector([-2.1..., 3.1..., -2.6..., -2.3...])
+    >>> model.models[2].coefficients
+    DenseVector([0.3..., -3.4..., 1.0..., -1.1...])
     >>> [x.intercept for x in model.models]
-    [-5.06544..., 2.30341..., -1.29133...]
-    >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 0.0))]).toDF()
+    [-2.7..., -2.5..., -1.3...]
+    >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0, 0.0, 1.0, 1.0))]).toDF()
     >>> model.transform(test0).head().prediction
-    1.0
-    >>> test1 = sc.parallelize([Row(features=Vectors.sparse(2, [0], [1.0]))]).toDF()
-    >>> model.transform(test1).head().prediction
     0.0
-    >>> test2 = sc.parallelize([Row(features=Vectors.dense(0.5, 0.4))]).toDF()
-    >>> model.transform(test2).head().prediction
+    >>> test1 = sc.parallelize([Row(features=Vectors.sparse(4, [0], [1.0]))]).toDF()
+    >>> model.transform(test1).head().prediction
     2.0
+    >>> test2 = sc.parallelize([Row(features=Vectors.dense(0.5, 0.4, 0.3, 0.2))]).toDF()
+    >>> model.transform(test2).head().prediction
+    0.0
     >>> model_path = temp_path + "/ovr_model"
     >>> model.save(model_path)
     >>> model2 = OneVsRestModel.load(model_path)
     >>> model2.transform(test0).head().prediction
-    1.0
+    0.0
 
     .. versionadded:: 2.0.0
     """
-- 
GitLab