diff --git a/mllib/src/main/scala/spark/mllib/classification/Classification.scala b/mllib/src/main/scala/spark/mllib/classification/Classification.scala
new file mode 100644
index 0000000000000000000000000000000000000000..d6154b66aed7df4686dc0d552f6b8e8d5cde4e97
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/classification/Classification.scala
@@ -0,0 +1,21 @@
+package spark.mllib.classification
+
+import spark.RDD
+
+trait ClassificationModel extends Serializable {
+  /**
+   * Predict values for the given data set using the trained model.
+   *
+   * @param testData RDD representing data points to be predicted
+   * @return RDD[Int] where each entry contains the corresponding prediction
+   */
+  def predict(testData: RDD[Array[Double]]): RDD[Int]
+
+  /**
+   * Predict the value for a single data point using the trained model.
+   *
+   * @param testData array representing a single data point
+   * @return Int prediction from the trained model
+   */
+  def predict(testData: Array[Double]): Int
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
similarity index 80%
rename from mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala
rename to mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index 4b2254601706a484253865e7f5def99a18ccbe71..bc1c32772928b09c73ba5b0be9389115e318432a 100644
--- a/mllib/src/main/scala/spark/mllib/regression/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -15,12 +15,14 @@
  * limitations under the License.
  */
 
-package spark.mllib.regression
+package spark.mllib.classification
 
 import spark.{Logging, RDD, SparkContext}
 import spark.mllib.optimization._
 import spark.mllib.util.MLUtils
 
+import scala.math.round
+
 import org.jblas.DoubleMatrix
 
 /**
@@ -30,48 +32,30 @@ import org.jblas.DoubleMatrix
 class LogisticRegressionModel(
   val weights: Array[Double],
   val intercept: Double,
-  val stochasticLosses: Array[Double]) extends RegressionModel {
+  val stochasticLosses: Array[Double]) extends ClassificationModel {
 
   // Create a column vector that can be used for predictions
   private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
 
-  override def predict(testData: spark.RDD[Array[Double]]) = {
+  override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
     // A small optimization to avoid serializing the entire model. Only the weightsMatrix
     // and intercept is needed.
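+    // The sigmoid value 1.0 / (1.0 + exp(-margin)) is the estimated probability of class 1;
+    // round() thresholds it at 0.5 to produce a 0/1 label.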
     val localWeights = weightsMatrix
     val localIntercept = intercept
     testData.map { x =>
       val margin = new DoubleMatrix(1, x.length, x:_*).mmul(localWeights).get(0) + localIntercept
-      1.0/ (1.0 + math.exp(margin * -1))
+      round(1.0 / (1.0 + math.exp(-margin))).toInt
     }
   }
 
-  override def predict(testData: Array[Double]): Double = {
+  override def predict(testData: Array[Double]): Int = {
     val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
     val margin = dataMat.mmul(weightsMatrix).get(0) + this.intercept
-    1.0/ (1.0 + math.exp(margin * -1))
-  }
-}
-
-class LogisticGradient extends Gradient {
-  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
-      (DoubleMatrix, Double) = {
-    val margin: Double = -1.0 * data.dot(weights)
-    val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
-
-    val gradient = data.mul(gradientMultiplier)
-    val loss =
-      if (margin > 0) {
-        math.log(1 + math.exp(0 - margin))
-      } else {
-        math.log(1 + math.exp(margin)) - margin
-      }
-
-    (gradient, loss)
+    round(1.0 / (1.0 + math.exp(-margin))).toInt
   }
 }
 
-class LogisticRegression private (var stepSize: Double, var miniBatchFraction: Double,
+class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBatchFraction: Double,
   var numIters: Int)
   extends Logging {
@@ -104,19 +88,19 @@ class LogisticRegression private (var stepSize: Double, var miniBatchFraction: D
     this
   }
 
-  def train(input: RDD[(Double, Array[Double])]): LogisticRegressionModel = {
+  def train(input: RDD[(Int, Array[Double])]): LogisticRegressionModel = {
     val nfeatures: Int = input.take(1)(0)._2.length
     val initialWeights = Array.fill(nfeatures)(1.0)
     train(input, initialWeights)
   }
 
   def train(
-    input: RDD[(Double, Array[Double])],
+    input: RDD[(Int, Array[Double])],
     initialWeights: Array[Double]): LogisticRegressionModel = {
 
     // Add a extra variable consisting of all 1.0's for the intercept.
     val data = input.map { case (y, features) =>
-      (y, Array(1.0, features:_*))
+      (y.toDouble, Array(1.0, features:_*))
     }
 
     val initalWeightsWithIntercept = Array(1.0, initialWeights:_*)
 
@@ -127,6 +111,7 @@ class LogisticRegression private (var stepSize: Double, var miniBatchFraction: D
       new SimpleUpdater(),
       stepSize,
       numIters,
+      0.0,
       initalWeightsWithIntercept,
       miniBatchFraction)
 
@@ -147,7 +132,7 @@ class LogisticRegression private (var stepSize: Double, var miniBatchFraction: D
  * NOTE(shivaram): We use multiple train methods instead of default arguments to support
  *                 Java programs.
  */
-object LogisticRegression {
+object LogisticRegressionLocalRandomSGD {
 
   /**
    * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
@@ -163,14 +148,15 @@ object LogisticRegression {
    *        the number of features in the data.
    */
   def train(
-      input: RDD[(Double, Array[Double])],
+      input: RDD[(Int, Array[Double])],
       numIterations: Int,
       stepSize: Double,
+      miniBatchFraction: Double,
       initialWeights: Array[Double])
     : LogisticRegressionModel =
   {
-    new LogisticRegression(stepSize, miniBatchFraction, numIterations).train(input, initialWeights)
+    new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(input, initialWeights)
   }
 
   /**
@@ -181,16 +167,18 @@ object LogisticRegression {
    * @param input RDD of (label, array of features) pairs.
    * @param numIterations Number of iterations of gradient descent to run.
    * @param stepSize Step size to be used for each iteration of gradient descent.
+   * @param miniBatchFraction Fraction of data to be used per iteration.
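+   *
+   * A usage sketch (hypothetical values; assumes `examples` is an RDD[(Int, Array[Double])]):
+   * {{{
+   *   val model = LogisticRegressionLocalRandomSGD.train(examples, 100, 1.0, 1.0)
+   *   val predictions = model.predict(examples.map(_._2))
+   * }}}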
   */
  def train(
-      input: RDD[(Double, Array[Double])],
+      input: RDD[(Int, Array[Double])],
       numIterations: Int,
       stepSize: Double,
+      miniBatchFraction: Double)
     : LogisticRegressionModel =
   {
-    new LogisticRegression(stepSize, miniBatchFraction, numIterations).train(input)
+    new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(input)
   }
 
   /**
@@ -200,13 +188,14 @@ object LogisticRegression {
    *
    * @param input RDD of (label, array of features) pairs.
+   * @param numIterations Number of iterations of gradient descent to run.
    * @param stepSize Step size to be used for each iteration of Gradient Descent.
    * @return a LogisticRegressionModel which has the weights and offset from training.
    */
   def train(
-      input: RDD[(Double, Array[Double])],
+      input: RDD[(Int, Array[Double])],
       numIterations: Int,
       stepSize: Double)
     : LogisticRegressionModel =
   {
     train(input, numIterations, stepSize, 1.0)
@@ -222,7 +212,7 @@ object LogisticRegression {
    * @return a LogisticRegressionModel which has the weights and offset from training.
    */
   def train(
-      input: RDD[(Double, Array[Double])],
+      input: RDD[(Int, Array[Double])],
       numIterations: Int)
     : LogisticRegressionModel =
   {
@@ -230,13 +220,13 @@ object LogisticRegression {
   }
 
   def main(args: Array[String]) {
-    if (args.length != 4) {
-      println("Usage: LogisticRegression <master> <input_dir> <step_size> <niters>")
+    if (args.length != 5) {
+      println("Usage: LogisticRegression <master> <input_dir> <step_size> <mini_batch_fraction> <niters>")
       System.exit(1)
     }
     val sc = new SparkContext(args(0), "LogisticRegression")
-    val data = MLUtils.loadLabeledData(sc, args(1))
-    val model = LogisticRegression.train(data, args(3).toInt, args(2).toDouble)
+    val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
+    val model = LogisticRegressionLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
 
     sc.stop()
   }
diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
new file mode 100644
index 0000000000000000000000000000000000000000..15b689e7e00dc9eb14c2a88e3b01af4c79ee4e00
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.classification
+
+import scala.math.signum
+import spark.{Logging, RDD, SparkContext}
+import spark.mllib.optimization._
+import spark.mllib.util.MLUtils
+
+import org.jblas.DoubleMatrix
+
+/**
+ * SVM using Stochastic Gradient Descent.
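+ *
+ * A prediction sketch (hypothetical weights and intercept):
+ * {{{
+ *   val model = new SVMModel(Array(0.8, -0.2), 0.1, Array.empty[Double])
+ *   model.predict(Array(1.0, 2.0))   // signum(0.8*1.0 - 0.2*2.0 + 0.1).toInt == 1
+ * }}}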
+ */
+class SVMModel(
+  val weights: Array[Double],
+  val intercept: Double,
+  val stochasticLosses: Array[Double]) extends ClassificationModel {
+
+  // Create a column vector that can be used for predictions
+  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
+
+  override def predict(testData: spark.RDD[Array[Double]]): RDD[Int] = {
+    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+    // and intercept are needed.
+    val localWeights = weightsMatrix
+    val localIntercept = intercept
+    testData.map { x =>
+      signum(new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept).toInt
+    }
+  }
+
+  override def predict(testData: Array[Double]): Int = {
+    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
+    signum(dataMat.dot(weightsMatrix) + this.intercept).toInt
+  }
+}
+
+class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double, var miniBatchFraction: Double,
+  var numIters: Int)
+  extends Logging {
+
+  /**
+   * Construct an SVM object with default parameters
+   */
+  def this() = this(1.0, 1.0, 1.0, 100)
+
+  /**
+   * Set the step size per-iteration of SGD. Default 1.0.
+   */
+  def setStepSize(step: Double) = {
+    this.stepSize = step
+    this
+  }
+
+  /**
+   * Set the regularization parameter. Default 1.0.
+   */
+  def setRegParam(param: Double) = {
+    this.regParam = param
+    this
+  }
+
+  /**
+   * Set fraction of data to be used for each SGD iteration. Default 1.0.
+   */
+  def setMiniBatchFraction(fraction: Double) = {
+    this.miniBatchFraction = fraction
+    this
+  }
+
+  /**
+   * Set the number of iterations for SGD. Default 100.
+   */
+  def setNumIterations(iters: Int) = {
+    this.numIters = iters
+    this
+  }
+
+  def train(input: RDD[(Int, Array[Double])]): SVMModel = {
+    val nfeatures: Int = input.take(1)(0)._2.length
+    val initialWeights = Array.fill(nfeatures)(1.0)
+    train(input, initialWeights)
+  }
+
+  def train(
+    input: RDD[(Int, Array[Double])],
+    initialWeights: Array[Double]): SVMModel = {
+
+    // Add an extra variable consisting of all 1.0's for the intercept.
+    val data = input.map { case (y, features) =>
+      (y.toDouble, Array(1.0, features:_*))
+    }
+
+    val initialWeightsWithIntercept = Array(1.0, initialWeights:_*)
+
+    val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
+      data,
+      new HingeGradient(),
+      new SquaredL2Updater(),
+      stepSize,
+      numIters,
+      regParam,
+      initialWeightsWithIntercept,
+      miniBatchFraction)
+
+    val intercept = weights(0)
+    val weightsScaled = weights.tail
+
+    val model = new SVMModel(weightsScaled, intercept, stochasticLosses)
+
+    logInfo("Final model weights " + model.weights.mkString(","))
+    logInfo("Final model intercept " + model.intercept)
+    logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
+    model
+  }
+}
+
+/**
+ * Top-level methods for calling SVM.
+ */
+object SVMLocalRandomSGD {
+
+  /**
+   * Train an SVM model given an RDD of (label, features) pairs. We run a fixed number
+   * of iterations of gradient descent using the specified step size. Each iteration uses
+   * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+   * gradient descent are initialized using the initial weights provided.
+   *
+   * @param input RDD of (label, array of features) pairs.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @param stepSize Step size to be used for each iteration of gradient descent.
+   * @param regParam Regularization parameter.
+   * @param miniBatchFraction Fraction of data to be used per iteration.
+   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+   *        the number of features in the data.
+   */
+  def train(
+      input: RDD[(Int, Array[Double])],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
+      initialWeights: Array[Double])
+    : SVMModel =
+  {
+    new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input, initialWeights)
+  }
+
+  /**
+   * Train an SVM model given an RDD of (label, features) pairs. We run a fixed number
+   * of iterations of gradient descent using the specified step size. Each iteration uses
+   * `miniBatchFraction` fraction of the data to calculate the gradient.
+   *
+   * @param input RDD of (label, array of features) pairs.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @param stepSize Step size to be used for each iteration of gradient descent.
+   * @param regParam Regularization parameter.
+   * @param miniBatchFraction Fraction of data to be used per iteration.
+   */
+  def train(
+      input: RDD[(Int, Array[Double])],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double)
+    : SVMModel =
+  {
+    new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input)
+  }
+
+  /**
+   * Train an SVM model given an RDD of (label, features) pairs. We run a fixed number
+   * of iterations of gradient descent using the specified step size. We use the entire data set
+   * to update the gradient in each iteration.
+   *
+   * @param input RDD of (label, array of features) pairs.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @param stepSize Step size to be used for each iteration of Gradient Descent.
+   * @param regParam Regularization parameter.
+   * @return an SVMModel which has the weights and offset from training.
+   */
+  def train(
+      input: RDD[(Int, Array[Double])],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double)
+    : SVMModel =
+  {
+    train(input, numIterations, stepSize, regParam, 1.0)
+  }
+
+  /**
+   * Train an SVM model given an RDD of (label, features) pairs. We run a fixed number
+   * of iterations of gradient descent using a step size of 1.0. We use the entire data set
+   * to update the gradient in each iteration.
+   *
+   * @param input RDD of (label, array of features) pairs.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @return an SVMModel which has the weights and offset from training.
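+   *
+   * A usage sketch (assumes `examples` is an RDD[(Int, Array[Double])] with labels in {-1, +1}):
+   * {{{
+   *   val model = SVMLocalRandomSGD.train(examples, 100)
+   *   val predictions = model.predict(examples.map(_._2))
+   * }}}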
+   */
+  def train(
+      input: RDD[(Int, Array[Double])],
+      numIterations: Int)
+    : SVMModel =
+  {
+    train(input, numIterations, 1.0, 1.0, 1.0)
+  }
+
+  def main(args: Array[String]) {
+    if (args.length != 5) {
+      println("Usage: SVM <master> <input_dir> <step_size> <regularization_parameter> <niters>")
+      System.exit(1)
+    }
+    val sc = new SparkContext(args(0), "SVM")
+    val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
+    val model = SVMLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+
+    sc.stop()
+  }
+}
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
index d5338360c83f6ae5c0360088a9070d5a16b64f6f..22b2ec5ed60f9fec9e731205e38d5fbaba1319b1 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
@@ -30,3 +30,48 @@ abstract class Gradient extends Serializable {
   def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
       (DoubleMatrix, Double)
 }
+
+/**
+ * Gradient and loss of the logistic loss function; labels are assumed to be in {0, 1}.
+ */
+class LogisticGradient extends Gradient {
+  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
+      (DoubleMatrix, Double) = {
+    val margin: Double = -1.0 * data.dot(weights)
+    val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
+
+    val gradient = data.mul(gradientMultiplier)
+    val loss =
+      if (margin > 0) {
+        math.log(1 + math.exp(0 - margin))
+      } else {
+        math.log(1 + math.exp(margin)) - margin
+      }
+
+    (gradient, loss)
+  }
+}
+
+/**
+ * Gradient and loss of the squared (least-squares) error 0.5 * (x.dot(w) - y)^2.
+ */
+class SquaredGradient extends Gradient {
+  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
+      (DoubleMatrix, Double) = {
+    val diff: Double = data.dot(weights) - label
+
+    val loss = 0.5 * diff * diff
+    val gradient = data.mul(diff)
+
+    (gradient, loss)
+  }
+}
+
+/**
+ * Gradient and loss of the hinge loss max(0, 1 - y * x.dot(w)); labels are assumed to be in {-1, +1}.
+ */
+class HingeGradient extends Gradient {
+  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
+      (DoubleMatrix, Double) = {
+
+    val dotProduct = data.dot(weights)
+
+    if (1.0 > label * dotProduct)
+      (data.mul(-label), 1.0 - label * dotProduct)
+    else
+      (DoubleMatrix.zeros(1, weights.length), 0.0)
+  }
+}
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
index 185a2a24f637f73a66d5b09aed6a2e1748322e39..d4b83a14561d3cd3d54d6650918609b9e4eafd8c 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -36,6 +36,7 @@ object GradientDescent {
    * @param updater - Updater object that will be used to update the model.
    * @param stepSize - stepSize to be used during update.
    * @param numIters - number of iterations that SGD should be run.
+   * @param regParam - regularization parameter
    * @param miniBatchFraction - fraction of the input data set that should be used for
    *                            one iteration of SGD. Default value 1.0.
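+   *
+   * A call sketch (hypothetical `data` and `initialWeights`; the callers above prepend a
+   * 1.0 feature to each point so the first weight acts as the intercept):
+   * {{{
+   *   val (weights, lossHistory) = GradientDescent.runMiniBatchSGD(
+   *     data, new LogisticGradient(), new SimpleUpdater(), 1.0, 100, 0.0, initialWeights, 1.0)
+   * }}}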
    *
@@ -49,6 +50,7 @@ object GradientDescent {
       updater: Updater,
       stepSize: Double,
       numIters: Int,
+      regParam: Double,
       initialWeights: Array[Double],
       miniBatchFraction: Double=1.0) : (Array[Double], Array[Double]) = {
 
@@ -70,9 +72,13 @@ object GradientDescent {
       }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
 
-      stochasticLossHistory.append(lossSum / miniBatchSize + reg_val)
-      val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i)
+      val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
       weights = update._1
       reg_val = update._2
+      stochasticLossHistory.append(lossSum / miniBatchSize + reg_val)
+      /*
+       * NOTE(Xinghao): The appended loss is the sum of lossSum (computed with the weights before
+       * the update) and reg_val (computed with the weights after the update).
+       */
     }
 
     (weights.toArray, stochasticLossHistory.toArray)
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
index 18cb5f3a95ce1e9a1b74efd5eae1ad38e9b45dd3..188fe7d972d7bbe2ebab2bd4958da815ad0534ad 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
@@ -17,6 +17,7 @@
 
 package spark.mllib.optimization
 
+import scala.math._
 import org.jblas.DoubleMatrix
 
 abstract class Updater extends Serializable {
@@ -27,18 +28,55 @@ abstract class Updater extends Serializable {
    * @param gradient - Column matrix of size nx1 where n is the number of features.
    * @param stepSize - step size across iterations
    * @param iter - Iteration number
+   * @param regParam - Regularization parameter
    *
    * @return A tuple of 2 elements. The first element is a column matrix containing updated weights,
    *         and the second element is the regularization value.
    */
-  def compute(weightsOlds: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int):
+  def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int, regParam: Double):
       (DoubleMatrix, Double)
 }
 
 class SimpleUpdater extends Updater {
   override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
-      stepSize: Double, iter: Int): (DoubleMatrix, Double) = {
+      stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
     val normGradient = gradient.mul(stepSize / math.sqrt(iter))
     (weightsOld.sub(normGradient), 0)
   }
 }
+
+/**
+ * L1 regularization -- the corresponding proximal operator is the soft-thresholding function.
+ * That is, each weight component is shrunk towards 0 by shrinkageVal:
+ * If w > shrinkageVal, set weight component to w - shrinkageVal.
+ * If w < -shrinkageVal, set weight component to w + shrinkageVal.
+ * If -shrinkageVal < w < shrinkageVal, set weight component to 0.
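+ * (e.g. with shrinkageVal = 0.3: w = 0.5 maps to 0.2, w = -0.5 maps to -0.2, w = 0.1 maps to 0.0.)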
+ * Equivalently, set each weight component to signum(w) * max(0.0, abs(w) - shrinkageVal).
+ */
+class L1Updater extends Updater {
+  override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
+      stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
+    val thisIterStepSize = stepSize / math.sqrt(iter)
+    val normGradient = gradient.mul(thisIterStepSize)
+    // Take gradient step
+    val newWeights = weightsOld.sub(normGradient)
+    // Soft thresholding
+    val shrinkageVal = regParam * thisIterStepSize
+    (0 until newWeights.length).foreach(i => {
+      val wi = newWeights.get(i)
+      newWeights.put(i, signum(wi) * max(0.0, abs(wi) - shrinkageVal))
+    })
+    (newWeights, newWeights.norm1 * regParam)
+  }
+}
+
+/**
+ * Squared-L2 regularization -- after the gradient step, the weights are scaled down by
+ * 1 / (1 + 2 * thisIterStepSize * regParam).
+ */
+class SquaredL2Updater extends Updater {
+  override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
+      stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
+    val thisIterStepSize = stepSize / math.sqrt(iter)
+    val normGradient = gradient.mul(thisIterStepSize)
+    val newWeights = weightsOld.sub(normGradient).div(2.0 * thisIterStepSize * regParam + 1.0)
+    (newWeights, pow(newWeights.norm2, 2.0) * regParam)
+  }
+}
diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1952658bb204ef28bb2a15c2784e59686991ea75
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.regression
+
+import spark.{Logging, RDD, SparkContext}
+import spark.mllib.optimization._
+import spark.mllib.util.MLUtils
+
+import org.jblas.DoubleMatrix
+
+/**
+ * Lasso using Stochastic Gradient Descent.
+ */
+class LassoModel(
+  val weights: Array[Double],
+  val intercept: Double,
+  val stochasticLosses: Array[Double]) extends RegressionModel {
+
+  // Create a column vector that can be used for predictions
+  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
+
+  override def predict(testData: spark.RDD[Array[Double]]): RDD[Double] = {
+    // A small optimization to avoid serializing the entire model. Only the weightsMatrix
+    // and intercept are needed.
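+    // Lasso prediction is the raw linear response w.dot(x) + intercept; unlike the
+    // classification models, there is no thresholding.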
+    val localWeights = weightsMatrix
+    val localIntercept = intercept
+    testData.map { x =>
+      new DoubleMatrix(1, x.length, x:_*).dot(localWeights) + localIntercept
+    }
+  }
+
+  override def predict(testData: Array[Double]): Double = {
+    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
+    dataMat.dot(weightsMatrix) + this.intercept
+  }
+}
+
+class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double, var miniBatchFraction: Double,
+  var numIters: Int)
+  extends Logging {
+
+  /**
+   * Construct a Lasso object with default parameters
+   */
+  def this() = this(1.0, 1.0, 1.0, 100)
+
+  /**
+   * Set the step size per-iteration of SGD. Default 1.0.
+   */
+  def setStepSize(step: Double) = {
+    this.stepSize = step
+    this
+  }
+
+  /**
+   * Set the regularization parameter. Default 1.0.
+   */
+  def setRegParam(param: Double) = {
+    this.regParam = param
+    this
+  }
+
+  /**
+   * Set fraction of data to be used for each SGD iteration. Default 1.0.
+   */
+  def setMiniBatchFraction(fraction: Double) = {
+    this.miniBatchFraction = fraction
+    this
+  }
+
+  /**
+   * Set the number of iterations for SGD. Default 100.
+   */
+  def setNumIterations(iters: Int) = {
+    this.numIters = iters
+    this
+  }
+
+  def train(input: RDD[(Double, Array[Double])]): LassoModel = {
+    val nfeatures: Int = input.take(1)(0)._2.length
+    val initialWeights = Array.fill(nfeatures)(1.0)
+    train(input, initialWeights)
+  }
+
+  def train(
+    input: RDD[(Double, Array[Double])],
+    initialWeights: Array[Double]): LassoModel = {
+
+    // Add an extra variable consisting of all 1.0's for the intercept.
+    val data = input.map { case (y, features) =>
+      (y, Array(1.0, features:_*))
+    }
+
+    val initialWeightsWithIntercept = Array(1.0, initialWeights:_*)
+
+    val (weights, stochasticLosses) = GradientDescent.runMiniBatchSGD(
+      data,
+      new SquaredGradient(),
+      new L1Updater(),
+      stepSize,
+      numIters,
+      regParam,
+      initialWeightsWithIntercept,
+      miniBatchFraction)
+
+    val intercept = weights(0)
+    val weightsScaled = weights.tail
+
+    val model = new LassoModel(weightsScaled, intercept, stochasticLosses)
+
+    logInfo("Final model weights " + model.weights.mkString(","))
+    logInfo("Final model intercept " + model.intercept)
+    logInfo("Last 10 stochasticLosses " + model.stochasticLosses.takeRight(10).mkString(", "))
+    model
+  }
+}
+
+/**
+ * Top-level methods for calling Lasso.
+ */
+object LassoLocalRandomSGD {
+
+  /**
+   * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
+   * of iterations of gradient descent using the specified step size. Each iteration uses
+   * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
+   * gradient descent are initialized using the initial weights provided.
+   *
+   * @param input RDD of (label, array of features) pairs.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @param stepSize Step size to be used for each iteration of gradient descent.
+   * @param regParam Regularization parameter.
+   * @param miniBatchFraction Fraction of data to be used per iteration.
+   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
+   *        the number of features in the data.
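+   *
+   * A usage sketch (hypothetical values; assumes `points` is an RDD[(Double, Array[Double])]):
+   * {{{
+   *   val model = LassoLocalRandomSGD.train(points, 200, 1.0, 0.01, 0.1, Array.fill(numFeatures)(1.0))
+   * }}}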
+   */
+  def train(
+      input: RDD[(Double, Array[Double])],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
+      initialWeights: Array[Double])
+    : LassoModel =
+  {
+    new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input, initialWeights)
+  }
+
+  /**
+   * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
+   * of iterations of gradient descent using the specified step size. Each iteration uses
+   * `miniBatchFraction` fraction of the data to calculate the gradient.
+   *
+   * @param input RDD of (label, array of features) pairs.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @param stepSize Step size to be used for each iteration of gradient descent.
+   * @param regParam Regularization parameter.
+   * @param miniBatchFraction Fraction of data to be used per iteration.
+   */
+  def train(
+      input: RDD[(Double, Array[Double])],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double)
+    : LassoModel =
+  {
+    new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input)
+  }
+
+  /**
+   * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
+   * of iterations of gradient descent using the specified step size. We use the entire data set
+   * to update the gradient in each iteration.
+   *
+   * @param input RDD of (label, array of features) pairs.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @param stepSize Step size to be used for each iteration of Gradient Descent.
+   * @param regParam Regularization parameter.
+   * @return a LassoModel which has the weights and offset from training.
+   */
+  def train(
+      input: RDD[(Double, Array[Double])],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double)
+    : LassoModel =
+  {
+    train(input, numIterations, stepSize, regParam, 1.0)
+  }
+
+  /**
+   * Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
+   * of iterations of gradient descent using a step size of 1.0. We use the entire data set
+   * to update the gradient in each iteration.
+   *
+   * @param input RDD of (label, array of features) pairs.
+   * @param numIterations Number of iterations of gradient descent to run.
+   * @return a LassoModel which has the weights and offset from training.
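+   *
+   * Equivalent to train(input, numIterations, 1.0, 1.0, 1.0); a sketch (again with a hypothetical `points` RDD):
+   * {{{
+   *   val model = LassoLocalRandomSGD.train(points, 100)
+   * }}}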
+   */
+  def train(
+      input: RDD[(Double, Array[Double])],
+      numIterations: Int)
+    : LassoModel =
+  {
+    train(input, numIterations, 1.0, 1.0, 1.0)
+  }
+
+  def main(args: Array[String]) {
+    if (args.length != 5) {
+      println("Usage: Lasso <master> <input_dir> <step_size> <regularization_parameter> <niters>")
+      System.exit(1)
+    }
+    val sc = new SparkContext(args(0), "Lasso")
+    val data = MLUtils.loadLabeledData(sc, args(1))
+    val model = LassoLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+
+    sc.stop()
+  }
+}
diff --git a/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ef4f42a494ef8c5c9c1dcfda03a0672d737a014d
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/LassoDataGenerator.scala
@@ -0,0 +1,45 @@
+package spark.mllib.util
+
+import scala.util.Random
+
+import org.jblas.DoubleMatrix
+
+import spark.{RDD, SparkContext}
+
+object LassoGenerator {
+
+  def main(args: Array[String]) {
+    if (args.length != 5) {
+      println("Usage: LassoGenerator " +
+        "<master> <output_dir> <num_examples> <num_features> <num_partitions>")
+      System.exit(1)
+    }
+
+    val sparkMaster: String = args(0)
+    val outputPath: String = args(1)
+    val nexamples: Int = args(2).toInt
+    val nfeatures: Int = args(3).toInt
+    val parts: Int = args(4).toInt
+
+    val sc = new SparkContext(sparkMaster, "LassoGenerator")
+
+    val globalRnd = new Random(94720)
+    // One true weight per feature; the generated data carries no intercept term.
+    val trueWeights = new DoubleMatrix(1, nfeatures,
+      Array.fill[Double](nfeatures) { globalRnd.nextGaussian() }:_*)
+
+    val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx =>
+      val rnd = new Random(42 + idx)
+
+      val x = Array.fill[Double](nfeatures) {
+        rnd.nextDouble() * 2.0 - 1.0
+      }
+      val y = (new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1
+      (y, x)
+    }
+
+    MLUtils.saveLabeledData(data, outputPath)
+    sc.stop()
+  }
+}
diff --git a/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
new file mode 100644
index 0000000000000000000000000000000000000000..00a54d9a702d05a6b7b5a4bd508fa74dcc764968
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/SVMDataGenerator.scala
@@ -0,0 +1,48 @@
+package spark.mllib.util
+
+import scala.util.Random
+import scala.math.signum
+
+import org.jblas.DoubleMatrix
+
+import spark.{RDD, SparkContext}
+
+object SVMGenerator {
+
+  def main(args: Array[String]) {
+    if (args.length != 5) {
+      println("Usage: SVMGenerator " +
+        "<master> <output_dir> <num_examples> <num_features> <num_partitions>")
+      System.exit(1)
+    }
+
+    val sparkMaster: String = args(0)
+    val outputPath: String = args(1)
+    val nexamples: Int = args(2).toInt
+    val nfeatures: Int = args(3).toInt
+    val parts: Int = args(4).toInt
+
+    val sc = new SparkContext(sparkMaster, "SVMGenerator")
+
+    val globalRnd = new Random(94720)
+    // One true weight per feature; labels are the sign of the noisy linear response.
+    val trueWeights = new DoubleMatrix(1, nfeatures,
+      Array.fill[Double](nfeatures) { globalRnd.nextGaussian() }:_*)
+
+    val data: RDD[(Double, Array[Double])] = sc.parallelize(0 until nexamples, parts).map { idx =>
+      val rnd
= new Random(42 + idx) + + val x = Array.fill[Double](nfeatures) { + rnd.nextDouble() * 2.0 - 1.0 + } + val y = signum((new DoubleMatrix(1, x.length, x:_*)).dot(trueWeights) + rnd.nextGaussian() * 0.1) + (y, x) + } + + MLUtils.saveLabeledData(data, outputPath) + sc.stop() + } +} diff --git a/mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala similarity index 87% rename from mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala rename to mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala index 0a99b78cf859b0f388fc1fc98c7d0f80a06c366e..d3fe58a382fbbfe78940ea0b7b145884ffa1c923 100644 --- a/mllib/src/test/scala/spark/mllib/regression/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package spark.mllib.regression +package spark.mllib.classification import scala.util.Random @@ -38,7 +38,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll { offset: Double, scale: Double, nPoints: Int, - seed: Int): Seq[(Double, Array[Double])] = { + seed: Int): Seq[(Int, Array[Double])] = { val rnd = new Random(seed) val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian()) @@ -51,19 +51,18 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll { // y <- A + B*x + rLogis() // y <- as.numeric(y > 0) - val y: Seq[Double] = (0 until nPoints).map { i => + val y: Seq[Int] = (0 until nPoints).map { i => val yVal = offset + scale * x1(i) + rLogis(i) - if (yVal > 0) 1.0 else 0.0 + if (yVal > 0) 1 else 0 } val testData = (0 until nPoints).map(i => (y(i), Array(x1(i)))) testData } - def validatePrediction(predictions: Seq[Double], input: Seq[(Double, Array[Double])]) { + def validatePrediction(predictions: Seq[Int], input: Seq[(Int, Array[Double])]) { val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) => - // A prediction is off if the prediction is more than 0.5 away from expected value. - math.abs(prediction - expected) > 0.5 + (prediction != expected) }.size // At least 80% of the predictions should be on. assert(numOffPredictions < input.length / 5) @@ -79,7 +78,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll { val testRDD = sc.parallelize(testData, 2) testRDD.cache() - val lr = new LogisticRegression().setStepSize(10.0).setNumIterations(20) + val lr = new LogisticRegressionLocalRandomSGD().setStepSize(10.0).setNumIterations(20) val model = lr.train(testRDD) @@ -111,7 +110,7 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll { testRDD.cache() // Use half as many iterations as the previous test. - val lr = new LogisticRegression().setStepSize(10.0).setNumIterations(10) + val lr = new LogisticRegressionLocalRandomSGD().setStepSize(10.0).setNumIterations(10) val model = lr.train(testRDD, initialWeights) diff --git a/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..d546e0729ee525f79f21aa700c51e8be89864c46 --- /dev/null +++ b/mllib/src/test/scala/spark/mllib/classification/SVMSuite.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.classification + +import scala.util.Random +import scala.math.signum + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import spark.SparkContext + +import org.jblas.DoubleMatrix + +class SVMSuite extends FunSuite with BeforeAndAfterAll { + val sc = new SparkContext("local", "test") + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise) + def generateSVMInput( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int): Seq[(Int, Array[Double])] = { + val rnd = new Random(seed) + val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) + val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian())) + val y = x.map(xi => + signum((new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian()).toInt + ) + y zip x + } + + def validatePrediction(predictions: Seq[Int], input: Seq[(Int, Array[Double])]) { + val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) => + (prediction != expected) + }.size + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + test("SVMLocalRandomSGD") { + val nPoints = 10000 + + val A = 2.0 + val B = -1.5 + val C = 1.0 + + val testData = generateSVMInput(A, Array[Double](B,C), nPoints, 42) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + val svm = new SVMLocalRandomSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100) + + val model = svm.train(testRDD) + + val validationData = generateSVMInput(A, Array[Double](B,C), nPoints, 17) + val validationRDD = sc.parallelize(validationData,2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row._2)), validationData) + } + + test("SVMLocalRandomSGD with initial weights") { + val nPoints = 10000 + + val A = 2.0 + val B = -1.5 + val C = 1.0 + + val testData = generateSVMInput(A, Array[Double](B,C), nPoints, 42) + + val initialB = -1.0 + val initialC = -1.0 + val initialWeights = Array(initialB,initialC) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + + val svm = new SVMLocalRandomSGD().setStepSize(1.0).setRegParam(1.0).setNumIterations(100) + + val model = svm.train(testRDD, initialWeights) + + val validationData = generateSVMInput(A, Array[Double](B,C), nPoints, 17) + val validationRDD = sc.parallelize(validationData,2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData) + + // Test prediction on Array. 
+ validatePrediction(validationData.map(row => model.predict(row._2)), validationData) + } +} diff --git a/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..cf2b067d40f1102f24029caaacc6eba92c308614 --- /dev/null +++ b/mllib/src/test/scala/spark/mllib/regression/LassoSuite.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.mllib.regression + +import scala.util.Random + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FunSuite + +import spark.SparkContext + +import org.jblas.DoubleMatrix + + +class LassoSuite extends FunSuite with BeforeAndAfterAll { + val sc = new SparkContext("local", "test") + + override def afterAll() { + sc.stop() + System.clearProperty("spark.driver.port") + } + + // Generate noisy input of the form Y = x.dot(weights) + intercept + noise + def generateLassoInput( + intercept: Double, + weights: Array[Double], + nPoints: Int, + seed: Int): Seq[(Double, Array[Double])] = { + val rnd = new Random(seed) + val weightsMat = new DoubleMatrix(1, weights.length, weights:_*) + val x = Array.fill[Array[Double]](nPoints)(Array.fill[Double](weights.length)(rnd.nextGaussian())) + val y = x.map(xi => + (new DoubleMatrix(1, xi.length, xi:_*)).dot(weightsMat) + intercept + 0.1 * rnd.nextGaussian() + ) + y zip x + } + + def validatePrediction(predictions: Seq[Double], input: Seq[(Double, Array[Double])]) { + val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) => + // A prediction is off if the prediction is more than 0.5 away from expected value. + math.abs(prediction - expected) > 0.5 + }.size + // At least 80% of the predictions should be on. + assert(numOffPredictions < input.length / 5) + } + + test("LassoLocalRandomSGD") { + val nPoints = 10000 + + val A = 2.0 + val B = -1.5 + val C = 1.0e-2 + + val testData = generateLassoInput(A, Array[Double](B,C), nPoints, 42) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + val ls = new LassoLocalRandomSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20) + + val model = ls.train(testRDD) + + val weight0 = model.weights(0) + val weight1 = model.weights(1) + assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]") + assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") + assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]") + + val validationData = generateLassoInput(A, Array[Double](B,C), nPoints, 17) + val validationRDD = sc.parallelize(validationData,2) + + // Test prediction on RDD. 
+ validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row._2)), validationData) + } + + test("LassoLocalRandomSGD with initial weights") { + val nPoints = 10000 + + val A = 2.0 + val B = -1.5 + val C = 1.0e-2 + + val testData = generateLassoInput(A, Array[Double](B,C), nPoints, 42) + + val initialB = -1.0 + val initialC = -1.0 + val initialWeights = Array(initialB,initialC) + + val testRDD = sc.parallelize(testData, 2) + testRDD.cache() + val ls = new LassoLocalRandomSGD().setStepSize(1.0).setRegParam(0.01).setNumIterations(20) + + val model = ls.train(testRDD, initialWeights) + + val weight0 = model.weights(0) + val weight1 = model.weights(1) + assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]") + assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]") + assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]") + + val validationData = generateLassoInput(A, Array[Double](B,C), nPoints, 17) + val validationRDD = sc.parallelize(validationData,2) + + // Test prediction on RDD. + validatePrediction(model.predict(validationRDD.map(_._2)).collect(), validationData) + + // Test prediction on Array. + validatePrediction(validationData.map(row => model.predict(row._2)), validationData) + } +}