diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md
index 3dc8cc902fa72c1f09f4dccc5e67b4778ee7573e..2a2a7c13186d825c5453210e70dbc01efd681c2f 100644
--- a/docs/mllib-linear-methods.md
+++ b/docs/mllib-linear-methods.md
@@ -768,6 +768,58 @@ will get better!
 
 </div>
 
+<div data-lang="python" markdown="1">
+
+First, we import the necessary classes for parsing our input data and creating the model.
+
+{% highlight python %}
+from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.regression import StreamingLinearRegressionWithSGD
+{% endhighlight %}
+
+Then we make input streams for training and testing data. We assume a StreamingContext `ssc`
+has already been created, see [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing)
+for more info. For this example, we use labeled points in training and testing streams,
+but in practice you will likely want to use unlabeled vectors for test data.
+
+{% highlight python %}
+def parse(lp):
+    label = float(lp[lp.find('(') + 1: lp.find(',')])
+    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
+    return LabeledPoint(label, vec)
+
+trainingData = ssc.textFileStream("/training/data/dir").map(parse).cache()
+testData = ssc.textFileStream("/testing/data/dir").map(parse)
+{% endhighlight %}
+
+We create our model by initializing the weights to 0
+
+{% highlight python %}
+numFeatures = 3
+model = StreamingLinearRegressionWithSGD()
+model.setInitialWeights([0.0, 0.0, 0.0])
+{% endhighlight %}
+
+Now we register the streams for training and testing and start the job.
+
+{% highlight python %}
+model.trainOn(trainingData)
+print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))
+
+ssc.start()
+ssc.awaitTermination()
+{% endhighlight %}
+
+We can now save text files with data to the training or testing folders.
+Each line should be a data point formatted as `(y,[x1,x2,x3])` where `y` is the label
+and `x1,x2,x3` are the features. Anytime a text file is placed in `/training/data/dir`
+the model will update. Anytime a text file is placed in `/testing/data/dir` you will see predictions.
+As you feed more data to the training directory, the predictions
+will get better!
+
+</div>
+
 </div>
 
 
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 735d45ba03d27605d1984be66bdcaae26defa378..8f27c446a66e8d0ae4a94041df6762de46721480 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -24,7 +24,9 @@ from pyspark import RDD
 from pyspark.streaming import DStream
 from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
 from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
-from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
+from pyspark.mllib.regression import (
+    LabeledPoint, LinearModel, _regression_train_wrapper,
+    StreamingLinearAlgorithm)
 from pyspark.mllib.util import Saveable, Loader, inherit_doc
 
 
@@ -585,55 +587,13 @@ class NaiveBayes(object):
         return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
 
 
-class StreamingLinearAlgorithm(object):
-    """
-    Base class that has to be inherited by any StreamingLinearAlgorithm.
-
-    Prevents reimplementation of methods predictOn and predictOnValues.
-    """
-    def __init__(self, model):
-        self._model = model
-
-    def latestModel(self):
-        """
-        Returns the latest model.
-        """
-        return self._model
-
-    def _validate(self, dstream):
-        if not isinstance(dstream, DStream):
-            raise TypeError(
-                "dstream should be a DStream object, got %s" % type(dstream))
-        if not self._model:
-            raise ValueError(
-                "Model must be intialized using setInitialWeights")
-
-    def predictOn(self, dstream):
-        """
-        Make predictions on a dstream.
-
-        :return: Transformed dstream object.
-        """
-        self._validate(dstream)
-        return dstream.map(lambda x: self._model.predict(x))
-
-    def predictOnValues(self, dstream):
-        """
-        Make predictions on a keyed dstream.
-
-        :return: Transformed dstream object.
-        """
-        self._validate(dstream)
-        return dstream.mapValues(lambda x: self._model.predict(x))
-
-
 @inherit_doc
 class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
     """
-    Run LogisticRegression with SGD on a stream of data.
+    Run LogisticRegression with SGD on a batch of data.
 
     The weights obtained at the end of training a stream are used as initial
-    weights for the next stream.
+    weights for the next batch.
 
     :param stepSize: Step size for each iteration of gradient descent.
     :param numIterations: Number of iterations run for each batch of data.
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 5ddbbee4babdd4f51985b26663658998672d9e0f..8e90adee5f4c2a32e08ba32182b81d7c3ab2f7d7 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -19,6 +19,7 @@ import numpy as np
 from numpy import array
 
 from pyspark import RDD
+from pyspark.streaming.dstream import DStream
 from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
 from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
 from pyspark.mllib.util import Saveable, Loader
@@ -570,6 +571,95 @@ class IsotonicRegression(object):
         return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic)
 
 
+class StreamingLinearAlgorithm(object):
+    """
+    Base class that has to be inherited by any StreamingLinearAlgorithm.
+
+    Prevents reimplementation of methods predictOn and predictOnValues.
+    """
+    def __init__(self, model):
+        self._model = model
+
+    def latestModel(self):
+        """
+        Returns the latest model.
+        """
+        return self._model
+
+    def _validate(self, dstream):
+        if not isinstance(dstream, DStream):
+            raise TypeError(
+                "dstream should be a DStream object, got %s" % type(dstream))
+        if not self._model:
+            raise ValueError(
+                "Model must be intialized using setInitialWeights")
+
+    def predictOn(self, dstream):
+        """
+        Make predictions on a dstream.
+
+        :return: Transformed dstream object.
+        """
+        self._validate(dstream)
+        return dstream.map(lambda x: self._model.predict(x))
+
+    def predictOnValues(self, dstream):
+        """
+        Make predictions on a keyed dstream.
+
+        :return: Transformed dstream object.
+        """
+        self._validate(dstream)
+        return dstream.mapValues(lambda x: self._model.predict(x))
+
+
+@inherit_doc
+class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm):
+    """
+    Run LinearRegression with SGD on a batch of data.
+
+    The problem minimized is (1 / n_samples) * (y - weights'X)**2.
+    After training on a batch of data, the weights obtained at the end of
+    training are used as initial weights for the next batch.
+
+    :param: stepSize Step size for each iteration of gradient descent.
+    :param: numIterations Total number of iterations run.
+    :param: miniBatchFraction Fraction of data on which SGD is run for each
+                              iteration.
+    """
+    def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0):
+        self.stepSize = stepSize
+        self.numIterations = numIterations
+        self.miniBatchFraction = miniBatchFraction
+        self._model = None
+        super(StreamingLinearRegressionWithSGD, self).__init__(
+            model=self._model)
+
+    def setInitialWeights(self, initialWeights):
+        """
+        Set the initial value of weights.
+
+        This must be set before running trainOn and predictOn
+        """
+        initialWeights = _convert_to_vector(initialWeights)
+        self._model = LinearRegressionModel(initialWeights, 0)
+        return self
+
+    def trainOn(self, dstream):
+        """Train the model on the incoming dstream."""
+        self._validate(dstream)
+
+        def update(rdd):
+            # LinearRegressionWithSGD.train raises an error for an empty RDD.
+            if not rdd.isEmpty():
+                self._model = LinearRegressionWithSGD.train(
+                    rdd, self.numIterations, self.stepSize,
+                    self.miniBatchFraction, self._model.weights,
+                    self._model.intercept)
+
+        dstream.foreachRDD(update)
+
+
 def _test():
     import doctest
     from pyspark import SparkContext
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index cd80c3e07a4f7c5f3b4684c9e37d2e96d1dbedad..f0091d6facccecbdc804b109c229d9f216fb03af 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -27,8 +27,9 @@ from time import time, sleep
 from shutil import rmtree
 
 from numpy import (
-    array, array_equal, zeros, inf, random, exp, dot, all, mean)
+    array, array_equal, zeros, inf, random, exp, dot, all, mean, abs)
 from numpy import sum as array_sum
+
 from py4j.protocol import Py4JJavaError
 
 if sys.version_info[:2] <= (2, 6):
@@ -45,8 +46,8 @@ from pyspark.mllib.common import _to_java_object_rdd
 from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
 from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
     DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
-from pyspark.mllib.regression import LabeledPoint
 from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
+from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
 from pyspark.mllib.random import RandomRDDs
 from pyspark.mllib.stat import Statistics
 from pyspark.mllib.feature import Word2Vec
@@ -56,6 +57,7 @@ from pyspark.mllib.util import LinearDataGenerator
 from pyspark.serializers import PickleSerializer
 from pyspark.streaming import StreamingContext
 from pyspark.sql import SQLContext
+from pyspark.streaming import StreamingContext
 
 _have_scipy = False
 try:
@@ -1170,6 +1172,124 @@ class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
         self.assertTrue(errors[1] - errors[-1] > 0.3)
 
 
+class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
+
+    def assertArrayAlmostEqual(self, array1, array2, dec):
+        for i, j in array1, array2:
+            self.assertAlmostEqual(i, j, dec)
+
+    def test_parameter_accuracy(self):
+        """Test that coefs are predicted accurately by fitting on toy data."""
+
+        # Test that fitting (10*X1 + 10*X2), (X1, X2) gives coefficients
+        # (10, 10)
+        slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
+        slr.setInitialWeights([0.0, 0.0])
+        xMean = [0.0, 0.0]
+        xVariance = [1.0 / 3.0, 1.0 / 3.0]
+
+        # Create ten batches with 100 sample points in each.
+        batches = []
+        for i in range(10):
+            batch = LinearDataGenerator.generateLinearInput(
+                0.0, [10.0, 10.0], xMean, xVariance, 100, 42 + i, 0.1)
+            batches.append(sc.parallelize(batch))
+
+        input_stream = self.ssc.queueStream(batches)
+        t = time()
+        slr.trainOn(input_stream)
+        self.ssc.start()
+        self._ssc_wait(t, 10, 0.01)
+        self.assertArrayAlmostEqual(
+            slr.latestModel().weights.array, [10., 10.], 1)
+        self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1)
+
+    def test_parameter_convergence(self):
+        """Test that the model parameters improve with streaming data."""
+        slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
+        slr.setInitialWeights([0.0])
+
+        # Create ten batches with 100 sample points in each.
+        batches = []
+        for i in range(10):
+            batch = LinearDataGenerator.generateLinearInput(
+                0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
+            batches.append(sc.parallelize(batch))
+
+        model_weights = []
+        input_stream = self.ssc.queueStream(batches)
+        input_stream.foreachRDD(
+            lambda x: model_weights.append(slr.latestModel().weights[0]))
+        t = time()
+        slr.trainOn(input_stream)
+        self.ssc.start()
+        self._ssc_wait(t, 10, 0.01)
+
+        model_weights = array(model_weights)
+        diff = model_weights[1:] - model_weights[:-1]
+        self.assertTrue(all(diff >= -0.1))
+
+    def test_prediction(self):
+        """Test prediction on a model with weights already set."""
+        # Create a model with initial Weights equal to coefs
+        slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
+        slr.setInitialWeights([10.0, 10.0])
+
+        # Create ten batches with 100 sample points in each.
+        batches = []
+        for i in range(10):
+            batch = LinearDataGenerator.generateLinearInput(
+                0.0, [10.0, 10.0], [0.0, 0.0], [1.0 / 3.0, 1.0 / 3.0],
+                100, 42 + i, 0.1)
+            batches.append(
+                sc.parallelize(batch).map(lambda lp: (lp.label, lp.features)))
+
+        input_stream = self.ssc.queueStream(batches)
+        t = time()
+        output_stream = slr.predictOnValues(input_stream)
+        samples = []
+        output_stream.foreachRDD(lambda x: samples.append(x.collect()))
+
+        self.ssc.start()
+        self._ssc_wait(t, 5, 0.01)
+
+        # Test that mean absolute error on each batch is less than 0.1
+        for batch in samples:
+            true, predicted = zip(*batch)
+            self.assertTrue(mean(abs(array(true) - array(predicted))) < 0.1)
+
+    def test_train_prediction(self):
+        """Test that error on test data improves as model is trained."""
+        slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
+        slr.setInitialWeights([0.0])
+
+        # Create ten batches with 100 sample points in each.
+        batches = []
+        for i in range(10):
+            batch = LinearDataGenerator.generateLinearInput(
+                0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
+            batches.append(sc.parallelize(batch))
+
+        predict_batches = [
+            b.map(lambda lp: (lp.label, lp.features)) for b in batches]
+        mean_absolute_errors = []
+
+        def func(rdd):
+            true, predicted = zip(*rdd.collect())
+            mean_absolute_errors.append(mean(abs(true) - abs(predicted)))
+
+        model_weights = []
+        input_stream = self.ssc.queueStream(batches)
+        output_stream = self.ssc.queueStream(predict_batches)
+        t = time()
+        slr.trainOn(input_stream)
+        output_stream = slr.predictOnValues(output_stream)
+        output_stream.foreachRDD(func)
+        self.ssc.start()
+        self._ssc_wait(t, 10, 0.01)
+        self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2)
+
+
 if __name__ == "__main__":
     if not _have_scipy:
         print("NOTE: Skipping SciPy tests as it does not seem to be installed")