Skip to content
Snippets Groups Projects
Commit c8aea744 authored by z001qdp's avatar z001qdp Committed by Joseph K. Bradley
Browse files

[SPARK-17455][MLLIB] Improve PAVA implementation in IsotonicRegression

## What changes were proposed in this pull request?

New implementation of the Pool Adjacent Violators Algorithm (PAVA) in mllib.IsotonicRegression, which used under the hood by ml.regression.IsotonicRegression. The previous implementation could have factorial complexity in the worst case. This implementation, which closely follows those in scikit-learn and the R `iso` package, runs in quadratic time in the worst case.
## How was this patch tested?

Existing unit tests in both `mllib` and `ml` passed before and after this patch. Scaling properties were tested by running the `poolAdjacentViolators` method in [scala-benchmarking-template](https://github.com/sirthias/scala-benchmarking-template) with the input generated by

``` scala
val x = (1 to length).toArray.map(_.toDouble)
val y = x.reverse.zipWithIndex.map{ case (yi, i) => if (i % 2 == 1) yi - 1.5 else yi}
val w = Array.fill(length)(1d)

val input: Array[(Double, Double, Double)] = (y zip x zip w) map{ case ((y, x), w) => (y, x, w)}
```

Before this patch:

| Input Length | Time (us) |
| --: | --: |
| 100 | 1.35 |
| 200 | 3.14 |
| 400 | 116.10 |
| 800 | 2134225.90 |

After this patch:

| Input Length | Time (us) |
| --: | --: |
| 100 | 1.25 |
| 200 | 2.53 |
| 400 | 5.86 |
| 800 | 10.55 |

Benchmarking was also performed with randomly-generated y values, with similar results.

Author: z001qdp <Nicholas.Eggert@target.com>

Closes #15018 from neggert/SPARK-17455-isoreg-algo.
parent 4a11d029
No related branches found
No related tags found
No related merge requests found
......@@ -236,9 +236,8 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
* Only univariate (single feature) algorithm supported.
*
* Sequential PAV implementation based on:
* Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
* "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
* Available from <a href="http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf">here</a>
* Grotzinger, S. J., and C. Witzgall.
* "Projections onto order simplexes." Applied mathematics and Optimization 12.1 (1984): 247-270.
*
* Sequential PAV parallelization based on:
* Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
......@@ -312,90 +311,118 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
}
/**
* Performs a pool adjacent violators algorithm (PAV).
* Uses approach with single processing of data where violators
* in previously processed data created by pooling are fixed immediately.
* Uses optimization of discovering monotonicity violating sequences (blocks).
* Performs a pool adjacent violators algorithm (PAV). Implements the algorithm originally
* described in [1], using the formulation from [2, 3]. Uses an array to keep track of start
* and end indices of blocks.
*
* @param input Input data of tuples (label, feature, weight).
* [1] Grotzinger, S. J., and C. Witzgall. "Projections onto order simplexes." Applied
* mathematics and Optimization 12.1 (1984): 247-270.
*
* [2] Best, Michael J., and Nilotpal Chakravarti. "Active set algorithms for isotonic
* regression; a unifying framework." Mathematical Programming 47.1-3 (1990): 425-439.
*
* [3] Best, Michael J., Nilotpal Chakravarti, and Vasant A. Ubhaya. "Minimizing separable convex
* functions subject to simple chain constraints." SIAM Journal on Optimization 10.3 (2000):
* 658-672.
*
* @param input Input data of tuples (label, feature, weight). Weights must
be non-negative.
* @return Result tuples (label, feature, weight) where labels were updated
* to form a monotone sequence as per isotonic regression definition.
*/
private def poolAdjacentViolators(
input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
if (input.isEmpty) {
return Array.empty
val cleanInput = input.filter{ case (y, x, weight) =>
require(
weight >= 0.0,
s"Negative weight at point ($y, $x, $weight). Weights must be non-negative"
)
weight > 0
}
// Pools sub array within given bounds assigning weighted average value to all elements.
def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
val poolSubArray = input.slice(start, end + 1)
if (cleanInput.isEmpty) {
return Array.empty
}
val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
val weight = poolSubArray.map(_._3).sum
// Keeps track of the start and end indices of the blocks. if [i, j] is a valid block from
// cleanInput(i) to cleanInput(j) (inclusive), then blockBounds(i) = j and blockBounds(j) = i
// Initially, each data point is its own block.
val blockBounds = Array.range(0, cleanInput.length)
var i = start
while (i <= end) {
input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
i = i + 1
}
// Keep track of the sum of weights and sum of weight * y for each block. weights(start)
// gives the values for the block. Entries that are not at the start of a block
// are meaningless.
val weights: Array[(Double, Double)] = cleanInput.map { case (y, _, weight) =>
(weight, weight * y)
}
var i = 0
val len = input.length
while (i < len) {
var j = i
// a few convenience functions to make the code more readable
// Find monotonicity violating sequence, if any.
while (j < len - 1 && input(j)._1 > input(j + 1)._1) {
j = j + 1
}
// blockStart and blockEnd have identical implementations. We create two different
// functions to make the code more expressive
def blockEnd(start: Int): Int = blockBounds(start)
def blockStart(end: Int): Int = blockBounds(end)
// If monotonicity was not violated, move to next data point.
if (i == j) {
i = i + 1
} else {
// Otherwise pool the violating sequence
// and check if pooling caused monotonicity violation in previously processed points.
while (i >= 0 && input(i)._1 > input(i + 1)._1) {
pool(input, i, j)
i = i - 1
}
// the next block starts at the index after the end of this block
def nextBlock(start: Int): Int = blockEnd(start) + 1
i = j
}
// the previous block ends at the index before the start of this block
// we then use blockStart to find the start
def prevBlock(start: Int): Int = blockStart(start - 1)
// Merge two adjacent blocks, updating blockBounds and weights to reflect the merge
// Return the start index of the merged block
def merge(block1: Int, block2: Int): Int = {
assert(
blockEnd(block1) + 1 == block2,
s"Attempting to merge non-consecutive blocks [${block1}, ${blockEnd(block1)}]" +
s" and [${block2}, ${blockEnd(block2)}]. This is likely a bug in the isotonic regression" +
" implementation. Please file a bug report."
)
blockBounds(block1) = blockEnd(block2)
blockBounds(blockEnd(block2)) = block1
val w1 = weights(block1)
val w2 = weights(block2)
weights(block1) = (w1._1 + w2._1, w1._2 + w2._2)
block1
}
// For points having the same prediction, we only keep two boundary points.
val compressed = ArrayBuffer.empty[(Double, Double, Double)]
// average value of a block
def average(start: Int): Double = weights(start)._2 / weights(start)._1
var (curLabel, curFeature, curWeight) = input.head
var rightBound = curFeature
def merge(): Unit = {
compressed += ((curLabel, curFeature, curWeight))
if (rightBound > curFeature) {
compressed += ((curLabel, rightBound, 0.0))
// Implement Algorithm PAV from [3].
// Merge on >= instead of > because it eliminates adjacent blocks with the same average, and we
// want to compress our output as much as possible. Both give correct results.
var i = 0
while (nextBlock(i) < cleanInput.length) {
if (average(i) >= average(nextBlock(i))) {
merge(i, nextBlock(i))
while((i > 0) && (average(prevBlock(i)) >= average(i))) {
i = merge(prevBlock(i), i)
}
} else {
i = nextBlock(i)
}
}
i = 1
while (i < input.length) {
val (label, feature, weight) = input(i)
if (label == curLabel) {
curWeight += weight
rightBound = feature
// construct the output by walking through the blocks in order
val output = ArrayBuffer.empty[(Double, Double, Double)]
i = 0
while (i < cleanInput.length) {
// If block size is > 1, a point at the start and end of the block,
// each receiving half the weight. Otherwise, a single point with
// all the weight.
if (cleanInput(blockEnd(i))._2 > cleanInput(i)._2) {
output += ((average(i), cleanInput(i)._2, weights(i)._1 / 2))
output += ((average(i), cleanInput(blockEnd(i))._2, weights(i)._1 / 2))
} else {
merge()
curLabel = label
curFeature = feature
curWeight = weight
rightBound = curFeature
output += ((average(i), cleanInput(i)._2, weights(i)._1))
}
i += 1
i = nextBlock(i)
}
merge()
compressed.toArray
output.toArray
}
/**
......
......@@ -19,7 +19,7 @@ package org.apache.spark.mllib.regression
import org.scalatest.Matchers
import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.util.Utils
......@@ -163,17 +163,16 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w
}
test("weighted isotonic regression with negative weights") {
val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)
assert(model.boundaries === Array(0.0, 1.0, 4.0))
assert(model.predictions === Array(1.0, 10.0/6, 10.0/6))
val ex = intercept[SparkException] {
runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)
}
assert(ex.getCause.isInstanceOf[IllegalArgumentException])
}
test("weighted isotonic regression with zero weights") {
val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true)
assert(model.boundaries === Array(0.0, 1.0, 4.0))
assert(model.predictions === Array(1, 2, 2))
val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1, 0), Seq(0, 0, 0, 1, 1, 0), true)
assert(model.boundaries === Array(3, 4))
assert(model.predictions === Array(1.5, 1.5))
}
test("SPARK-16426 isotonic regression with duplicate features that produce NaNs") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment