Commit 71ad945b authored by z001qdp, committed by Sean Owen

[SPARK-16426][MLLIB] Fix bug that caused NaNs in IsotonicRegression

## What changes were proposed in this pull request?

Fixed a bug that caused `NaN`s in `IsotonicRegression`. The problem occurs when training rows with the same feature value but different labels end up on different partitions. This patch changes a `sortBy` call to a `partitionBy(RangePartitioner)` followed by a `mapPartitions(sortBy)` in order to ensure that all rows with the same feature value end up on the same partition.

## How was this patch tested?

Added a unit test.

Author: z001qdp <Nicholas.Eggert@target.com>

Closes #14140 from neggert/SPARK-16426-isotonic-nan.
parent 18324238
@@ -35,6 +35,7 @@ import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.RangePartitioner

 /**
  * Regression model for isotonic regression.
@@ -408,9 +409,11 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
    */
   private def parallelPoolAdjacentViolators(
       input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
-    val parallelStepResult = input
-      .sortBy(x => (x._2, x._1))
-      .glom()
+    val keyedInput = input.keyBy(_._2)
+    val parallelStepResult = keyedInput
+      .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput))
+      .values
+      .mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1))))
       .flatMap(poolAdjacentViolators)
       .collect()
       .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
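
The partitioning change in this hunk can be illustrated without Spark. The following is a minimal sketch in plain Scala collections, not the MLlib code: `partitionByBounds` and `splitFeatures` are hypothetical helpers, and the bounds are chosen by hand rather than sampled the way Spark's `RangePartitioner` does. It assumes the old `sortBy(x => (x._2, x._1))` range-partitions on the composite `(feature, label)` key, so a partition boundary can fall between two rows that share a feature value, whereas keying on the feature alone cannot split them.

```scala
object RangePartitionSketch {
  // Rows are (label, feature, weight), matching the tuple layout used in the patch.
  type Row = (Double, Double, Double)

  // Hypothetical stand-in for range partitioning: a row goes to the first bucket
  // whose upper bound is >= its key, or to the last bucket if none matches.
  def partitionByBounds[K](rows: Seq[Row], key: Row => K, bounds: Seq[K])(
      implicit ord: Ordering[K]): Map[Int, Seq[Row]] =
    rows.groupBy { r =>
      val i = bounds.indexWhere(b => ord.lteq(key(r), b))
      if (i < 0) bounds.length else i
    }

  // Feature values whose rows appear in more than one partition.
  def splitFeatures(parts: Map[Int, Seq[Row]]): Set[Double] =
    parts.toSeq
      .flatMap { case (idx, rs) => rs.map(r => (r._2, idx)) }
      .distinct
      .groupBy(_._1)
      .collect { case (feature, hits) if hits.size > 1 => feature }
      .toSet

  // Same rows as the unit test added in this commit.
  val rows: Seq[Row] = Seq(
    (2.0, 1.0, 1.0), (1.0, 1.0, 1.0), (0.0, 2.0, 1.0),
    (1.0, 2.0, 1.0), (0.5, 3.0, 1.0), (0.0, 3.0, 1.0))

  // Composite key (feature, label): the hand-picked bound (2.0, 0.0) separates
  // the two rows with feature 2.0.
  val byComposite: Map[Int, Seq[Row]] =
    partitionByBounds[(Double, Double)](rows, r => (r._2, r._1), Seq((2.0, 0.0)))

  // Feature-only key: every row with the same feature lands in the same bucket.
  val byFeature: Map[Int, Seq[Row]] =
    partitionByBounds[Double](rows, _._2, Seq(2.0))
}
```

With these hand-picked bounds, `splitFeatures(byComposite)` reports feature `2.0` as split across two partitions, while `splitFeatures(byFeature)` is empty.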
@@ -176,6 +176,17 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
     assert(model.predictions === Array(1, 2, 2))
   }

+  test("SPARK-16426 isotonic regression with duplicate features that produce NaNs") {
+    val trainRDD = sc.parallelize(Seq[(Double, Double, Double)]((2, 1, 1), (1, 1, 1), (0, 2, 1),
+        (1, 2, 1), (0.5, 3, 1), (0, 3, 1)),
+      2)
+
+    val model = new IsotonicRegression().run(trainRDD)
+
+    assert(model.boundaries === Array(1.0, 3.0))
+    assert(model.predictions === Array(0.75, 0.75))
+  }
+
   test("isotonic regression prediction") {
     val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
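
The expected values in the new test can be checked by hand with a textbook pool-adjacent-violators pass. The sketch below is an assumption-laden illustration, not MLlib's `poolAdjacentViolators`: the `Block` class and `pava` function are hypothetical names, it runs on a single in-memory sequence, and it does not treat tied feature values specially. On the six test rows every block ends up pooled into one, spanning features 1.0 to 3.0 with mean 4.5 / 6 = 0.75, which is consistent with the asserted boundaries and predictions.

```scala
object PavaSketch {
  // A pooled block: weighted label sum, total weight, and the feature range it spans.
  final case class Block(sum: Double, weight: Double, minX: Double, maxX: Double) {
    def mean: Double = sum / weight
    // Merge this block with the block immediately to its right.
    def merge(right: Block): Block =
      Block(sum + right.sum, weight + right.weight, minX, right.maxX)
  }

  // Input rows are (label, feature, weight). Returns one (mean, minX, maxX) per block,
  // in increasing feature order.
  def pava(rows: Seq[(Double, Double, Double)]): List[(Double, Double, Double)] = {
    val sorted = rows.sortBy(r => (r._2, r._1))
    // The stack holds blocks in reverse feature order; its head is the rightmost block.
    val blocks = sorted.foldLeft(List.empty[Block]) { (stack, r) =>
      var current = Block(r._1 * r._3, r._3, r._2, r._2)
      var rest = stack
      // Pool backwards while the previous block's mean exceeds the current mean.
      while (rest.nonEmpty && rest.head.mean > current.mean) {
        current = rest.head.merge(current)
        rest = rest.tail
      }
      current :: rest
    }
    blocks.reverse.map(b => (b.mean, b.minX, b.maxX))
  }

  // Same rows as the SPARK-16426 unit test.
  val testRows: Seq[(Double, Double, Double)] = Seq(
    (2.0, 1.0, 1.0), (1.0, 1.0, 1.0), (0.0, 2.0, 1.0),
    (1.0, 2.0, 1.0), (0.5, 3.0, 1.0), (0.0, 3.0, 1.0))
}
```

Calling `PavaSketch.pava(PavaSketch.testRows)` yields a single block with mean 0.75 covering the feature range 1.0 to 3.0.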