diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index 3cf1028fbc72516e6b53f53e8ed61f7932f2a61d..3cf4e807b4cf7aa12149ed72c6bb976a5baa4ce3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -155,7 +155,7 @@ object Statistics { * :: Experimental :: * Conduct Pearson's independence test for every feature against the label across the input RDD. * For each feature, the (feature, label) pairs are converted into a contingency matrix for which - * the chi-squared statistic is computed. + * the chi-squared statistic is computed. All label and feature values must be categorical. * * @param data an `RDD[LabeledPoint]` containing the labeled dataset with categorical features. * Real-valued features will be treated as categorical for each distinct value. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala index 215de95db5113b9eedfaec9ce0d3e5109bd2fd01..0089419c2c5d478fc96b0b463438bf5a13fa6534 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala @@ -20,11 +20,13 @@ package org.apache.spark.mllib.stat.test import breeze.linalg.{DenseMatrix => BDM} import cern.jet.stat.Probability.chiSquareComplemented -import org.apache.spark.Logging +import org.apache.spark.{SparkException, Logging} import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD +import scala.collection.mutable + /** * Conduct the chi-squared test for the input RDDs using the specified method. * Goodness-of-fit test is conducted on two `Vectors`, whereas test of independence is conducted @@ -75,21 +77,42 @@ private[stat] object ChiSqTest extends Logging { */ def chiSquaredFeatures(data: RDD[LabeledPoint], methodName: String = PEARSON.name): Array[ChiSqTestResult] = { + val maxCategories = 10000 val numCols = data.first().features.size val results = new Array[ChiSqTestResult](numCols) var labels: Map[Double, Int] = null - // At most 100 columns at a time - val batchSize = 100 + // at most 1000 columns at a time + val batchSize = 1000 var batch = 0 while (batch * batchSize < numCols) { // The following block of code can be cleaned up and made public as // chiSquared(data: RDD[(V1, V2)]) val startCol = batch * batchSize val endCol = startCol + math.min(batchSize, numCols - startCol) - val pairCounts = data.flatMap { p => - // assume dense vectors - p.features.toArray.slice(startCol, endCol).zipWithIndex.map { case (feature, col) => - (col, feature, p.label) + val pairCounts = data.mapPartitions { iter => + val distinctLabels = mutable.HashSet.empty[Double] + val allDistinctFeatures: Map[Int, mutable.HashSet[Double]] = + Map((startCol until endCol).map(col => (col, mutable.HashSet.empty[Double])): _*) + var i = 1 + iter.flatMap { case LabeledPoint(label, features) => + if (i % 1000 == 0) { + if (distinctLabels.size > maxCategories) { + throw new SparkException(s"Chi-square test expect factors (categorical values) but " + + s"found more than $maxCategories distinct label values.") + } + allDistinctFeatures.foreach { case (col, distinctFeatures) => + if (distinctFeatures.size > maxCategories) { + throw new SparkException(s"Chi-square test expect factors (categorical values) but " + + s"found more than $maxCategories distinct values in column $col.") + } + } + } + i += 1 + distinctLabels += label + features.toArray.view.zipWithIndex.slice(startCol, endCol).map { case (feature, col) => + allDistinctFeatures(col) += feature + (col, feature, label) + } } }.countByValue() diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala index 5bd0521298c14b2c5c0e51a90debd7abba0b8b9f..6de3840b3f19817d39e3b9f7f5351d1b49c8fabb 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala @@ -17,8 +17,11 @@ package org.apache.spark.mllib.stat +import java.util.Random + import org.scalatest.FunSuite +import org.apache.spark.SparkException import org.apache.spark.mllib.linalg.{DenseVector, Matrices, Vectors} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.stat.test.ChiSqTest @@ -107,12 +110,13 @@ class HypothesisTestSuite extends FunSuite with LocalSparkContext { // labels: 1.0 (2 / 6), 0.0 (4 / 6) // feature1: 0.5 (1 / 6), 1.5 (2 / 6), 3.5 (3 / 6) // feature2: 10.0 (1 / 6), 20.0 (1 / 6), 30.0 (2 / 6), 40.0 (2 / 6) - val data = Array(new LabeledPoint(0.0, Vectors.dense(0.5, 10.0)), - new LabeledPoint(0.0, Vectors.dense(1.5, 20.0)), - new LabeledPoint(1.0, Vectors.dense(1.5, 30.0)), - new LabeledPoint(0.0, Vectors.dense(3.5, 30.0)), - new LabeledPoint(0.0, Vectors.dense(3.5, 40.0)), - new LabeledPoint(1.0, Vectors.dense(3.5, 40.0))) + val data = Seq( + LabeledPoint(0.0, Vectors.dense(0.5, 10.0)), + LabeledPoint(0.0, Vectors.dense(1.5, 20.0)), + LabeledPoint(1.0, Vectors.dense(1.5, 30.0)), + LabeledPoint(0.0, Vectors.dense(3.5, 30.0)), + LabeledPoint(0.0, Vectors.dense(3.5, 40.0)), + LabeledPoint(1.0, Vectors.dense(3.5, 40.0))) for (numParts <- List(2, 4, 6, 8)) { val chi = Statistics.chiSqTest(sc.parallelize(data, numParts)) val feature1 = chi(0) @@ -130,10 +134,25 @@ class HypothesisTestSuite extends FunSuite with LocalSparkContext { } // Test that the right number of results is returned - val numCols = 321 - val sparseData = Array(new LabeledPoint(0.0, Vectors.sparse(numCols, Seq((100, 2.0)))), - new LabeledPoint(0.0, Vectors.sparse(numCols, Seq((200, 1.0))))) + val numCols = 1001 + val sparseData = Array( + new LabeledPoint(0.0, Vectors.sparse(numCols, Seq((100, 2.0)))), + new LabeledPoint(0.1, Vectors.sparse(numCols, Seq((200, 1.0))))) val chi = Statistics.chiSqTest(sc.parallelize(sparseData)) assert(chi.size === numCols) + assert(chi(1000) != null) // SPARK-3087 + + // Detect continous features or labels + val random = new Random(11L) + val continuousLabel = + Seq.fill(100000)(LabeledPoint(random.nextDouble(), Vectors.dense(random.nextInt(2)))) + intercept[SparkException] { + Statistics.chiSqTest(sc.parallelize(continuousLabel, 2)) + } + val continuousFeature = + Seq.fill(100000)(LabeledPoint(random.nextInt(2), Vectors.dense(random.nextDouble()))) + intercept[SparkException] { + Statistics.chiSqTest(sc.parallelize(continuousFeature, 2)) + } } }