diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
index 667c72f379e7193b917783e533111c6e29afc0d9..cd8879ff886e24f5daf74c827bbf4e9204a15e96 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.mllib.examples;
 
+import java.util.regex.Pattern;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -24,11 +25,9 @@ import org.apache.spark.api.java.function.Function;
 
 import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
 import org.apache.spark.mllib.classification.LogisticRegressionModel;
+import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
 
-import java.util.Arrays;
-import java.util.regex.Pattern;
-
 /**
  * Logistic regression based classification using MLlib.
  */
@@ -47,14 +46,10 @@ public final class JavaLR {
       for (int i = 0; i < tok.length; ++i) {
         x[i] = Double.parseDouble(tok[i]);
       }
-      return new LabeledPoint(y, x);
+      return new LabeledPoint(y, Vectors.dense(x));
     }
   }
 
-  public static void printWeights(double[] a) {
-    System.out.println(Arrays.toString(a));
-  }
-
   public static void main(String[] args) {
     if (args.length != 4) {
       System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
@@ -80,8 +75,7 @@ public final class JavaLR {
     LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
         iterations, stepSize);
 
-    System.out.print("Final w: ");
-    printWeights(model.weights());
+    System.out.print("Final w: " + model.weights());
 
     System.exit(0);
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 3449c698da60b4282f9fed433a8fca9ba64d37b9..2df5b0d02b699dd52f4592d0c51f56e86a231b92 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -110,16 +110,16 @@ class PythonMLLibAPI extends Serializable {
 
   private def trainRegressionModel(
       trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
-      dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
-      java.util.LinkedList[java.lang.Object] = {
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => {
         val x = deserializeDoubleVector(xBytes)
-        LabeledPoint(x(0), x.slice(1, x.length))
+        LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
     })
     val initialWeights = deserializeDoubleVector(initialWeightsBA)
     val model = trainFunc(data, initialWeights)
     val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(serializeDoubleVector(model.weights))
+    ret.add(serializeDoubleVector(model.weights.toArray))
     ret.add(model.intercept: java.lang.Double)
     ret
   }
@@ -127,75 +127,127 @@ class PythonMLLibAPI extends Serializable {
   /**
    * Java stub for Python mllib LinearRegressionWithSGD.train()
    */
-  def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
-      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+  def trainLinearRegressionModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-        LinearRegressionWithSGD.train(data, numIterations, stepSize,
-                                      miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        LinearRegressionWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for Python mllib LassoWithSGD.train()
    */
-  def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+  def trainLassoModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-        LassoWithSGD.train(data, numIterations, stepSize, regParam,
-                           miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        LassoWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          regParam,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for Python mllib RidgeRegressionWithSGD.train()
    */
-  def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+  def trainRidgeModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-        RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
-                                     miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        RidgeRegressionWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          regParam,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for Python mllib SVMWithSGD.train()
    */
-  def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+  def trainSVMModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-        SVMWithSGD.train(data, numIterations, stepSize, regParam,
-                                     miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        SVMWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          regParam,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for Python mllib LogisticRegressionWithSGD.train()
    */
-  def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
-      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+  def trainLogisticRegressionModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-        LogisticRegressionWithSGD.train(data, numIterations, stepSize,
-                                     miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        LogisticRegressionWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for NaiveBayes.train()
    */
-  def trainNaiveBayes(dataBytesJRDD: JavaRDD[Array[Byte]], lambda: Double)
-      : java.util.List[java.lang.Object] =
-  {
+  def trainNaiveBayes(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      lambda: Double): java.util.List[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => {
       val x = deserializeDoubleVector(xBytes)
-      LabeledPoint(x(0), x.slice(1, x.length))
+      LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
     })
     val model = NaiveBayes.train(data, lambda)
     val ret = new java.util.LinkedList[java.lang.Object]()
+    ret.add(serializeDoubleVector(model.labels))
     ret.add(serializeDoubleVector(model.pi))
     ret.add(serializeDoubleMatrix(model.theta))
     ret
@@ -204,9 +256,12 @@ class PythonMLLibAPI extends Serializable {
   /**
    * Java stub for Python mllib KMeans.train()
    */
-  def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
-      maxIterations: Int, runs: Int, initializationMode: String):
-      java.util.List[java.lang.Object] = {
+  def trainKMeansModel(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      k: Int,
+      maxIterations: Int,
+      runs: Int,
+      initializationMode: String): java.util.List[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => Vectors.dense(deserializeDoubleVector(xBytes)))
     val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
     val ret = new java.util.LinkedList[java.lang.Object]()
@@ -259,8 +314,12 @@ class PythonMLLibAPI extends Serializable {
    * needs to be taken in the Python code to ensure it gets freed on exit; see
    * the Py4J documentation.
    */
-  def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
-      iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
+  def trainALSModel(
+      ratingsBytesJRDD: JavaRDD[Array[Byte]],
+      rank: Int,
+      iterations: Int,
+      lambda: Double,
+      blocks: Int): MatrixFactorizationModel = {
     val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
     ALS.train(ratings, rank, iterations, lambda, blocks)
   }
@@ -271,8 +330,13 @@ class PythonMLLibAPI extends Serializable {
    * Extra care needs to be taken in the Python code to ensure it gets freed on
    * exit; see the Py4J documentation.
    */
-  def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
-      iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
+  def trainImplicitALSModel(
+      ratingsBytesJRDD: JavaRDD[Array[Byte]],
+      rank: Int,
+      iterations: Int,
+      lambda: Double,
+      blocks: Int,
+      alpha: Double): MatrixFactorizationModel = {
     val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
     ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
   }
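
For context on the Python-facing stubs above: each serialized record is a flat Array[Double] whose first element is the label and whose remaining elements are the features, which the stubs now wrap with Vectors.dense. A minimal standalone sketch of that unpacking, with a made-up row (deserializeDoubleVector itself is private to the API):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object PythonRowLayoutSketch {
  def main(args: Array[String]): Unit = {
    // A deserialized row: label first, then the feature values.
    val x = Array(1.0, 0.5, -0.3, 2.0)
    val point = LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
    println(point)  // label 1.0 with a 3-dimensional dense feature vector
  }
}
```
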
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
index 391f5b9b7a7de122edc6cd528e0b1f8d971cc4f7..bd10e2e9e10e2bc78abf327d90cb85ee4fdff4d1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
@@ -17,22 +17,27 @@
 
 package org.apache.spark.mllib.classification
 
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.rdd.RDD
 
+/**
+ * Represents a classification model that predicts to which of a set of categories an example
+ * belongs. The categories are represented by double values: 0.0, 1.0, 2.0, etc.
+ */
 trait ClassificationModel extends Serializable {
   /**
    * Predict values for the given data set using the model trained.
    *
    * @param testData RDD representing data points to be predicted
-   * @return RDD[Int] where each entry contains the corresponding prediction
+   * @return an RDD[Double] where each entry contains the corresponding prediction
    */
-  def predict(testData: RDD[Array[Double]]): RDD[Double]
+  def predict(testData: RDD[Vector]): RDD[Double]
 
   /**
    * Predict values for a single data point using the model trained.
    *
    * @param testData array representing a single data point
-   * @return Int prediction from the trained model
+   * @return predicted category from the trained model
    */
-  def predict(testData: Array[Double]): Double
+  def predict(testData: Vector): Double
 }
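
A minimal sketch of implementing the updated trait (illustration only, not an MLlib class): a constant classifier, just to show the Vector-based predict signatures.

```scala
import org.apache.spark.mllib.classification.ClassificationModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Always predicts the same category; both overloads now take mllib Vectors.
class ConstantClassifier(category: Double) extends ClassificationModel {
  override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict)
  override def predict(testData: Vector): Double = category
}
```
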
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index a481f522761e243d6d272bf30c89b71979a07fe0..798f3a5c94740241f8b9c136cba2e83685544909 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -17,16 +17,12 @@
 
 package org.apache.spark.mllib.classification
 
-import scala.math.round
-
 import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.util.MLUtils
-import org.apache.spark.mllib.util.DataValidators
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.util.{DataValidators, MLUtils}
+import org.apache.spark.rdd.RDD
 
 /**
  * Classification model trained using Logistic Regression.
@@ -35,15 +31,38 @@ import org.jblas.DoubleMatrix
  * @param intercept Intercept computed for this model.
  */
 class LogisticRegressionModel(
-    override val weights: Array[Double],
+    override val weights: Vector,
     override val intercept: Double)
-  extends GeneralizedLinearModel(weights, intercept)
-  with ClassificationModel with Serializable {
+  extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {
+
+  private var threshold: Option[Double] = Some(0.5)
+
+  /**
+   * Sets the threshold that separates positive predictions from negative predictions. An example
+   * with prediction score greater than or equal to this threshold is identified as positive,
+   * and negative otherwise. The default value is 0.5.
+   */
+  def setThreshold(threshold: Double): this.type = {
+    this.threshold = Some(threshold)
+    this
+  }
 
-  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+  /**
+   * Clears the threshold so that `predict` will output raw prediction scores.
+   */
+  def clearThreshold(): this.type = {
+    threshold = None
+    this
+  }
+
+  override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
       intercept: Double) = {
-    val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
-    round(1.0/ (1.0 + math.exp(margin * -1)))
+    val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
+    val score = 1.0 / (1.0 + math.exp(-margin))
+    threshold match {
+      case Some(t) => if (score < t) 0.0 else 1.0
+      case None => score
+    }
   }
 }
 
@@ -56,16 +75,15 @@ class LogisticRegressionWithSGD private (
     var numIterations: Int,
     var regParam: Double,
     var miniBatchFraction: Double)
-  extends GeneralizedLinearAlgorithm[LogisticRegressionModel]
-  with Serializable {
+  extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {
 
   val gradient = new LogisticGradient()
   val updater = new SimpleUpdater()
   override val optimizer = new GradientDescent(gradient, updater)
-      .setStepSize(stepSize)
-      .setNumIterations(numIterations)
-      .setRegParam(regParam)
-      .setMiniBatchFraction(miniBatchFraction)
+    .setStepSize(stepSize)
+    .setNumIterations(numIterations)
+    .setRegParam(regParam)
+    .setMiniBatchFraction(miniBatchFraction)
   override val validators = List(DataValidators.classificationLabels)
 
   /**
@@ -73,7 +91,7 @@ class LogisticRegressionWithSGD private (
    */
   def this() = this(1.0, 100, 0.0, 1.0)
 
-  def createModel(weights: Array[Double], intercept: Double) = {
+  def createModel(weights: Vector, intercept: Double) = {
     new LogisticRegressionModel(weights, intercept)
   }
 }
@@ -105,11 +123,9 @@ object LogisticRegressionWithSGD {
       numIterations: Int,
       stepSize: Double,
       miniBatchFraction: Double,
-      initialWeights: Array[Double])
-    : LogisticRegressionModel =
-  {
-    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
-      input, initialWeights)
+      initialWeights: Vector): LogisticRegressionModel = {
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
+      .run(input, initialWeights)
   }
 
   /**
@@ -128,11 +144,9 @@ object LogisticRegressionWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      miniBatchFraction: Double)
-    : LogisticRegressionModel =
-  {
-    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
-      input)
+      miniBatchFraction: Double): LogisticRegressionModel = {
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
+      .run(input)
   }
 
   /**
@@ -150,9 +164,7 @@ object LogisticRegressionWithSGD {
   def train(
       input: RDD[LabeledPoint],
       numIterations: Int,
-      stepSize: Double)
-    : LogisticRegressionModel =
-  {
+      stepSize: Double): LogisticRegressionModel = {
     train(input, numIterations, stepSize, 1.0)
   }
 
@@ -168,9 +180,7 @@ object LogisticRegressionWithSGD {
    */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : LogisticRegressionModel =
-  {
+      numIterations: Int): LogisticRegressionModel = {
     train(input, numIterations, 1.0, 1.0)
   }
 
@@ -183,7 +193,7 @@ object LogisticRegressionWithSGD {
     val sc = new SparkContext(args(0), "LogisticRegression")
     val data = MLUtils.loadLabeledData(sc, args(1))
     val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
-    println("Weights: " + model.weights.mkString("[", ", ", "]"))
+    println("Weights: " + model.weights)
     println("Intercept: " + model.intercept)
 
     sc.stop()
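
A short usage sketch of the new threshold controls (made-up data, local mode): with a threshold set, predict returns a 0.0/1.0 category; after clearThreshold() it returns the raw score 1 / (1 + exp(-margin)).

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object LogisticThresholdSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "LogisticThresholdSketch")
    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(Array(1.0, 0.0))),
      LabeledPoint(0.0, Vectors.dense(Array(0.0, 1.0)))))

    val model = LogisticRegressionWithSGD.train(training, 20)
    val x = Vectors.dense(Array(1.0, 0.0))

    model.setThreshold(0.6)
    println(model.predict(x))  // 0.0 or 1.0, depending on whether the score reaches 0.6
    model.clearThreshold()
    println(model.predict(x))  // raw score in (0, 1)
    sc.stop()
  }
}
```
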
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index 6539b2f3394659064a98bbd9282c8da5514136e7..e956185319a694985e9e74ce3a0f7c2a667ae2d6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.mllib.classification
 
-import scala.collection.mutable
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}
 
-import org.jblas.DoubleMatrix
-
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.rdd.RDD
 
 /**
  * Model for Naive Bayes Classifiers.
@@ -32,19 +32,28 @@ import org.apache.spark.mllib.util.MLUtils
  * @param pi Log of class priors, whose dimension is C.
  * @param theta Log of class conditional probabilities, whose dimension is CxD.
  */
-class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
-  extends ClassificationModel with Serializable {
-
-  // Create a column vector that can be used for predictions
-  private val _pi = new DoubleMatrix(pi.length, 1, pi: _*)
-  private val _theta = new DoubleMatrix(theta)
+class NaiveBayesModel(
+    val labels: Array[Double],
+    val pi: Array[Double],
+    val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {
+
+  private val brzPi = new BDV[Double](pi)
+  private val brzTheta = new BDM[Double](theta.length, theta(0).length)
+
+  var i = 0
+  while (i < theta.length) {
+    var j = 0
+    while (j < theta(i).length) {
+      brzTheta(i, j) = theta(i)(j)
+      j += 1
+    }
+    i += 1
+  }
 
-  def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict)
+  override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict)
 
-  def predict(testData: Array[Double]): Double = {
-    val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*)
-    val result = _pi.add(_theta.mmul(dataMatrix))
-    result.argmax()
+  override def predict(testData: Vector): Double = {
+    labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
   }
 }
 
@@ -56,9 +65,8 @@ class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
  * document classification.  By making every vector a 0-1 vector, it can also be used as
  * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
  */
-class NaiveBayes private (var lambda: Double)
-  extends Serializable with Logging
-{
+class NaiveBayes private (var lambda: Double) extends Serializable with Logging {
+
   def this() = this(1.0)
 
   /** Set the smoothing parameter. Default: 1.0. */
@@ -70,45 +78,42 @@ class NaiveBayes private (var lambda: Double)
   /**
    * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
    *
-   * @param data RDD of (label, array of features) pairs.
+   * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
    */
   def run(data: RDD[LabeledPoint]) = {
-    // Aggregates all sample points to driver side to get sample count and summed feature vector
-    // for each label.  The shape of `zeroCombiner` & `aggregated` is:
-    //
-    //    label: Int -> (count: Int, featuresSum: DoubleMatrix)
-    val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)]
-    val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) =>
-      point match {
-        case LabeledPoint(label, features) =>
-          val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1)))
-          val fs = new DoubleMatrix(features.length, 1, features: _*)
-          combiner += label.toInt -> (count + 1, featuresSum.addi(fs))
-      }
-    }, { (lhs, rhs) =>
-      for ((label, (c, fs)) <- rhs) {
-        val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1)))
-        lhs(label) = (count + c, featuresSum.addi(fs))
+    // Aggregates term frequencies per label.
+    // TODO: Calling combineByKey and collect creates two stages; we can implement something
+    // TODO: similar to reduceByKeyLocally to save one stage.
+    val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, BDV[Double])](
+      createCombiner = (v: Vector) => (1L, v.toBreeze.toDenseVector),
+      mergeValue = (c: (Long, BDV[Double]), v: Vector) => (c._1 + 1L, c._2 += v.toBreeze),
+      mergeCombiners = (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) =>
+        (c1._1 + c2._1, c1._2 += c2._2)
+    ).collect()
+    val numLabels = aggregated.length
+    var numDocuments = 0L
+    aggregated.foreach { case (_, (n, _)) =>
+      numDocuments += n
+    }
+    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
+    val labels = new Array[Double](numLabels)
+    val pi = new Array[Double](numLabels)
+    val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
+    val piLogDenom = math.log(numDocuments + numLabels * lambda)
+    var i = 0
+    aggregated.foreach { case (label, (n, sumTermFreqs)) =>
+      labels(i) = label
+      val thetaLogDenom = math.log(brzSum(sumTermFreqs) + numFeatures * lambda)
+      pi(i) = math.log(n + lambda) - piLogDenom
+      var j = 0
+      while (j < numFeatures) {
+        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
+        j += 1
       }
-      lhs
-    })
-
-    // Kinds of label
-    val C = aggregated.size
-    // Total sample count
-    val N = aggregated.values.map(_._1).sum
-
-    val pi = new Array[Double](C)
-    val theta = new Array[Array[Double]](C)
-    val piLogDenom = math.log(N + C * lambda)
-
-    for ((label, (count, fs)) <- aggregated) {
-      val thetaLogDenom = math.log(fs.sum() + fs.length * lambda)
-      pi(label) = math.log(count + lambda) - piLogDenom
-      theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom)
+      i += 1
     }
 
-    new NaiveBayesModel(pi, theta)
+    new NaiveBayesModel(labels, pi, theta)
   }
 }
 
@@ -158,8 +163,9 @@ object NaiveBayes {
     } else {
       NaiveBayes.train(data, args(2).toDouble)
     }
-    println("Pi: " + model.pi.mkString("[", ", ", "]"))
-    println("Theta:\n" + model.theta.map(_.mkString("[", ", ", "]")).mkString("[", "\n ", "]"))
+
+    println("Pi\n: " + model.pi)
+    println("Theta:\n" + model.theta)
 
     sc.stop()
   }
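
A small end-to-end sketch (toy data, local mode) of the reworked NaiveBayes: the model now carries the observed label values explicitly, and predict maps the argmax over classes back to one of those labels.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object NaiveBayesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "NaiveBayesSketch")
    val data = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(Array(1.0, 0.0, 0.0))),
      LabeledPoint(1.0, Vectors.dense(Array(0.0, 1.0, 0.0))),
      LabeledPoint(2.0, Vectors.dense(Array(0.0, 0.0, 1.0)))))

    val model = NaiveBayes.train(data, 1.0)
    println("labels: " + model.labels.mkString(", "))
    println("pi: " + model.pi.mkString(", "))
    println(model.predict(Vectors.dense(Array(0.0, 1.0, 0.0))))  // 1.0 for this toy data
    sc.stop()
  }
}
```
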
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
index 6dff29dfb45ccae2bb497c61eab0294498ce3442..e31a08899f8bc2dc95e22302e273b56956c99a28 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
@@ -18,13 +18,11 @@
 package org.apache.spark.mllib.classification
 
 import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.util.MLUtils
-import org.apache.spark.mllib.util.DataValidators
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.util.{DataValidators, MLUtils}
+import org.apache.spark.rdd.RDD
 
 /**
  * Model for Support Vector Machines (SVMs).
@@ -33,15 +31,37 @@ import org.jblas.DoubleMatrix
  * @param intercept Intercept computed for this model.
  */
 class SVMModel(
-    override val weights: Array[Double],
+    override val weights: Vector,
     override val intercept: Double)
-  extends GeneralizedLinearModel(weights, intercept)
-  with ClassificationModel with Serializable {
+  extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {
+
+  private var threshold: Option[Double] = Some(0.0)
+
+  /**
+   * Sets the threshold that separates positive predictions from negative predictions. An example
+   * with prediction score greater than or equal to this threshold is identified as positive,
+   * and negative otherwise. The default value is 0.0.
+   */
+  def setThreshold(threshold: Double): this.type = {
+    this.threshold = Some(threshold)
+    this
+  }
 
-  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+  /**
+   * Clears the threshold so that `predict` will output raw prediction scores.
+   */
+  def clearThreshold(): this.type = {
+    threshold = None
+    this
+  }
+
+  override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
       intercept: Double) = {
-    val margin = dataMatrix.dot(weightMatrix) + intercept
-    if (margin < 0) 0.0 else 1.0
+    val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
+    threshold match {
+      case Some(t) => if (margin < t) 0.0 else 1.0
+      case None => margin
+    }
   }
 }
 
@@ -71,7 +91,7 @@ class SVMWithSGD private (
    */
   def this() = this(1.0, 100, 1.0, 1.0)
 
-  def createModel(weights: Array[Double], intercept: Double) = {
+  def createModel(weights: Vector, intercept: Double) = {
     new SVMModel(weights, intercept)
   }
 }
@@ -103,11 +123,9 @@ object SVMWithSGD {
       stepSize: Double,
       regParam: Double,
       miniBatchFraction: Double,
-      initialWeights: Array[Double])
-    : SVMModel =
-  {
-    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input,
-      initialWeights)
+      initialWeights: Vector): SVMModel = {
+    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction)
+      .run(input, initialWeights)
   }
 
   /**
@@ -127,9 +145,7 @@ object SVMWithSGD {
       numIterations: Int,
       stepSize: Double,
       regParam: Double,
-      miniBatchFraction: Double)
-    : SVMModel =
-  {
+      miniBatchFraction: Double): SVMModel = {
     new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
   }
 
@@ -149,9 +165,7 @@ object SVMWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      regParam: Double)
-    : SVMModel =
-  {
+      regParam: Double): SVMModel = {
     train(input, numIterations, stepSize, regParam, 1.0)
   }
 
@@ -165,11 +179,7 @@ object SVMWithSGD {
    * @param numIterations Number of iterations of gradient descent to run.
    * @return a SVMModel which has the weights and offset from training.
    */
-  def train(
-      input: RDD[LabeledPoint],
-      numIterations: Int)
-    : SVMModel =
-  {
+  def train(input: RDD[LabeledPoint], numIterations: Int): SVMModel = {
     train(input, numIterations, 1.0, 1.0, 1.0)
   }
 
@@ -181,7 +191,8 @@ object SVMWithSGD {
     val sc = new SparkContext(args(0), "SVM")
     val data = MLUtils.loadLabeledData(sc, args(1))
     val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
-    println("Weights: " + model.weights.mkString("[", ", ", "]"))
+
+    println("Weights: " + model.weights)
     println("Intercept: " + model.intercept)
 
     sc.stop()
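
The analogous threshold sketch for SVMModel (made-up data, local mode): the default threshold is 0.0, and clearThreshold() switches predict to returning the raw margin.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object SVMThresholdSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "SVMThresholdSketch")
    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(Array(2.0, 0.0))),
      LabeledPoint(0.0, Vectors.dense(Array(0.0, 2.0)))))

    val model = SVMWithSGD.train(training, 20)
    val x = Vectors.dense(Array(2.0, 0.0))

    println(model.predict(x))  // 0.0 or 1.0 against the default threshold of 0.0
    model.clearThreshold()
    println(model.predict(x))  // raw margin w.x + b
    sc.stop()
  }
}
```
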
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index b412738e3f00a685bfd76bbf2f17b90f9718212a..a78503df3134d5928f0d9c70f33b6688eb08ec51 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -42,8 +42,7 @@ class KMeans private (
     var runs: Int,
     var initializationMode: String,
     var initializationSteps: Int,
-    var epsilon: Double)
-  extends Serializable with Logging {
+    var epsilon: Double) extends Serializable with Logging {
   def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4)
 
   /** Set the number of clusters to create (k). Default: 2. */
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 01c1501548f8783d2169785a5626cdafdb088c22..2cea58cd3fd22b90143a69cabe58f8d9ac46cef4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -54,6 +54,12 @@ trait Vector extends Serializable {
    * Converts the instance to a breeze vector.
    */
   private[mllib] def toBreeze: BV[Double]
+
+  /**
+   * Gets the value of the ith element.
+   * @param i index
+   */
+  private[mllib] def apply(i: Int): Double = toBreeze(i)
 }
 
 /**
@@ -145,6 +151,8 @@ class DenseVector(val values: Array[Double]) extends Vector {
   override def toArray: Array[Double] = values
 
   private[mllib] override def toBreeze: BV[Double] = new BDV[Double](values)
+
+  override def apply(i: Int) = values(i)
 }
 
 /**
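
The new apply accessor is package-private on the trait, so the sketch below assumes it lives under org.apache.spark.mllib; externally, toArray remains the public way to read individual values.

```scala
package org.apache.spark.mllib.linalg

// Illustration only: element access via the new apply, which DenseVector serves
// straight from its backing array and other implementations via toBreeze(i).
private[mllib] object ApplySketch {
  def firstAndLast(v: Vector): (Double, Double) = (v(0), v(v.size - 1))
}
```
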
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
index 82124703da6cddf99327eda9b1124f18e6fe133d..20654284965edb2d838608dab4b33dd6d38b19e2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.mllib.optimization
 
-import org.jblas.DoubleMatrix
+import breeze.linalg.{axpy => brzAxpy}
+
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
  * Class used to compute the gradient for a loss function, given a single data point.
@@ -26,17 +28,26 @@ abstract class Gradient extends Serializable {
   /**
    * Compute the gradient and loss given the features of a single data point.
    *
-   * @param data - Feature values for one data point. Column matrix of size dx1
-   *               where d is the number of features.
-   * @param label - Label for this data item.
-   * @param weights - Column matrix containing weights for every feature.
+   * @param data features for one data point
+   * @param label label for this data point
+   * @param weights weights/coefficients corresponding to features
    *
-   * @return A tuple of 2 elements. The first element is a column matrix containing the computed
-   *         gradient and the second element is the loss computed at this data point.
+   * @return (gradient: Vector, loss: Double)
+   */
+  def compute(data: Vector, label: Double, weights: Vector): (Vector, Double)
+
+  /**
+   * Compute the gradient and loss given the features of a single data point,
+   * add the gradient to a provided vector to avoid creating new objects, and return loss.
    *
+   * @param data features for one data point
+   * @param label label for this data point
+   * @param weights weights/coefficients corresponding to features
+   * @param cumGradient the computed gradient will be added to this vector
+   *
+   * @return loss
    */
-  def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): 
-      (DoubleMatrix, Double)
+  def compute(data: Vector, label: Double, weights: Vector, cumGradient: Vector): Double
 }
 
 /**
@@ -44,12 +55,12 @@ abstract class Gradient extends Serializable {
  * See also the documentation for the precise formulation.
  */
 class LogisticGradient extends Gradient {
-  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): 
-      (DoubleMatrix, Double) = {
-    val margin: Double = -1.0 * data.dot(weights)
+  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
+    val brzData = data.toBreeze
+    val brzWeights = weights.toBreeze
+    val margin: Double = -1.0 * brzWeights.dot(brzData)
     val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
-
-    val gradient = data.mul(gradientMultiplier)
+    val gradient = brzData * gradientMultiplier
     val loss =
       if (label > 0) {
         math.log(1 + math.exp(margin))
@@ -57,7 +68,26 @@ class LogisticGradient extends Gradient {
         math.log(1 + math.exp(margin)) - margin
       }
 
-    (gradient, loss)
+    (Vectors.fromBreeze(gradient), loss)
+  }
+
+  override def compute(
+      data: Vector,
+      label: Double,
+      weights: Vector,
+      cumGradient: Vector): Double = {
+    val brzData = data.toBreeze
+    val brzWeights = weights.toBreeze
+    val margin: Double = -1.0 * brzWeights.dot(brzData)
+    val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
+
+    brzAxpy(gradientMultiplier, brzData, cumGradient.toBreeze)
+
+    if (label > 0) {
+      math.log(1 + math.exp(margin))
+    } else {
+      math.log(1 + math.exp(margin)) - margin
+    }
   }
 }
 
@@ -68,14 +98,28 @@ class LogisticGradient extends Gradient {
  * See also the documentation for the precise formulation.
  */
 class LeastSquaresGradient extends Gradient {
-  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): 
-      (DoubleMatrix, Double) = {
-    val diff: Double = data.dot(weights) - label
-
+  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
+    val brzData = data.toBreeze
+    val brzWeights = weights.toBreeze
+    val diff = brzWeights.dot(brzData) - label
     val loss = diff * diff
-    val gradient =  data.mul(2.0 * diff)
+    val gradient = brzData * (2.0 * diff)
 
-    (gradient, loss)
+    (Vectors.fromBreeze(gradient), loss)
+  }
+
+  override def compute(
+      data: Vector,
+      label: Double,
+      weights: Vector,
+      cumGradient: Vector): Double = {
+    val brzData = data.toBreeze
+    val brzWeights = weights.toBreeze
+    val diff = brzWeights.dot(brzData) - label
+
+    brzAxpy(2.0 * diff, brzData, cumGradient.toBreeze)
+
+    diff * diff
   }
 }
 
@@ -85,19 +129,40 @@ class LeastSquaresGradient extends Gradient {
  * NOTE: This assumes that the labels are {0,1}
  */
 class HingeGradient extends Gradient {
-  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
-      (DoubleMatrix, Double) = {
+  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
+    val brzData = data.toBreeze
+    val brzWeights = weights.toBreeze
+    val dotProduct = brzWeights.dot(brzData)
+
+    // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
+    // Therefore the gradient is -(2y - 1)*x
+    val labelScaled = 2 * label - 1.0
+
+    if (1.0 > labelScaled * dotProduct) {
+      (Vectors.fromBreeze(brzData * (-labelScaled)), 1.0 - labelScaled * dotProduct)
+    } else {
+      (Vectors.dense(new Array[Double](weights.size)), 0.0)
+    }
+  }
 
-    val dotProduct = data.dot(weights)
+  override def compute(
+      data: Vector,
+      label: Double,
+      weights: Vector,
+      cumGradient: Vector): Double = {
+    val brzData = data.toBreeze
+    val brzWeights = weights.toBreeze
+    val dotProduct = brzWeights.dot(brzData)
 
     // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
     // Therefore the gradient is -(2y - 1)*x
     val labelScaled = 2 * label - 1.0
 
     if (1.0 > labelScaled * dotProduct) {
-      (data.mul(-labelScaled), 1.0 - labelScaled * dotProduct)
+      brzAxpy(-labelScaled, brzData, cumGradient.toBreeze)
+      1.0 - labelScaled * dotProduct
     } else {
-      (DoubleMatrix.zeros(1, weights.length), 0.0)
+      0.0
     }
   }
 }
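
A small sketch of the new in-place accumulation API (made-up numbers): the four-argument compute adds the example's gradient into cumGradient and returns only the loss, so a single buffer can be reused across points.

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.LeastSquaresGradient

object GradientAccumulationSketch {
  def main(args: Array[String]): Unit = {
    val gradient = new LeastSquaresGradient()
    val weights = Vectors.dense(Array(0.5, -0.5))

    // Allocate the accumulator once; compute() adds into it in place.
    val cumGradient = Vectors.dense(new Array[Double](weights.size))
    val loss1 = gradient.compute(Vectors.dense(Array(1.0, 2.0)), 3.0, weights, cumGradient)
    val loss2 = gradient.compute(Vectors.dense(Array(0.0, 1.0)), -1.0, weights, cumGradient)

    println("total loss = " + (loss1 + loss2))
    println("summed gradient = " + cumGradient)
  }
}
```
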
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index b967b22e818d34fab5ae6f3ff47b62c48d42afc6..d0777ffd63ff8ba854c93fc528194de8934ae820 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.mllib.optimization
 
-import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
+import scala.collection.mutable.ArrayBuffer
 
-import org.jblas.DoubleMatrix
+import breeze.linalg.{Vector => BV, DenseVector => BDV}
 
-import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
  * Class used to solve an optimization problem using Gradient Descent.
@@ -91,18 +92,16 @@ class GradientDescent(var gradient: Gradient, var updater: Updater)
     this
   }
 
-  def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
-    : Array[Double] = {
-
-    val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
-        data,
-        gradient,
-        updater,
-        stepSize,
-        numIterations,
-        regParam,
-        miniBatchFraction,
-        initialWeights)
+  def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
+    val (weights, _) = GradientDescent.runMiniBatchSGD(
+      data,
+      gradient,
+      updater,
+      stepSize,
+      numIterations,
+      regParam,
+      miniBatchFraction,
+      initialWeights)
     weights
   }
 
@@ -133,14 +132,14 @@ object GradientDescent extends Logging {
    *         stochastic loss computed for every iteration.
    */
   def runMiniBatchSGD(
-    data: RDD[(Double, Array[Double])],
+    data: RDD[(Double, Vector)],
     gradient: Gradient,
     updater: Updater,
     stepSize: Double,
     numIterations: Int,
     regParam: Double,
     miniBatchFraction: Double,
-    initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
+    initialWeights: Vector): (Vector, Array[Double]) = {
 
     val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
 
@@ -148,24 +147,27 @@ object GradientDescent extends Logging {
     val miniBatchSize = nexamples * miniBatchFraction
 
     // Initialize weights as a column vector
-    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
+    var weights = Vectors.dense(initialWeights.toArray)
 
     /**
      * For the first iteration, the regVal will be initialized as sum of sqrt of
      * weights if it's L2 update; for L1 update; the same logic is followed.
      */
     var regVal = updater.compute(
-      weights, new DoubleMatrix(initialWeights.length, 1), 0, 1, regParam)._2
+      weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
 
     for (i <- 1 to numIterations) {
       // Sample a subset (fraction miniBatchFraction) of the total data
       // compute and sum up the subgradients on this subset (this is one map-reduce)
-      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i).map {
-        case (y, features) =>
-          val featuresCol = new DoubleMatrix(features.length, 1, features:_*)
-          val (grad, loss) = gradient.compute(featuresCol, y, weights)
-          (grad, loss)
-      }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
+      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
+        .aggregate((BDV.zeros[Double](weights.size), 0.0))(
+          seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
+            val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad))
+            (grad, loss + l)
+          },
+          combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
+            (grad1 += grad2, loss1 + loss2)
+          })
 
       /**
        * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
@@ -173,7 +175,7 @@ object GradientDescent extends Logging {
        */
       stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
       val update = updater.compute(
-        weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
+        weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
       weights = update._1
       regVal = update._2
     }
@@ -181,6 +183,6 @@ object GradientDescent extends Logging {
     logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
       stochasticLossHistory.takeRight(10).mkString(", ")))
 
-    (weights.toArray, stochasticLossHistory.toArray)
+    (weights, stochasticLossHistory.toArray)
   }
 }
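
A usage sketch of the Vector-based optimizer interface (toy data, local mode; assumes the GradientDescent constructor stays public as in the hunk header above): optimize now consumes RDD[(Double, Vector)] and returns a Vector of weights.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{GradientDescent, LogisticGradient, SimpleUpdater}

object GradientDescentSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "GradientDescentSketch")
    val data = sc.parallelize(Seq(
      (1.0, Vectors.dense(Array(1.0, 0.0))),
      (0.0, Vectors.dense(Array(0.0, 1.0)))))

    val optimizer = new GradientDescent(new LogisticGradient(), new SimpleUpdater())
      .setStepSize(1.0)
      .setNumIterations(20)
      .setMiniBatchFraction(1.0)

    val weights = optimizer.optimize(data, Vectors.dense(Array(0.0, 0.0)))
    println("weights: " + weights)
    sc.stop()
  }
}
```
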
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
index 94d30b56f212bbb4f6cba28d9cd7f646f8f13d99..f9ce908a5f3b01b3689e40749fcd34bd2d2b8e2b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
@@ -19,11 +19,12 @@ package org.apache.spark.mllib.optimization
 
 import org.apache.spark.rdd.RDD
 
-trait Optimizer {
+import org.apache.spark.mllib.linalg.Vector
+
+trait Optimizer extends Serializable {
 
   /**
    * Solve the provided convex optimization problem. 
    */
-  def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]): Array[Double]
-
+  def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector
 }
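
A minimal sketch of plugging a custom optimizer into the new contract (illustration only; the class name is made up): Optimizer now extends Serializable and works on Vectors.

```scala
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.optimization.Optimizer
import org.apache.spark.rdd.RDD

// Trivially returns the initial weights; a real optimizer would iterate over the data.
class NoOpOptimizer extends Optimizer {
  override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector =
    initialWeights
}
```
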
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
index bf8f731459e991b7a1db995b8728c63f56d161ee..3b7754cd7ac28bb783a8ae5e19ff8b356c98d372 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
@@ -18,7 +18,10 @@
 package org.apache.spark.mllib.optimization
 
 import scala.math._
-import org.jblas.DoubleMatrix
+
+import breeze.linalg.{norm => brzNorm, axpy => brzAxpy, Vector => BV}
+
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
  * Class used to perform steps (weight update) using Gradient Descent methods.
@@ -47,8 +50,12 @@ abstract class Updater extends Serializable {
    * @return A tuple of 2 elements. The first element is a column matrix containing updated weights,
    *         and the second element is the regularization value computed using updated weights.
    */
-  def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int,
-      regParam: Double): (DoubleMatrix, Double)
+  def compute(
+      weightsOld: Vector,
+      gradient: Vector,
+      stepSize: Double,
+      iter: Int,
+      regParam: Double): (Vector, Double)
 }
 
 /**
@@ -56,11 +63,17 @@ abstract class Updater extends Serializable {
  * Uses a step-size decreasing with the square root of the number of iterations.
  */
 class SimpleUpdater extends Updater {
-  override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
-      stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
+  override def compute(
+      weightsOld: Vector,
+      gradient: Vector,
+      stepSize: Double,
+      iter: Int,
+      regParam: Double): (Vector, Double) = {
     val thisIterStepSize = stepSize / math.sqrt(iter)
-    val step = gradient.mul(thisIterStepSize)
-    (weightsOld.sub(step), 0)
+    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
+    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
+
+    (Vectors.fromBreeze(brzWeights), 0)
   }
 }
 
@@ -83,19 +96,26 @@ class SimpleUpdater extends Updater {
  * Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal)
  */
 class L1Updater extends Updater {
-  override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
-      stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
+  override def compute(
+      weightsOld: Vector,
+      gradient: Vector,
+      stepSize: Double,
+      iter: Int,
+      regParam: Double): (Vector, Double) = {
     val thisIterStepSize = stepSize / math.sqrt(iter)
-    val step = gradient.mul(thisIterStepSize)
     // Take gradient step
-    val newWeights = weightsOld.sub(step)
+    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
+    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
     // Apply proximal operator (soft thresholding)
     val shrinkageVal = regParam * thisIterStepSize
-    (0 until newWeights.length).foreach { i =>
-      val wi = newWeights.get(i)
-      newWeights.put(i, signum(wi) * max(0.0, abs(wi) - shrinkageVal))
+    var i = 0
+    while (i < brzWeights.length) {
+      val wi = brzWeights(i)
+      brzWeights(i) = signum(wi) * max(0.0, abs(wi) - shrinkageVal)
+      i += 1
     }
-    (newWeights, newWeights.norm1 * regParam)
+
+    (Vectors.fromBreeze(brzWeights), brzNorm(brzWeights, 1.0) * regParam)
   }
 }
 
@@ -105,16 +125,23 @@ class L1Updater extends Updater {
  * Uses a step-size decreasing with the square root of the number of iterations.
  */
 class SquaredL2Updater extends Updater {
-  override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
-      stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
-    val thisIterStepSize = stepSize / math.sqrt(iter)
-    val step = gradient.mul(thisIterStepSize)
+  override def compute(
+      weightsOld: Vector,
+      gradient: Vector,
+      stepSize: Double,
+      iter: Int,
+      regParam: Double): (Vector, Double) = {
     // add up both updates from the gradient of the loss (= step) as well as
     // the gradient of the regularizer (= regParam * weightsOld)
     // w' = w - thisIterStepSize * (gradient + regParam * w)
     // w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient
-    val newWeights = weightsOld.mul(1.0 - thisIterStepSize * regParam).sub(step)
-    (newWeights, 0.5 * pow(newWeights.norm2, 2.0) * regParam)
+    val thisIterStepSize = stepSize / math.sqrt(iter)
+    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
+    brzWeights :*= (1.0 - thisIterStepSize * regParam)
+    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
+    val norm = brzNorm(brzWeights, 2.0)
+
+    (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)
   }
 }
 
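
A small sketch of the Vector-based Updater contract (made-up numbers): compute returns the updated weights together with the regularization value, here contrasting the L2 step with L1 soft thresholding.

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{L1Updater, SquaredL2Updater}

object UpdaterSketch {
  def main(args: Array[String]): Unit = {
    val weights = Vectors.dense(Array(1.0, -0.2, 0.05))
    val gradient = Vectors.dense(Array(0.1, 0.1, 0.1))

    // w' = (1 - stepSize/sqrt(iter) * regParam) * w - stepSize/sqrt(iter) * gradient
    val (l2Weights, l2Reg) = new SquaredL2Updater().compute(weights, gradient, 1.0, 1, 0.1)

    // Gradient step followed by per-component soft thresholding.
    val (l1Weights, l1Reg) = new L1Updater().compute(weights, gradient, 1.0, 1, 0.1)

    println("L2: " + l2Weights + " reg = " + l2Reg)
    println("L1: " + l1Weights + " reg = " + l1Reg)
  }
}
```
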
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index 3e1ed91bf67291f773ad18bdfa61c30f18ece83c..80dc0f12ff84fc60f9c4b474376358baa70c2e9d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.mllib.regression
 
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
+
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.optimization._
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
  * GeneralizedLinearModel (GLM) represents a model trained using 
@@ -31,12 +32,9 @@ import org.jblas.DoubleMatrix
  * @param weights Weights computed for every feature.
  * @param intercept Intercept computed for this model.
  */
-abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept: Double)
+abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double)
   extends Serializable {
 
-  // Create a column vector that can be used for predictions
-  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
   /**
    * Predict the result given a data point and the weights learned.
    * 
@@ -44,8 +42,7 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept:
    * @param weightMatrix Column vector containing the weights of the model
    * @param intercept Intercept of the model.
    */
-  def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
-    intercept: Double): Double
+  protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
 
   /**
    * Predict values for the given data set using the model trained.
@@ -53,16 +50,13 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept:
    * @param testData RDD representing data points to be predicted
    * @return RDD[Double] where each entry contains the corresponding prediction
    */
-  def predict(testData: RDD[Array[Double]]): RDD[Double] = {
+  def predict(testData: RDD[Vector]): RDD[Double] = {
     // A small optimization to avoid serializing the entire model. Only the weights
     // and intercept are needed.
-    val localWeights = weightsMatrix
+    val localWeights = weights
     val localIntercept = intercept
 
-    testData.map { x =>
-      val dataMatrix = new DoubleMatrix(1, x.length, x:_*)
-      predictPoint(dataMatrix, localWeights, localIntercept)
-    }
+    testData.map(v => predictPoint(v, localWeights, localIntercept))
   }
 
   /**
@@ -71,14 +65,13 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept:
    * @param testData array representing a single data point
    * @return Double prediction from the trained model
    */
-  def predict(testData: Array[Double]): Double = {
-    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
-    predictPoint(dataMat, weightsMatrix, intercept)
+  def predict(testData: Vector): Double = {
+    predictPoint(testData, weights, intercept)
   }
 }
 
 /**
- * GeneralizedLinearAlgorithm implements methods to train a Genearalized Linear Model (GLM).
+ * GeneralizedLinearAlgorithm implements methods to train a Generalized Linear Model (GLM).
  * This class should be extended with an Optimizer to create a new GLM.
  */
 abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
@@ -88,6 +81,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
 
   val optimizer: Optimizer
 
+  /** Whether to add intercept (default: true). */
   protected var addIntercept: Boolean = true
 
   protected var validateData: Boolean = true
@@ -95,7 +89,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
   /**
    * Create a model given the weights and intercept
    */
-  protected def createModel(weights: Array[Double], intercept: Double): M
+  protected def createModel(weights: Vector, intercept: Double): M
 
   /**
    * Set if the algorithm should add an intercept. Default true.
@@ -117,17 +111,27 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
    * Run the algorithm with the configured parameters on an input
    * RDD of LabeledPoint entries.
    */
-  def run(input: RDD[LabeledPoint]) : M = {
-    val nfeatures: Int = input.first().features.length
-    val initialWeights = new Array[Double](nfeatures)
+  def run(input: RDD[LabeledPoint]): M = {
+    val numFeatures: Int = input.first().features.size
+    val initialWeights = Vectors.dense(new Array[Double](numFeatures))
     run(input, initialWeights)
   }
 
+  /** Prepends one to the input vector. */
+  private def prependOne(vector: Vector): Vector = {
+    val vector1 = vector.toBreeze match {
+      case dv: BDV[Double] => BDV.vertcat(BDV.ones[Double](1), dv)
+      case sv: BSV[Double] => BSV.vertcat(new BSV[Double](Array(0), Array(1.0), 1), sv)
+      case v: Any => throw new IllegalArgumentException("Unsupported vector type " + v.getClass)
+    }
+    Vectors.fromBreeze(vector1)
+  }
+
   /**
    * Run the algorithm with the configured parameters on an input RDD
    * of LabeledPoint entries starting from the initial weights provided.
    */
-  def run(input: RDD[LabeledPoint], initialWeights: Array[Double]) : M = {
+  def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
 
     // Check the data properties before running the optimizer
     if (validateData && !validators.forall(func => func(input))) {
@@ -136,27 +140,26 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
 
     // Prepend an extra variable consisting of all 1.0's for the intercept.
     val data = if (addIntercept) {
-      input.map(labeledPoint => (labeledPoint.label, 1.0 +: labeledPoint.features))
+      input.map(labeledPoint => (labeledPoint.label, prependOne(labeledPoint.features)))
     } else {
       input.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
     }
 
     val initialWeightsWithIntercept = if (addIntercept) {
-      0.0 +: initialWeights
+      prependOne(initialWeights)
     } else {
       initialWeights
     }
 
     val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)
 
-    val (intercept, weights) = if (addIntercept) {
-      (weightsWithIntercept(0), weightsWithIntercept.tail)
-    } else {
-      (0.0, weightsWithIntercept)
-    }
-
-    logInfo("Final weights " + weights.mkString(","))
-    logInfo("Final intercept " + intercept)
+    val intercept = if (addIntercept) weightsWithIntercept(0) else 0.0
+    val weights =
+      if (addIntercept) {
+        Vectors.dense(weightsWithIntercept.toArray.slice(1, weightsWithIntercept.size))
+      } else {
+        weightsWithIntercept
+      }
 
     createModel(weights, intercept)
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
index 1a18292fe3f3bd4b362fcaf74de1c60fe9c6a5bb..3deab1ab785b94dc66ee300458369d7ba26bc8e0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.mllib.regression
 
+import org.apache.spark.mllib.linalg.Vector
+
 /**
  * Class that represents the features and labels of a data point.
  *
  * @param label Label for this data point.
  * @param features List of features for this data point.
  */
-case class LabeledPoint(label: Double, features: Array[Double]) {
+case class LabeledPoint(label: Double, features: Vector) {
   override def toString: String = {
-    "LabeledPoint(%s, %s)".format(label, features.mkString("[", ", ", "]"))
+    "LabeledPoint(%s, %s)".format(label, features)
   }
 }
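
A quick usage sketch of the new LabeledPoint signature (assuming the Vectors factory introduced elsewhere in this patch; the numbers are made up):

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint

  val densePoint = LabeledPoint(1.0, Vectors.dense(0.5, -1.2, 3.0))
  val sparsePoint = LabeledPoint(0.0, Vectors.sparse(4, Array(1, 3), Array(2.0, -0.5)))
  // toString now prints the label followed by the Vector's own string form.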
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
index be63ce8538fefc2689fa7f7a094a28734f25fc89..25920d0dc976e96de3c18b9ddb2c008ed88c595c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.mllib.regression
 
-import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.util.MLUtils
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.rdd.RDD
 
 /**
  * Regression model trained using Lasso.
@@ -31,16 +30,16 @@ import org.jblas.DoubleMatrix
  * @param intercept Intercept computed for this model.
  */
 class LassoModel(
-    override val weights: Array[Double],
+    override val weights: Vector,
     override val intercept: Double)
   extends GeneralizedLinearModel(weights, intercept)
   with RegressionModel with Serializable {
 
-  override def predictPoint(
-      dataMatrix: DoubleMatrix,
-      weightMatrix: DoubleMatrix,
+  override protected def predictPoint(
+      dataMatrix: Vector,
+      weightMatrix: Vector,
       intercept: Double): Double = {
-    dataMatrix.dot(weightMatrix) + intercept
+    weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
   }
 }
 
@@ -57,8 +56,7 @@ class LassoWithSGD private (
     var numIterations: Int,
     var regParam: Double,
     var miniBatchFraction: Double)
-  extends GeneralizedLinearAlgorithm[LassoModel]
-  with Serializable {
+  extends GeneralizedLinearAlgorithm[LassoModel] with Serializable {
 
   val gradient = new LeastSquaresGradient()
   val updater = new L1Updater()
@@ -70,10 +68,6 @@ class LassoWithSGD private (
   // We don't want to penalize the intercept, so set this to false.
   super.setIntercept(false)
 
-  var yMean = 0.0
-  var xColMean: DoubleMatrix = _
-  var xColSd: DoubleMatrix = _
-
   /**
    * Construct a Lasso object with default parameters
    */
@@ -85,36 +79,8 @@ class LassoWithSGD private (
     this
   }
 
-  override def createModel(weights: Array[Double], intercept: Double) = {
-    val weightsMat = new DoubleMatrix(weights.length, 1, weights: _*)
-    val weightsScaled = weightsMat.div(xColSd)
-    val interceptScaled = yMean - weightsMat.transpose().mmul(xColMean.div(xColSd)).get(0)
-
-    new LassoModel(weightsScaled.data, interceptScaled)
-  }
-
-  override def run(
-      input: RDD[LabeledPoint],
-      initialWeights: Array[Double])
-    : LassoModel =
-  {
-    val nfeatures: Int = input.first.features.length
-    val nexamples: Long = input.count()
-
-    // To avoid penalizing the intercept, we center and scale the data.
-    val stats = MLUtils.computeStats(input, nfeatures, nexamples)
-    yMean = stats._1
-    xColMean = stats._2
-    xColSd = stats._3
-
-    val normalizedData = input.map { point =>
-      val yNormalized = point.label - yMean
-      val featuresMat = new DoubleMatrix(nfeatures, 1, point.features:_*)
-      val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd)
-      LabeledPoint(yNormalized, featuresNormalized.toArray)
-    }
-
-    super.run(normalizedData, initialWeights)
+  override protected def createModel(weights: Vector, intercept: Double) = {
+    new LassoModel(weights, intercept)
   }
 }
 
@@ -144,11 +110,9 @@ object LassoWithSGD {
       stepSize: Double,
       regParam: Double,
       miniBatchFraction: Double,
-      initialWeights: Array[Double])
-    : LassoModel =
-  {
-    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input,
-        initialWeights)
+      initialWeights: Vector): LassoModel = {
+    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction)
+      .run(input, initialWeights)
   }
 
   /**
@@ -168,9 +132,7 @@ object LassoWithSGD {
       numIterations: Int,
       stepSize: Double,
       regParam: Double,
-      miniBatchFraction: Double)
-    : LassoModel =
-  {
+      miniBatchFraction: Double): LassoModel = {
     new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
   }
 
@@ -190,9 +152,7 @@ object LassoWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      regParam: Double)
-    : LassoModel =
-  {
+      regParam: Double): LassoModel = {
     train(input, numIterations, stepSize, regParam, 1.0)
   }
 
@@ -208,9 +168,7 @@ object LassoWithSGD {
    */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : LassoModel =
-  {
+      numIterations: Int): LassoModel = {
     train(input, numIterations, 1.0, 1.0, 1.0)
   }
 
@@ -222,7 +180,8 @@ object LassoWithSGD {
     val sc = new SparkContext(args(0), "Lasso")
     val data = MLUtils.loadLabeledData(sc, args(1))
     val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
-    println("Weights: " + model.weights.mkString("[", ", ", "]"))
+
+    println("Weights: " + model.weights)
     println("Intercept: " + model.intercept)
 
     sc.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
index f5f15d1a33f4d1ace1f5a47af60f08a2f0c97dbe..9ed927994e7959058e30a757ef309a69d9121290 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
@@ -19,11 +19,10 @@ package org.apache.spark.mllib.regression
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.util.MLUtils
 
-import org.jblas.DoubleMatrix
-
 /**
  * Regression model trained using LinearRegression.
  *
@@ -31,15 +30,15 @@ import org.jblas.DoubleMatrix
  * @param intercept Intercept computed for this model.
  */
 class LinearRegressionModel(
-    override val weights: Array[Double],
+    override val weights: Vector,
     override val intercept: Double)
   extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {
 
-  override def predictPoint(
-      dataMatrix: DoubleMatrix,
-      weightMatrix: DoubleMatrix,
+  override protected def predictPoint(
+      dataMatrix: Vector,
+      weightMatrix: Vector,
       intercept: Double): Double = {
-    dataMatrix.dot(weightMatrix) + intercept
+    weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
   }
 }
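
As a hand check of the Breeze-based predictPoint above (a standalone sketch with made-up numbers, not code from the patch):

  import breeze.linalg.DenseVector

  val weights = DenseVector(2.0, -1.0)
  val features = DenseVector(3.0, 4.0)
  val intercept = 0.5
  val prediction = weights.dot(features) + intercept   // 2*3 + (-1)*4 + 0.5 = 2.5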
 
@@ -69,7 +68,7 @@ class LinearRegressionWithSGD private (
    */
   def this() = this(1.0, 100, 1.0)
 
-  override def createModel(weights: Array[Double], intercept: Double) = {
+  override protected def createModel(weights: Vector, intercept: Double) = {
     new LinearRegressionModel(weights, intercept)
   }
 }
@@ -98,11 +97,9 @@ object LinearRegressionWithSGD {
       numIterations: Int,
       stepSize: Double,
       miniBatchFraction: Double,
-      initialWeights: Array[Double])
-    : LinearRegressionModel =
-  {
-    new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input,
-      initialWeights)
+      initialWeights: Vector): LinearRegressionModel = {
+    new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
+      .run(input, initialWeights)
   }
 
   /**
@@ -120,9 +117,7 @@ object LinearRegressionWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      miniBatchFraction: Double)
-    : LinearRegressionModel =
-  {
+      miniBatchFraction: Double): LinearRegressionModel = {
     new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
   }
 
@@ -140,9 +135,7 @@ object LinearRegressionWithSGD {
   def train(
       input: RDD[LabeledPoint],
       numIterations: Int,
-      stepSize: Double)
-    : LinearRegressionModel =
-  {
+      stepSize: Double): LinearRegressionModel = {
     train(input, numIterations, stepSize, 1.0)
   }
 
@@ -158,9 +151,7 @@ object LinearRegressionWithSGD {
    */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : LinearRegressionModel =
-  {
+      numIterations: Int): LinearRegressionModel = {
     train(input, numIterations, 1.0, 1.0)
   }
 
@@ -172,7 +163,7 @@ object LinearRegressionWithSGD {
     val sc = new SparkContext(args(0), "LinearRegression")
     val data = MLUtils.loadLabeledData(sc, args(1))
     val model = LinearRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
-    println("Weights: " + model.weights.mkString("[", ", ", "]"))
+    println("Weights: " + model.weights)
     println("Intercept: " + model.intercept)
 
     sc.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala
index 423afc32d665cf25352a365767ae547512e726a6..5e4b8a345b1c586784b3c85a3130c675cafffc7c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.mllib.regression
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
 
 trait RegressionModel extends Serializable {
   /**
@@ -26,7 +27,7 @@ trait RegressionModel extends Serializable {
    * @param testData RDD representing data points to be predicted
    * @return RDD[Double] where each entry contains the corresponding prediction
    */
-  def predict(testData: RDD[Array[Double]]): RDD[Double]
+  def predict(testData: RDD[Vector]): RDD[Double]
 
   /**
    * Predict values for a single data point using the model trained.
@@ -34,5 +35,5 @@ trait RegressionModel extends Serializable {
    * @param testData vector representing a single data point
    * @return Double prediction from the trained model
    */
-  def predict(testData: Array[Double]): Double
+  def predict(testData: Vector): Double
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
index feb100f21888ffc14aed0560afca3f50d2da0d7f..1f17d2107f940183a2e7b0ea71d2dcbcafdead32 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
@@ -21,8 +21,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.util.MLUtils
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.linalg.Vector
 
 /**
  * Regression model trained using RidgeRegression.
@@ -31,16 +30,16 @@ import org.jblas.DoubleMatrix
  * @param intercept Intercept computed for this model.
  */
 class RidgeRegressionModel(
-    override val weights: Array[Double],
+    override val weights: Vector,
     override val intercept: Double)
   extends GeneralizedLinearModel(weights, intercept)
   with RegressionModel with Serializable {
 
-  override def predictPoint(
-      dataMatrix: DoubleMatrix,
-      weightMatrix: DoubleMatrix,
+  override protected def predictPoint(
+      dataMatrix: Vector,
+      weightMatrix: Vector,
       intercept: Double): Double = {
-    dataMatrix.dot(weightMatrix) + intercept
+    weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
   }
 }
 
@@ -57,8 +56,7 @@ class RidgeRegressionWithSGD private (
     var numIterations: Int,
     var regParam: Double,
     var miniBatchFraction: Double)
-    extends GeneralizedLinearAlgorithm[RidgeRegressionModel]
-  with Serializable {
+    extends GeneralizedLinearAlgorithm[RidgeRegressionModel] with Serializable {
 
   val gradient = new LeastSquaresGradient()
   val updater = new SquaredL2Updater()
@@ -71,10 +69,6 @@ class RidgeRegressionWithSGD private (
   // We don't want to penalize the intercept in RidgeRegression, so set this to false.
   super.setIntercept(false)
 
-  var yMean = 0.0
-  var xColMean: DoubleMatrix = _
-  var xColSd: DoubleMatrix = _
-
   /**
    * Construct a RidgeRegression object with default parameters
    */
@@ -86,36 +80,8 @@ class RidgeRegressionWithSGD private (
     this
   }
 
-  override def createModel(weights: Array[Double], intercept: Double) = {
-    val weightsMat = new DoubleMatrix(weights.length, 1, weights: _*)
-    val weightsScaled = weightsMat.div(xColSd)
-    val interceptScaled = yMean - weightsMat.transpose().mmul(xColMean.div(xColSd)).get(0)
-
-    new RidgeRegressionModel(weightsScaled.data, interceptScaled)
-  }
-
-  override def run(
-      input: RDD[LabeledPoint],
-      initialWeights: Array[Double])
-    : RidgeRegressionModel =
-  {
-    val nfeatures: Int = input.first().features.length
-    val nexamples: Long = input.count()
-
-    // To avoid penalizing the intercept, we center and scale the data.
-    val stats = MLUtils.computeStats(input, nfeatures, nexamples)
-    yMean = stats._1
-    xColMean = stats._2
-    xColSd = stats._3
-
-    val normalizedData = input.map { point =>
-      val yNormalized = point.label - yMean
-      val featuresMat = new DoubleMatrix(nfeatures, 1, point.features:_*)
-      val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd)
-      LabeledPoint(yNormalized, featuresNormalized.toArray)
-    }
-
-    super.run(normalizedData, initialWeights)
+  override protected def createModel(weights: Vector, intercept: Double) = {
+    new RidgeRegressionModel(weights, intercept)
   }
 }
 
@@ -144,9 +110,7 @@ object RidgeRegressionWithSGD {
       stepSize: Double,
       regParam: Double,
       miniBatchFraction: Double,
-      initialWeights: Array[Double])
-    : RidgeRegressionModel =
-  {
+      initialWeights: Vector): RidgeRegressionModel = {
     new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(
       input, initialWeights)
   }
@@ -167,9 +131,7 @@ object RidgeRegressionWithSGD {
       numIterations: Int,
       stepSize: Double,
       regParam: Double,
-      miniBatchFraction: Double)
-    : RidgeRegressionModel =
-  {
+      miniBatchFraction: Double): RidgeRegressionModel = {
     new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
   }
 
@@ -188,9 +150,7 @@ object RidgeRegressionWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      regParam: Double)
-    : RidgeRegressionModel =
-  {
+      regParam: Double): RidgeRegressionModel = {
     train(input, numIterations, stepSize, regParam, 1.0)
   }
 
@@ -205,23 +165,22 @@ object RidgeRegressionWithSGD {
    */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : RidgeRegressionModel =
-  {
+      numIterations: Int): RidgeRegressionModel = {
     train(input, numIterations, 1.0, 1.0, 1.0)
   }
 
   def main(args: Array[String]) {
     if (args.length != 5) {
-      println("Usage: RidgeRegression <master> <input_dir> <step_size> <regularization_parameter>" +
-        " <niters>")
+      println("Usage: RidgeRegression <master> <input_dir> <step_size> " +
+        "<regularization_parameter> <niters>")
       System.exit(1)
     }
     val sc = new SparkContext(args(0), "RidgeRegression")
     val data = MLUtils.loadLabeledData(sc, args(1))
     val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble,
         args(3).toDouble)
-    println("Weights: " + model.weights.mkString("[", ", ", "]"))
+
+    println("Weights: " + model.weights)
     println("Intercept: " + model.intercept)
 
     sc.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
index 33205b919db8f5961f1739984403c3f6288043af..dee9594a9dd791c50bc35181790404b809f681d2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
@@ -30,6 +30,7 @@ import org.apache.spark.mllib.tree.impurity.{Entropy, Gini, Impurity, Variance}
 import org.apache.spark.mllib.tree.model._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.random.XORShiftRandom
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
 
 /**
  * A class that implements a decision tree algorithm for classification and regression. It
@@ -295,7 +296,7 @@ object DecisionTree extends Serializable with Logging {
     val numNodes = scala.math.pow(2, level).toInt
     logDebug("numNodes = " + numNodes)
     // Find the number of features by looking at the first sample.
-    val numFeatures = input.first().features.length
+    val numFeatures = input.first().features.size
     logDebug("numFeatures = " + numFeatures)
     val numBins = bins(0).length
     logDebug("numBins = " + numBins)
@@ -902,7 +903,7 @@ object DecisionTree extends Serializable with Logging {
     val count = input.count()
 
     // Find the number of features by looking at the first sample
-    val numFeatures = input.take(1)(0).features.length
+    val numFeatures = input.take(1)(0).features.size
 
     val maxBins = strategy.maxBins
     val numBins = if (maxBins <= count) maxBins else count.toInt
@@ -1116,7 +1117,7 @@ object DecisionTree extends Serializable with Logging {
     sc.textFile(dir).map { line =>
       val parts = line.trim().split(",")
       val label = parts(0).toDouble
-      val features = parts.slice(1,parts.length).map(_.toDouble)
+      val features = Vectors.dense(parts.slice(1, parts.length).map(_.toDouble))
       LabeledPoint(label, features)
     }
   }
@@ -1127,7 +1128,7 @@ object DecisionTree extends Serializable with Logging {
    */
   private def accuracyScore(model: DecisionTreeModel, data: RDD[LabeledPoint],
       threshold: Double = 0.5): Double = {
-    def predictedValue(features: Array[Double]) = {
+    def predictedValue(features: Vector) = {
       if (model.predict(features) < threshold) 0.0 else 1.0
     }
     val correctCount = data.filter(y => predictedValue(y.features) == y.label).count()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index a8bbf21daec01b065eeb36cdb9c8b79aa2d6a752..a6dca84a2ce094863fe6b03655bdaaf15d8c90ec 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.tree.model
 
 import org.apache.spark.mllib.tree.configuration.Algo._
 import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
 
 /**
  * Model to store the decision tree parameters
@@ -33,7 +34,7 @@ class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable
    * @param features vector representing a single data point
    * @return Double prediction from the trained model
    */
-  def predict(features: Array[Double]): Double = {
+  def predict(features: Vector): Double = {
     topNode.predictIfLeaf(features)
   }
 
@@ -43,7 +44,7 @@ class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable
    * @param features RDD representing data points to be predicted
    * @return RDD[Int] where each entry contains the corresponding prediction
    */
-  def predict(features: RDD[Array[Double]]): RDD[Double] = {
+  def predict(features: RDD[Vector]): RDD[Double] = {
     features.map(x => predict(x))
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala
index ea4693c5c2f4ee1f4ec21df7c5231466c9f092fd..aac3f9ce308f7a19054c19a01649d141b382bf0f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.tree.model
 
 import org.apache.spark.Logging
 import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.mllib.linalg.Vector
 
 /**
  * Node in a decision tree
@@ -54,8 +55,8 @@ class Node (
     logDebug("stats = " + stats)
     logDebug("predict = " + predict)
     if (!isLeaf) {
-      val leftNodeIndex = id*2 + 1
-      val rightNodeIndex = id*2 + 2
+      val leftNodeIndex = id * 2 + 1
+      val rightNodeIndex = id * 2 + 2
       leftNode = Some(nodes(leftNodeIndex))
       rightNode = Some(nodes(rightNodeIndex))
       leftNode.get.build(nodes)
@@ -68,7 +69,7 @@ class Node (
    * @param feature feature value
    * @return predicted value
    */
-  def predictIfLeaf(feature: Array[Double]) : Double = {
+  def predictIfLeaf(feature: Vector): Double = {
     if (isLeaf) {
       predict
     } else{
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
index 2e03684e6286128546d731cb23773b860df5559f..81e4eda2a68c485c4a746c3d400fb8cb86927993 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
@@ -24,6 +24,7 @@ import org.jblas.DoubleMatrix
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 
 /**
@@ -74,7 +75,7 @@ object LinearDataGenerator {
     val y = x.map { xi =>
       new DoubleMatrix(1, xi.length, xi: _*).dot(weightsMat) + intercept + eps * rnd.nextGaussian()
     }
-    y.zip(x).map(p => LabeledPoint(p._1, p._2))
+    y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2)))
   }
 
   /**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
index 52c4a71d621a11d73749279d2b9767a9a68b914b..61498dcc2be000d5b493168c5d52b7130cd37351 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
@@ -22,6 +22,7 @@ import scala.util.Random
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.Vectors
 
 /**
  * Generate test data for LogisticRegression. This class chooses positive labels
@@ -54,7 +55,7 @@ object LogisticRegressionDataGenerator {
       val x = Array.fill[Double](nfeatures) {
         rnd.nextGaussian() + (y * eps)
       }
-      LabeledPoint(y, x)
+      LabeledPoint(y, Vectors.dense(x))
     }
     data
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index 08cd9ab05547ba482748145aae8a85bbf134048d..cb85e433bfc7344006b562b0264632a2f23e4022 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -17,15 +17,13 @@
 
 package org.apache.spark.mllib.util
 
+import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV,
+  squaredDistance => breezeSquaredDistance}
+
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkContext._
-
-import org.jblas.DoubleMatrix
-
 import org.apache.spark.mllib.regression.LabeledPoint
-
-import breeze.linalg.{Vector => BV, SparseVector => BSV, squaredDistance => breezeSquaredDistance}
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
 
 /**
  * Helper methods to load, save and pre-process data used in ML Lib.
@@ -40,6 +38,107 @@ object MLUtils {
     eps
   }
 
+  /**
+   * Multiclass label parser, which parses a string into double.
+   */
+  val multiclassLabelParser: String => Double = _.toDouble
+
+  /**
+   * Binary label parser, which outputs 1.0 (positive) if the value is greater than 0.5,
+   * or 0.0 (negative) otherwise.
+   */
+  val binaryLabelParser: String => Double = label => if (label.toDouble > 0.5) 1.0 else 0.0
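
A couple of quick checks on the parsers above (illustrative only):

  binaryLabelParser("-1")     // 0.0: -1.0 is not greater than 0.5
  binaryLabelParser("1")      // 1.0
  multiclassLabelParser("3")  // 3.0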
+
+  /**
+   * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint].
+   * The LIBSVM format is a text-based format used by LIBSVM and LIBLINEAR.
+   * Each line represents a labeled sparse feature vector using the following format:
+   * {{{label index1:value1 index2:value2 ...}}}
+   * where the indices are one-based and in ascending order.
+   * This method parses each line into a [[org.apache.spark.mllib.regression.LabeledPoint]],
+   * where the feature indices are converted to zero-based.
+   *
+   * @param sc Spark context
+   * @param path file or directory path in any Hadoop-supported file system URI
+   * @param labelParser parser for labels, default: 1.0 if label > 0.5 or 0.0 otherwise
+   * @param numFeatures number of features, which will be determined from the input data if a
+   *                    negative value is given. The default value is -1.
+   * @param minSplits min number of partitions, default: sc.defaultMinSplits
+   * @return labeled data stored as an RDD[LabeledPoint]
+   */
+  def loadLibSVMData(
+      sc: SparkContext,
+      path: String,
+      labelParser: String => Double,
+      numFeatures: Int,
+      minSplits: Int): RDD[LabeledPoint] = {
+    val parsed = sc.textFile(path, minSplits)
+      .map(_.trim)
+      .filter(!_.isEmpty)
+      .map(_.split(' '))
+    // Determine number of features.
+    val d = if (numFeatures >= 0) {
+      numFeatures
+    } else {
+      parsed.map { items =>
+        if (items.length > 1) {
+          items.last.split(':')(0).toInt
+        } else {
+          0
+        }
+      }.reduce(math.max)
+    }
+    parsed.map { items =>
+      val label = labelParser(items.head)
+      val (indices, values) = items.tail.map { item =>
+        val indexAndValue = item.split(':')
+        val index = indexAndValue(0).toInt - 1
+        val value = indexAndValue(1).toDouble
+        (index, value)
+      }.unzip
+      LabeledPoint(label, Vectors.sparse(d, indices.toArray, values.toArray))
+    }
+  }
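
To make the format concrete, a sketch of how one LIBSVM line would be parsed by the logic above (the line and feature count are invented):

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint

  val line = "1 1:0.5 3:2.0"                    // label 1, features at one-based indices 1 and 3
  val items = line.trim.split(' ')
  val label = items.head.toDouble                // or binaryLabelParser(items.head) for 0/1 labels
  val (indices, values) = items.tail.map { item =>
    val Array(i, v) = item.split(':')
    (i.toInt - 1, v.toDouble)                    // shift to zero-based indices
  }.unzip
  val point = LabeledPoint(label, Vectors.sparse(3, indices.toArray, values.toArray))
  // point.features is the sparse equivalent of [0.5, 0.0, 2.0]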
+
+  // Convenience methods for calling from Java.
+
+  /**
+   * Loads binary labeled data in the LIBSVM format into an RDD[LabeledPoint],
+   * with number of features determined automatically and the default number of partitions.
+   */
+  def loadLibSVMData(sc: SparkContext, path: String): RDD[LabeledPoint] =
+    loadLibSVMData(sc, path, binaryLabelParser, -1, sc.defaultMinSplits)
+
+  /**
+   * Loads binary labeled data in the LIBSVM format into an RDD[LabeledPoint],
+   * with number of features specified explicitly and the default number of partitions.
+   */
+  def loadLibSVMData(sc: SparkContext, path: String, numFeatures: Int): RDD[LabeledPoint] =
+    loadLibSVMData(sc, path, binaryLabelParser, numFeatures, sc.defaultMinSplits)
+
+  /**
+   * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
+   * with the given label parser, number of features determined automatically,
+   * and the default number of partitions.
+   */
+  def loadLibSVMData(
+      sc: SparkContext,
+      path: String,
+      labelParser: String => Double): RDD[LabeledPoint] =
+    loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinSplits)
+
+  /**
+   * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
+   * with the given label parser, number of features specified explicitly,
+   * and the default number of partitions.
+   */
+  def loadLibSVMData(
+      sc: SparkContext,
+      path: String,
+      labelParser: String => Double,
+      numFeatures: Int): RDD[LabeledPoint] =
+    loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinSplits)
+
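
A minimal usage sketch for the new loaders (the master, path, and feature count below are placeholders, not values from the patch):

  import org.apache.spark.SparkContext
  import org.apache.spark.mllib.util.MLUtils

  val sc = new SparkContext("local", "LibSVMExample")
  // Feature count detected from the data, binary 0/1 labels:
  val binaryPoints = MLUtils.loadLibSVMData(sc, "path/to/data.libsvm")
  // Explicit feature count and multiclass labels:
  val multiclassPoints =
    MLUtils.loadLibSVMData(sc, "path/to/data.libsvm", MLUtils.multiclassLabelParser, 780)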
   /**
    * Load labeled data from a file. The data format used here is
    * <L>, <f1> <f2> ...
@@ -54,7 +153,7 @@ object MLUtils {
     sc.textFile(dir).map { line =>
       val parts = line.split(',')
       val label = parts(0).toDouble
-      val features = parts(1).trim().split(' ').map(_.toDouble)
+      val features = Vectors.dense(parts(1).trim().split(' ').map(_.toDouble))
       LabeledPoint(label, features)
     }
   }
@@ -68,7 +167,7 @@ object MLUtils {
    * @param dir Directory to save the data.
    */
   def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
-    val dataStr = data.map(x => x.label + "," + x.features.mkString(" "))
+    val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" "))
     dataStr.saveAsTextFile(dir)
   }
 
@@ -76,44 +175,52 @@ object MLUtils {
    * Utility function to compute mean and standard deviation on a given dataset.
    *
    * @param data - input data set whose statistics are computed
-   * @param nfeatures - number of features
-   * @param nexamples - number of examples in input dataset
+   * @param numFeatures - number of features
+   * @param numExamples - number of examples in input dataset
    *
    * @return (yMean, xColMean, xColSd) - Tuple consisting of
    *     yMean - mean of the labels
    *     xColMean - Row vector with mean for every column (or feature) of the input data
    *     xColSd - Row vector with standard deviation for every column (or feature) of the input data.
    */
-  def computeStats(data: RDD[LabeledPoint], nfeatures: Int, nexamples: Long):
-      (Double, DoubleMatrix, DoubleMatrix) = {
-    val yMean: Double = data.map { labeledPoint => labeledPoint.label }.reduce(_ + _) / nexamples
-
-    // NOTE: We shuffle X by column here to compute column sum and sum of squares.
-    val xColSumSq: RDD[(Int, (Double, Double))] = data.flatMap { labeledPoint =>
-      val nCols = labeledPoint.features.length
-      // Traverse over every column and emit (col, value, value^2)
-      Iterator.tabulate(nCols) { i =>
-        (i, (labeledPoint.features(i), labeledPoint.features(i)*labeledPoint.features(i)))
-      }
-    }.reduceByKey { case(x1, x2) =>
-      (x1._1 + x2._1, x1._2 + x2._2)
+  def computeStats(
+      data: RDD[LabeledPoint],
+      numFeatures: Int,
+      numExamples: Long): (Double, Vector, Vector) = {
+    val brzData = data.map { case LabeledPoint(label, features) =>
+      (label, features.toBreeze)
     }
-    val xColSumsMap = xColSumSq.collectAsMap()
-
-    val xColMean = DoubleMatrix.zeros(nfeatures, 1)
-    val xColSd = DoubleMatrix.zeros(nfeatures, 1)
-
-    // Compute mean and unbiased variance using column sums
-    var col = 0
-    while (col < nfeatures) {
-      xColMean.put(col, xColSumsMap(col)._1 / nexamples)
-      val variance =
-        (xColSumsMap(col)._2 - (math.pow(xColSumsMap(col)._1, 2) / nexamples)) / nexamples
-      xColSd.put(col, math.sqrt(variance))
-      col += 1
+    val aggStats = brzData.aggregate(
+      (0L, 0.0, BDV.zeros[Double](numFeatures), BDV.zeros[Double](numFeatures))
+    )(
+      seqOp = (c, v) => (c, v) match {
+        case ((n, sumLabel, sum, sumSq), (label, features)) =>
+          features.activeIterator.foreach { case (i, x) =>
+            sumSq(i) += x * x
+          }
+          (n + 1L, sumLabel + label, sum += features, sumSq)
+      },
+      combOp = (c1, c2) => (c1, c2) match {
+        case ((n1, sumLabel1, sum1, sumSq1), (n2, sumLabel2, sum2, sumSq2)) =>
+          (n1 + n2, sumLabel1 + sumLabel2, sum1 += sum2, sumSq1 += sumSq2)
+      }
+    )
+    val (nl, sumLabel, sum, sumSq) = aggStats
+
+    require(nl > 0, "Input data is empty.")
+    require(nl == numExamples)
+
+    val n = nl.toDouble
+    val yMean = sumLabel / n
+    val mean = sum / n
+    val std = new Array[Double](sum.length)
+    var i = 0
+    while (i < numFeatures) {
+      std(i) = math.sqrt(sumSq(i) / n - mean(i) * mean(i))
+      i += 1
     }
 
-    (yMean, xColMean, xColSd)
+    (yMean, Vectors.fromBreeze(mean), Vectors.dense(std))
   }
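
As a hand-worked check of the statistics above (numbers chosen for illustration): for two points (label 1.0, features [1.0, 2.0]) and (label 3.0, features [3.0, 6.0]), yMean = (1.0 + 3.0) / 2 = 2.0, xColMean = [2.0, 4.0], and the per-column variance E[x^2] - (E[x])^2 is [(1 + 9)/2 - 4, (4 + 36)/2 - 16] = [1.0, 4.0], so xColSd = [1.0, 2.0] after taking square roots.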
 
   /**
@@ -144,6 +251,18 @@ object MLUtils {
     val sumSquaredNorm = norm1 * norm1 + norm2 * norm2
     val normDiff = norm1 - norm2
     var sqDist = 0.0
+    /*
+     * The relative error is
+     * <pre>
+     * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 + 2 |a^T b|) / ( \|a - b\|_2^2 ),
+     * </pre>
+     * which is bounded by
+     * <pre>
+     * 2.0 * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 ) / ( (\|a\|_2 - \|b\|_2)^2 ).
+     * </pre>
+     * The bound doesn't need the inner product, so we can use it as a sufficient condition to
+     * check quickly whether the inner product approach is accurate.
+     */
     val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)
     if (precisionBound1 < precision) {
       sqDist = sumSquaredNorm - 2.0 * v1.dot(v2)
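
One concrete consequence of the bound above (a back-of-the-envelope check, not from the patch): when the two norms are nearly equal, say norm1 = norm2 = 1, the (normDiff)^2 term vanishes and precisionBound1 is roughly 2 * EPSILON * 2 / EPSILON = 4, far above any sensible precision, so the cheap sumSquaredNorm - 2 * v1.dot(v2) formula is not trusted and the method falls through to the more careful computation that follows (not shown in this hunk).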
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
index c96c94f70eef73bd8eb82d83e71c1e100309fb68..e300c3dbe1fe02ef529e9adaed072452a5ce53f0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
@@ -23,6 +23,7 @@ import org.jblas.DoubleMatrix
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 
 /**
@@ -58,7 +59,7 @@ object SVMDataGenerator {
       }
       val yD = new DoubleMatrix(1, x.length, x: _*).dot(trueWeights) + rnd.nextGaussian() * 0.1
       val y = if (yD < 0) 0.0 else 1.0
-      LabeledPoint(y, x)
+      LabeledPoint(y, Vectors.dense(x))
     }
 
     MLUtils.saveLabeledData(data, outputPath)
diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
index 073ded6f36933f8aea1a08d3eaaacf8cc9c28de7..c80b1134ed1b28273338271893974d08563ad835 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.classification;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.junit.After;
 import org.junit.Assert;
@@ -45,12 +46,12 @@ public class JavaNaiveBayesSuite implements Serializable {
   }
 
   private static final List<LabeledPoint> POINTS = Arrays.asList(
-    new LabeledPoint(0, new double[] {1.0, 0.0, 0.0}),
-    new LabeledPoint(0, new double[] {2.0, 0.0, 0.0}),
-    new LabeledPoint(1, new double[] {0.0, 1.0, 0.0}),
-    new LabeledPoint(1, new double[] {0.0, 2.0, 0.0}),
-    new LabeledPoint(2, new double[] {0.0, 0.0, 1.0}),
-    new LabeledPoint(2, new double[] {0.0, 0.0, 2.0})
+    new LabeledPoint(0, Vectors.dense(1.0, 0.0, 0.0)),
+    new LabeledPoint(0, Vectors.dense(2.0, 0.0, 0.0)),
+    new LabeledPoint(1, Vectors.dense(0.0, 1.0, 0.0)),
+    new LabeledPoint(1, Vectors.dense(0.0, 2.0, 0.0)),
+    new LabeledPoint(2, Vectors.dense(0.0, 0.0, 1.0)),
+    new LabeledPoint(2, Vectors.dense(0.0, 0.0, 2.0))
   );
 
   private int validatePrediction(List<LabeledPoint> points, NaiveBayesModel model) {
diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
index 117e5eaa8b78e42294738f9faca2b0c6c5cd36f4..4701a5e545020851313d2204343656b74faa523c 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.mllib.classification;
 
-
 import java.io.Serializable;
 import java.util.List;
 
@@ -28,7 +27,6 @@ import org.junit.Test;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-
 import org.apache.spark.mllib.regression.LabeledPoint;
 
 public class JavaSVMSuite implements Serializable {
@@ -94,5 +92,4 @@ public class JavaSVMSuite implements Serializable {
     int numAccurate = validatePrediction(validationData, model);
     Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
   }
-
 }
diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java
index 2c4d795f96e4e8fafa0375490000bf8edf2ced56..c6d8425ffc38dbb23765b53756941b88bc5fc4b9 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java
@@ -19,10 +19,10 @@ package org.apache.spark.mllib.linalg;
 
 import java.io.Serializable;
 
-import com.google.common.collect.Lists;
-
 import scala.Tuple2;
 
+import com.google.common.collect.Lists;
+
 import org.junit.Test;
 import static org.junit.Assert.*;
 
@@ -36,7 +36,7 @@ public class JavaVectorsSuite implements Serializable {
 
   @Test
   public void sparseArrayConstruction() {
-    Vector v = Vectors.sparse(3, Lists.newArrayList(
+    Vector v = Vectors.sparse(3, Lists.<Tuple2<Integer, Double>>newArrayList(
         new Tuple2<Integer, Double>(0, 2.0),
         new Tuple2<Integer, Double>(2, 3.0)));
     assertArrayEquals(new double[]{2.0, 0.0, 3.0}, v.toArray(), 0.0);
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java
index f44b25cd44d19be342ac6ee0ddfcb597fe515ab1..f725924a2d971aefd68f3cd2863c61572d5a6ae2 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java
@@ -59,7 +59,7 @@ public class JavaLassoSuite implements Serializable {
   @Test
   public void runLassoUsingConstructor() {
     int nPoints = 10000;
-    double A = 2.0;
+    double A = 0.0;
     double[] weights = {-1.5, 1.0e-2};
 
     JavaRDD<LabeledPoint> testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A,
@@ -80,7 +80,7 @@ public class JavaLassoSuite implements Serializable {
   @Test
   public void runLassoUsingStaticMethods() {
     int nPoints = 10000;
-    double A = 2.0;
+    double A = 0.0;
     double[] weights = {-1.5, 1.0e-2};
 
     JavaRDD<LabeledPoint> testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A,
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
index 2fdd5fc8fdca6371b0a85351451a306bbd2fc459..03714ae7e4d0008613898318054aafe8b6aab41e 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
@@ -55,30 +55,27 @@ public class JavaRidgeRegressionSuite implements Serializable {
     return errorSum / validationData.size();
   }
 
-  List<LabeledPoint> generateRidgeData(int numPoints, int nfeatures, double eps) {
+  List<LabeledPoint> generateRidgeData(int numPoints, int numFeatures, double std) {
     org.jblas.util.Random.seed(42);
     // Pick weights as random values distributed uniformly in [-0.5, 0.5]
-    DoubleMatrix w = DoubleMatrix.rand(nfeatures, 1).subi(0.5);
-    // Set first two weights to eps
-    w.put(0, 0, eps);
-    w.put(1, 0, eps);
-    return LinearDataGenerator.generateLinearInputAsList(0.0, w.data, numPoints, 42, eps);
+    DoubleMatrix w = DoubleMatrix.rand(numFeatures, 1).subi(0.5);
+    return LinearDataGenerator.generateLinearInputAsList(0.0, w.data, numPoints, 42, std);
   }
 
   @Test
   public void runRidgeRegressionUsingConstructor() {
-    int nexamples = 200;
-    int nfeatures = 20;
-    double eps = 10.0;
-    List<LabeledPoint> data = generateRidgeData(2*nexamples, nfeatures, eps);
+    int numExamples = 50;
+    int numFeatures = 20;
+    List<LabeledPoint> data = generateRidgeData(2 * numExamples, numFeatures, 10.0);
 
-    JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, nexamples));
-    List<LabeledPoint> validationData = data.subList(nexamples, 2*nexamples);
+    JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, numExamples));
+    List<LabeledPoint> validationData = data.subList(numExamples, 2 * numExamples);
 
     RidgeRegressionWithSGD ridgeSGDImpl = new RidgeRegressionWithSGD();
-    ridgeSGDImpl.optimizer().setStepSize(1.0)
-                            .setRegParam(0.0)
-                            .setNumIterations(200);
+    ridgeSGDImpl.optimizer()
+      .setStepSize(1.0)
+      .setRegParam(0.0)
+      .setNumIterations(200);
     RidgeRegressionModel model = ridgeSGDImpl.run(testRDD.rdd());
     double unRegularizedErr = predictionError(validationData, model);
 
@@ -91,13 +88,12 @@ public class JavaRidgeRegressionSuite implements Serializable {
 
   @Test
   public void runRidgeRegressionUsingStaticMethods() {
-    int nexamples = 200;
-    int nfeatures = 20;
-    double eps = 10.0;
-    List<LabeledPoint> data = generateRidgeData(2*nexamples, nfeatures, eps);
+    int numExamples = 50;
+    int numFeatures = 20;
+    List<LabeledPoint> data = generateRidgeData(2 * numExamples, numFeatures, 10.0);
 
-    JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, nexamples));
-    List<LabeledPoint> validationData = data.subList(nexamples, 2*nexamples);
+    JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, numExamples));
+    List<LabeledPoint> validationData = data.subList(numExamples, 2 * numExamples);
 
     RidgeRegressionModel model = RidgeRegressionWithSGD.train(testRDD.rdd(), 200, 1.0, 0.0);
     double unRegularizedErr = predictionError(validationData, model);
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
index 05322b024d5f6d4e97e000543a6f28b5bcefc2f5..1e03c9df820b0e0eb9c1c5ad4cf1d8b7160885ff 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -20,11 +20,10 @@ package org.apache.spark.mllib.classification
 import scala.util.Random
 import scala.collection.JavaConversions._
 
-import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
-import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.util.LocalSparkContext
 
@@ -61,7 +60,7 @@ object LogisticRegressionSuite {
       if (yVal > 0) 1 else 0
     }
 
-    val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i))))
+    val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(Array(x1(i)))))
     testData
   }
 
@@ -113,7 +112,7 @@ class LogisticRegressionSuite extends FunSuite with LocalSparkContext with Shoul
     val testData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 42)
 
     val initialB = -1.0
-    val initialWeights = Array(initialB)
+    val initialWeights = Vectors.dense(initialB)
 
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
index 9dd6c79ee6ad8cc50a7ae202854995daf0452078..516895d04222d7a1e2d2df224f6a287ecf594bc2 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -19,9 +19,9 @@ package org.apache.spark.mllib.classification
 
 import scala.util.Random
 
-import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.LocalSparkContext
 
@@ -54,7 +54,7 @@ object NaiveBayesSuite {
         if (rnd.nextDouble() < _theta(y)(j)) 1 else 0
       }
 
-      LabeledPoint(y, xi)
+      LabeledPoint(y, Vectors.dense(xi))
     }
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
index bc7abb568a172fc5daec6660d8d601e5a280d90f..dfacbfeee6fb4d1823b981abc3dca3d4726cb779 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.mllib.classification
 import scala.util.Random
 import scala.collection.JavaConversions._
 
-import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 
 import org.jblas.DoubleMatrix
@@ -28,6 +27,7 @@ import org.jblas.DoubleMatrix
 import org.apache.spark.SparkException
 import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.Vectors
 
 object SVMSuite {
 
@@ -54,7 +54,7 @@ object SVMSuite {
         intercept + 0.01 * rnd.nextGaussian()
       if (yD < 0) 0.0 else 1.0
     }
-    y.zip(x).map(p => LabeledPoint(p._1, p._2))
+    y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2)))
   }
 
 }
@@ -110,7 +110,7 @@ class SVMSuite extends FunSuite with LocalSparkContext {
 
     val initialB = -1.0
     val initialC = -1.0
-    val initialWeights = Array(initialB,initialC)
+    val initialWeights = Vectors.dense(initialB, initialC)
 
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
@@ -150,10 +150,10 @@ class SVMSuite extends FunSuite with LocalSparkContext {
     }
 
     intercept[SparkException] {
-      val model = SVMWithSGD.train(testRDDInvalid, 100)
+      SVMWithSGD.train(testRDDInvalid, 100)
     }
 
     // Turning off data validation should not throw an exception
-    val noValidationModel = new SVMWithSGD().setValidateData(false).run(testRDDInvalid)
+    new SVMWithSGD().setValidateData(false).run(testRDDInvalid)
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
index 631d0e2ad9cdb3696f9d38b124dd963fd5e6bfab..c4b433499a091ef157c7f8bcd50b3ab35d8182d9 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
@@ -20,13 +20,12 @@ package org.apache.spark.mllib.optimization
 import scala.util.Random
 import scala.collection.JavaConversions._
 
-import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
-import org.apache.spark.SparkContext
 import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.Vectors
 
 object GradientDescentSuite {
 
@@ -58,8 +57,7 @@ object GradientDescentSuite {
       if (yVal > 0) 1 else 0
     }
 
-    val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i))))
-    testData
+    (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(x1(i))))
   }
 }
 
@@ -83,11 +81,11 @@ class GradientDescentSuite extends FunSuite with LocalSparkContext with ShouldMa
     // Add an extra variable consisting of all 1.0's for the intercept.
     val testData = GradientDescentSuite.generateGDInput(A, B, nPoints, 42)
     val data = testData.map { case LabeledPoint(label, features) =>
-      label -> Array(1.0, features: _*)
+      label -> Vectors.dense(1.0, features.toArray: _*)
     }
 
     val dataRDD = sc.parallelize(data, 2).cache()
-    val initialWeightsWithIntercept = Array(1.0, initialWeights: _*)
+    val initialWeightsWithIntercept = Vectors.dense(1.0, initialWeights: _*)
 
     val (_, loss) = GradientDescent.runMiniBatchSGD(
       dataRDD,
@@ -113,13 +111,13 @@ class GradientDescentSuite extends FunSuite with LocalSparkContext with ShouldMa
     // Add an extra variable consisting of all 1.0's for the intercept.
     val testData = GradientDescentSuite.generateGDInput(2.0, -1.5, 10000, 42)
     val data = testData.map { case LabeledPoint(label, features) =>
-      label -> Array(1.0, features: _*)
+      label -> Vectors.dense(1.0, features.toArray: _*)
     }
 
     val dataRDD = sc.parallelize(data, 2).cache()
 
     // Prepare non-zero weights
-    val initialWeightsWithIntercept = Array(1.0, 0.5)
+    val initialWeightsWithIntercept = Vectors.dense(1.0, 0.5)
 
     val regParam0 = 0
     val (newWeights0, loss0) = GradientDescent.runMiniBatchSGD(
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
index 2cebac943e15f66643a6c57ee4b2934051f71eeb..6aad9eb84e13ccdff0a903fa7b707e14fa699295 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.regression
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
 
 class LassoSuite extends FunSuite with LocalSparkContext {
@@ -33,29 +34,33 @@ class LassoSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Lasso local random SGD") {
-    val nPoints = 10000
+    val nPoints = 1000
 
     val A = 2.0
     val B = -1.5
     val C = 1.0e-2
 
-    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42)
-
-    val testRDD = sc.parallelize(testData, 2)
-    testRDD.cache()
+    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B, C), nPoints, 42)
+      .map { case LabeledPoint(label, features) =>
+        LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+      }
+    val testRDD = sc.parallelize(testData, 2).cache()
 
     val ls = new LassoWithSGD()
-    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(40)
 
     val model = ls.run(testRDD)
-
     val weight0 = model.weights(0)
     val weight1 = model.weights(1)
-    assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
-    assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
-    assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]")
+    val weight2 = model.weights(2)
+    assert(weight0 >= 1.9 && weight0 <= 2.1, weight0 + " not in [1.9, 2.1]")
+    assert(weight1 >= -1.60 && weight1 <= -1.40, weight1 + " not in [-1.6, -1.4]")
+    assert(weight2 >= -1.0e-3 && weight2 <= 1.0e-3, weight2 + " not in [-0.001, 0.001]")
 
     val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17)
+      .map { case LabeledPoint(label, features) =>
+        LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+      }
     val validationRDD  = sc.parallelize(validationData, 2)
 
     // Test prediction on RDD.
@@ -66,33 +71,39 @@ class LassoSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Lasso local random SGD with initial weights") {
-    val nPoints = 10000
+    val nPoints = 1000
 
     val A = 2.0
     val B = -1.5
     val C = 1.0e-2
 
-    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42)
+    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B, C), nPoints, 42)
+      .map { case LabeledPoint(label, features) =>
+        LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+      }
 
+    val initialA = -1.0
     val initialB = -1.0
     val initialC = -1.0
-    val initialWeights = Array(initialB,initialC)
+    val initialWeights = Vectors.dense(initialA, initialB, initialC)
 
-    val testRDD = sc.parallelize(testData, 2)
-    testRDD.cache()
+    val testRDD = sc.parallelize(testData, 2).cache()
 
     val ls = new LassoWithSGD()
-    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(40)
 
     val model = ls.run(testRDD, initialWeights)
-
     val weight0 = model.weights(0)
     val weight1 = model.weights(1)
-    assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
-    assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
-    assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]")
+    val weight2 = model.weights(2)
+    assert(weight0 >= 1.9 && weight0 <= 2.1, weight0 + " not in [1.9, 2.1]")
+    assert(weight1 >= -1.60 && weight1 <= -1.40, weight1 + " not in [-1.6, -1.4]")
+    assert(weight2 >= -1.0e-3 && weight2 <= 1.0e-3, weight2 + " not in [-0.001, 0.001]")
 
     val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17)
+      .map { case LabeledPoint(label, features) =>
+        LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+      }
     val validationRDD  = sc.parallelize(validationData,2)
 
     // Test prediction on RDD.
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
index 5d251bcbf35dbc2c6334fa2ba7ad5992afb589ca..2f7d30708ce172d5a7be9e00e294f9acc80aba6c 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.regression
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
 
 class LinearRegressionSuite extends FunSuite with LocalSparkContext {
@@ -40,11 +41,12 @@ class LinearRegressionSuite extends FunSuite with LocalSparkContext {
     linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
 
     val model = linReg.run(testRDD)
-
     assert(model.intercept >= 2.5 && model.intercept <= 3.5)
-    assert(model.weights.length === 2)
-    assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0)
-    assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0)
+
+    val weights = model.weights
+    assert(weights.size === 2)
+    assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+    assert(weights(1) >= 9.0 && weights(1) <= 11.0)
 
     val validationData = LinearDataGenerator.generateLinearInput(
       3.0, Array(10.0, 10.0), 100, 17)
@@ -67,9 +69,11 @@ class LinearRegressionSuite extends FunSuite with LocalSparkContext {
     val model = linReg.run(testRDD)
 
     assert(model.intercept === 0.0)
-    assert(model.weights.length === 2)
-    assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0)
-    assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0)
+
+    val weights = model.weights
+    assert(weights.size === 2)
+    assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+    assert(weights(1) >= 9.0 && weights(1) <= 11.0)
 
     val validationData = LinearDataGenerator.generateLinearInput(
       0.0, Array(10.0, 10.0), 100, 17)
@@ -81,4 +85,40 @@ class LinearRegressionSuite extends FunSuite with LocalSparkContext {
     // Test prediction on Array.
     validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
   }
+
+  // Test if we can correctly learn Y = 10*X1 + 10*X10000
+  test("sparse linear regression without intercept") {
+    val denseRDD = sc.parallelize(
+      LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42), 2)
+    val sparseRDD = denseRDD.map { case LabeledPoint(label, v) =>
+      val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1))))
+      LabeledPoint(label, sv)
+    }.cache()
+    val linReg = new LinearRegressionWithSGD().setIntercept(false)
+    linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
+
+    val model = linReg.run(sparseRDD)
+
+    assert(model.intercept === 0.0)
+
+    val weights = model.weights
+    assert(weights.size === 10000)
+    assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+    assert(weights(9999) >= 9.0 && weights(9999) <= 11.0)
+
+    val validationData = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 17)
+    val sparseValidationData = validationData.map { case LabeledPoint(label, v) =>
+      val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1))))
+      LabeledPoint(label, sv)
+    }
+    val sparseValidationRDD = sc.parallelize(sparseValidationData, 2)
+
+    // Test prediction on RDD.
+    validatePrediction(
+      model.predict(sparseValidationRDD.map(_.features)).collect(), sparseValidationData)
+
+    // Test prediction on Array.
+    validatePrediction(
+      sparseValidationData.map(row => model.predict(row.features)), sparseValidationData)
+  }
 }
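
Note: the "sparse linear regression without intercept" test above leans on the Vectors factory object that this patch threads through MLlib. As a minimal, illustrative sketch (not part of the patch itself), the two construction paths it exercises look like this:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // Dense: every component is stored explicitly.
    val dense = Vectors.dense(1.0, 0.0, 3.0)

    // Sparse: only the (index, value) pairs of the non-zeros, plus the total size.
    val sparse = Vectors.sparse(10000, Seq((0, 1.0), (9999, 3.0)))

    // Either representation can back a LabeledPoint, so the same training and
    // prediction code handles both layouts.
    val densePoint = LabeledPoint(1.0, dense)
    val sparsePoint = LabeledPoint(1.0, sparse)
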
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
index b2044ed0d8066793c5353a20627785cdf0ed7a55..f66fc6ea6c1ec6505b15aaa2c309990330245a6e 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.mllib.regression
 
-import org.jblas.DoubleMatrix
 import org.scalatest.FunSuite
 
+import org.jblas.DoubleMatrix
+
 import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
 
 class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
@@ -30,22 +31,22 @@ class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
     }.reduceLeft(_ + _) / predictions.size
   }
 
-  test("regularization with skewed weights") {
-    val nexamples = 200
-    val nfeatures = 20
-    val eps = 10
+  test("ridge regression can help avoid overfitting") {
+
+    // For small number of examples and large variance of error distribution,
+    // ridge regression should give smaller generalization error than linear regression.
+
+    val numExamples = 50
+    val numFeatures = 20
 
     org.jblas.util.Random.seed(42)
     // Pick weights as random values distributed uniformly in [-0.5, 0.5]
-    val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
-    // Set first two weights to eps
-    w.put(0, 0, eps)
-    w.put(1, 0, eps)
+    val w = DoubleMatrix.rand(numFeatures, 1).subi(0.5)
 
     // Use half of data for training and other half for validation
-    val data = LinearDataGenerator.generateLinearInput(3.0, w.toArray, 2*nexamples, 42, eps)
-    val testData = data.take(nexamples)
-    val validationData = data.takeRight(nexamples)
+    val data = LinearDataGenerator.generateLinearInput(3.0, w.toArray, 2 * numExamples, 42, 10.0)
+    val testData = data.take(numExamples)
+    val validationData = data.takeRight(numExamples)
 
     val testRDD = sc.parallelize(testData, 2).cache()
     val validationRDD = sc.parallelize(validationData, 2).cache()
@@ -67,7 +68,7 @@ class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
     val ridgeErr = predictionError(
         ridgeModel.predict(validationRDD.map(_.features)).collect(), validationData)
 
-    // Ridge CV-error should be lower than linear regression
+    // Ridge validation error should be lower than linear regression.
     assert(ridgeErr < linearErr,
       "ridgeError (" + ridgeErr + ") was not less than linearError(" + linearErr + ")")
   }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
index 4349c7000a0ae100b9e0de30a02a30e14ed8d07d..350130c914f26c3be49abca37f86a224aff6ec83 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.mllib.tree.model.Filter
 import org.apache.spark.mllib.tree.configuration.Strategy
 import org.apache.spark.mllib.tree.configuration.Algo._
 import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.mllib.linalg.Vectors
 
 class DecisionTreeSuite extends FunSuite with BeforeAndAfterAll {
 
@@ -396,7 +397,7 @@ object DecisionTreeSuite {
   def generateOrderedLabeledPointsWithLabel0(): Array[LabeledPoint] = {
     val arr = new Array[LabeledPoint](1000)
     for (i <- 0 until 1000){
-      val lp = new LabeledPoint(0.0,Array(i.toDouble,1000.0-i))
+      val lp = new LabeledPoint(0.0, Vectors.dense(i.toDouble, 1000.0 - i))
       arr(i) = lp
     }
     arr
@@ -405,7 +406,7 @@ object DecisionTreeSuite {
   def generateOrderedLabeledPointsWithLabel1(): Array[LabeledPoint] = {
     val arr = new Array[LabeledPoint](1000)
     for (i <- 0 until 1000){
-      val lp = new LabeledPoint(1.0,Array(i.toDouble,999.0-i))
+      val lp = new LabeledPoint(1.0, Vectors.dense(i.toDouble, 999.0 - i))
       arr(i) = lp
     }
     arr
@@ -415,9 +416,9 @@ object DecisionTreeSuite {
     val arr = new Array[LabeledPoint](1000)
     for (i <- 0 until 1000){
       if (i < 600){
-        arr(i) = new LabeledPoint(1.0,Array(0.0,1.0))
+        arr(i) = new LabeledPoint(1.0, Vectors.dense(0.0, 1.0))
       } else {
-        arr(i) = new LabeledPoint(0.0,Array(1.0,0.0))
+        arr(i) = new LabeledPoint(0.0, Vectors.dense(1.0, 0.0))
       }
     }
     arr
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 60f053b3813051c6185c3262b407567d03d6dd0e..27d41c7869aa09d2630e7992400f67fee777bfe8 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -17,14 +17,20 @@
 
 package org.apache.spark.mllib.util
 
+import java.io.File
+
 import org.scalatest.FunSuite
 
 import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, norm => breezeNorm,
   squaredDistance => breezeSquaredDistance}
+import com.google.common.base.Charsets
+import com.google.common.io.Files
 
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.MLUtils._
 
-class MLUtilsSuite extends FunSuite {
+class MLUtilsSuite extends FunSuite with LocalSparkContext {
 
   test("epsilon computation") {
     assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.")
@@ -49,4 +55,55 @@ class MLUtilsSuite extends FunSuite {
       assert((fastSquaredDist2 - squaredDist) <= precision * squaredDist, s"failed with m = $m")
     }
   }
+
+  test("compute stats") {
+    val data = Seq.fill(3)(Seq(
+      LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 3.0)),
+      LabeledPoint(0.0, Vectors.dense(3.0, 4.0, 5.0))
+    )).flatten
+    val rdd = sc.parallelize(data, 2)
+    val (meanLabel, mean, std) = MLUtils.computeStats(rdd, 3, 6)
+    assert(meanLabel === 0.5)
+    assert(mean === Vectors.dense(2.0, 3.0, 4.0))
+    assert(std === Vectors.dense(1.0, 1.0, 1.0))
+  }
+
+  test("loadLibSVMData") {
+    val lines =
+      """
+        |+1 1:1.0 3:2.0 5:3.0
+        |-1
+        |-1 2:4.0 4:5.0 6:6.0
+      """.stripMargin
+    val tempDir = Files.createTempDir()
+    val file = new File(tempDir.getPath, "part-00000")
+    Files.write(lines, file, Charsets.US_ASCII)
+    val path = tempDir.toURI.toString
+
+    val pointsWithNumFeatures = MLUtils.loadLibSVMData(sc, path, 6).collect()
+    val pointsWithoutNumFeatures = MLUtils.loadLibSVMData(sc, path).collect()
+
+    for (points <- Seq(pointsWithNumFeatures, pointsWithoutNumFeatures)) {
+      assert(points.length === 3)
+      assert(points(0).label === 1.0)
+      assert(points(0).features === Vectors.sparse(6, Seq((0, 1.0), (2, 2.0), (4, 3.0))))
+      assert(points(1).label === 0.0)
+      assert(points(1).features === Vectors.sparse(6, Seq()))
+      assert(points(2).label === 0.0)
+      assert(points(2).features === Vectors.sparse(6, Seq((1, 4.0), (3, 5.0), (5, 6.0))))
+    }
+
+    val multiclassPoints = MLUtils.loadLibSVMData(sc, path, MLUtils.multiclassLabelParser).collect()
+    assert(multiclassPoints.length === 3)
+    assert(multiclassPoints(0).label === 1.0)
+    assert(multiclassPoints(1).label === -1.0)
+    assert(multiclassPoints(2).label === -1.0)
+
+    try {
+      file.delete()
+      tempDir.delete()
+    } catch {
+      case t: Throwable => // Ignore failures while cleaning up temporary files.
+    }
+  }
 }
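
The loadLibSVMData test above works against the LIBSVM text format, where each line is "label index:value index:value ..." and feature indices are 1-based. With the default binary label parser, "+1" maps to label 1.0 and "-1" (including the line with no features) maps to 0.0, while multiclassLabelParser keeps -1.0 as-is; feature indices are shifted to 0-based positions in the resulting sparse vector. A small illustrative sketch of that mapping (not part of the patch):

    // "+1 1:1.0 3:2.0 5:3.0" parsed with 6 features becomes:
    LabeledPoint(1.0, Vectors.sparse(6, Seq((0, 1.0), (2, 2.0), (4, 3.0))))

    // "-1" with no features becomes an all-zero sparse vector with label 0.0:
    LabeledPoint(0.0, Vectors.sparse(6, Seq()))
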
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 19b90dfd6e1672e554d44f3d7ca82738159214c9..d2f9cdb3f429803f4a4a5f0025f8896132fbbcf5 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -87,18 +87,19 @@ class NaiveBayesModel(object):
     >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3)
     >>> model = NaiveBayes.train(sc.parallelize(data))
     >>> model.predict(array([0.0, 1.0]))
-    0
+    0.0
     >>> model.predict(array([1.0, 0.0]))
-    1
+    1.0
     """
 
-    def __init__(self, pi, theta):
+    def __init__(self, labels, pi, theta):
+        self.labels = labels
         self.pi = pi
         self.theta = theta
 
     def predict(self, x):
         """Return the most likely class for a data vector x"""
-        return numpy.argmax(self.pi + dot(x, self.theta))
+        return self.labels[numpy.argmax(self.pi + dot(x, self.theta))]
 
 class NaiveBayes(object):
     @classmethod
@@ -122,7 +123,8 @@ class NaiveBayes(object):
         ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
         return NaiveBayesModel(
             _deserialize_double_vector(ans[0]),
-            _deserialize_double_matrix(ans[1]))
+            _deserialize_double_vector(ans[1]),
+            _deserialize_double_matrix(ans[2]))
 
 
 def _test():