diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 1cd6f2a8969a6b29c810261557572f10bb6c2646..377326f8739b758f9b196079f48269f3ac3063f7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -35,6 +35,7 @@ import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession +import org.apache.spark.RangePartitioner /** * Regression model for isotonic regression. @@ -408,9 +409,11 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali */ private def parallelPoolAdjacentViolators( input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = { - val parallelStepResult = input - .sortBy(x => (x._2, x._1)) - .glom() + val keyedInput = input.keyBy(_._2) + val parallelStepResult = keyedInput + .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput)) + .values + .mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1)))) .flatMap(poolAdjacentViolators) .collect() .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering. diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index ea4f2865757c1277574c203b19d009ef3a2d7664..94da626d92ebb5d0d6e23e8cd05240d128931a05 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -176,6 +176,17 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w assert(model.predictions === Array(1, 2, 2)) } + test("SPARK-16426 isotonic regression with duplicate features that produce NaNs") { + val trainRDD = sc.parallelize(Seq[(Double, Double, Double)]((2, 1, 1), (1, 1, 1), (0, 2, 1), + (1, 2, 1), (0.5, 3, 1), (0, 3, 1)), + 2) + + val model = new IsotonicRegression().run(trainRDD) + + assert(model.boundaries === Array(1.0, 3.0)) + assert(model.predictions === Array(0.75, 0.75)) + } + test("isotonic regression prediction") { val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)