Commit c7252b00 authored by freeman, committed by Xiangrui Meng

[SPARK-3112][MLLIB] Add documentation and example for StreamingLR

Added a documentation section on StreamingLR to the ``MLlib - Linear Methods``, including a worked example.

mengxr tdas

Author: freeman <the.freeman.lab@gmail.com>

Closes #2047 from freeman-lab/streaming-lr-docs and squashes the following commits:

568d250 [freeman] Tweaks to wording / formatting
05a1139 [freeman] Added documentation and example for StreamingLR
parent 1870dbaa
@@ -518,6 +518,81 @@ print("Mean Squared Error = " + str(MSE))
</div>
</div>
## Streaming linear regression
When data arrive in a streaming fashion, it is useful to fit regression models online,
updating the parameters of the model as new data arrive. MLlib currently supports
streaming linear regression using ordinary least squares. The fitting is similar
to that performed offline, except that it occurs on each batch of data, so that
the model continually updates to reflect the data from the stream.
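As a rough sketch in our own notation (not taken from this guide): if `w_t` denotes the weights after batch `t`, `B_t` the set of labeled points `(x_i, y_i)` arriving in that batch, and `alpha_t` a step size, then each batch triggers one or more gradient steps on the squared-error loss, starting from the current weights:
\[
w_{t+1} = w_t - \alpha_t \sum_{i \in B_t} \left( w_t^{\mathsf{T}} x_i - y_i \right) x_i
\]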
### Examples
The following example demonstrates how to load training and testing data from two different
input streams of text files, parse the streams as labeled points, fit a linear regression model
online to the first stream, and make predictions on the second stream.
<div class="codetabs">
<div data-lang="scala" markdown="1">
First, we import the necessary classes for parsing our input data and creating the model.
{% highlight scala %}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
{% endhighlight %}
Then we make input streams for training and testing data. We assume a StreamingContext `ssc`
has already been created; see the [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing)
for more information. For this example, we use labeled points in both the training and testing streams,
but in practice you will likely want to use unlabeled vectors for test data.
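If you do not already have a StreamingContext, a minimal sketch of creating one is shown below; the application name and the one-second batch interval are illustrative assumptions, not requirements.
{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Create a streaming context with a 1-second batch interval (chosen here for illustration).
val conf = new SparkConf().setAppName("StreamingLinearRegressionExample")
val ssc = new StreamingContext(conf, Seconds(1))
{% endhighlight %}
With `ssc` in place, the training and testing streams are created as follows.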
{% highlight scala %}
val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse)
val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
{% endhighlight %}
We create our model by initializing the weights to zero.
{% highlight scala %}
val numFeatures = 3
val model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.zeros(numFeatures))
{% endhighlight %}
Now we register the streams for training and testing and start the job.
Printing predictions alongside true labels lets us easily see the result.
{% highlight scala %}
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()
{% endhighlight %}
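If your test stream contains unlabeled feature vectors rather than labeled points (as suggested above), you can call `predictOn` instead. The sketch below is illustrative and the directory path is an assumption; register it before calling `ssc.start()`, like the calls above.
{% highlight scala %}
// Illustrative variant: predict on a stream of raw feature vectors.
// Register this before ssc.start().
val unlabeledData = ssc.textFileStream("/unlabeled/data/dir").map(Vectors.parse)
model.predictOn(unlabeledData).print()
{% endhighlight %}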
We can now save text files with data to the training or testing folders.
Each line should be a data point formatted as `(y,[x1,x2,x3])`, where `y` is the label
and `x1,x2,x3` are the features. Any time a text file is placed in `/training/data/dir`,
the model will update. Any time a text file is placed in `/testing/data/dir`, you will see predictions.
As you feed more data to the training directory, the predictions
will get better!
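For reference, the sketch below shows how a single line in the `(y,[x1,x2,x3])` format is parsed into a labeled point; the numeric values are made up purely for illustration.
{% highlight scala %}
// Parse one line in the (y,[x1,x2,x3]) format expected by the streams above.
val point = LabeledPoint.parse("(1.0,[2.0,0.5,3.0])")
println(point.label)    // 1.0
println(point.features) // [2.0,0.5,3.0]
{% endhighlight %}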
</div>
</div>
## Implementation (developer)
Behind the scenes, MLlib implements a simple distributed version of stochastic gradient descent
......