From c8b16ca0d86cc60fb960eebf0cb383f159a88b03 Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley" <joseph.kurata.bradley@gmail.com>
Date: Mon, 18 Aug 2014 18:01:39 -0700
Subject: [PATCH] [SPARK-2850] [SPARK-2626] [mllib] MLlib stats examples +
 small fixes

Added examples for statistical summarization (a minimal usage sketch follows this list):
* Scala: Correlations.scala, MultivariateSummarizer.scala
** Tests: correlation, MultivariateOnlineSummarizer
* Python: correlations.py
** Tests: correlation (since MultivariateOnlineSummarizer has no Python API)
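
A minimal sketch of the core calls these examples exercise, assuming a local
SparkContext (the object name, master, and sample values are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, Statistics}

    object SummarySketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("SummarySketch").setMaster("local"))

        // Pearson correlation between two RDD[Double]s, as in Correlations.scala.
        val x = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))
        val y = sc.parallelize(Seq(2.0, 4.0, 6.0, 9.0))
        println(Statistics.corr(x, y, "pearson"))

        // Online summary of an RDD[Vector] via aggregate, as in MultivariateSummarizer.scala.
        val vectors = sc.parallelize(Seq(Vectors.dense(1.0, 0.0), Vectors.dense(3.0, 5.0)))
        val summary = vectors.aggregate(new MultivariateOnlineSummarizer())(
          (summarizer, v) => summarizer.add(v),
          (s1, s2) => s1.merge(s2))
        println(summary.mean)

        sc.stop()
      }
    }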

Added examples for random and sampled RDDs (see the sketch after this list):
* Scala: RandomRDDGeneration.scala, SampledRDDs.scala
* Python: random_rdd_generation.py, sampled_rdds.py
* Both test:
** RandomRDDs.normalRDD, normalVectorRDD
** RDD.sample, takeSample, sampleByKey
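
A minimal sketch of the sampling APIs covered, again assuming a local
SparkContext (the sizes, fractions, and keying scheme are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._
    import org.apache.spark.mllib.random.RandomRDDs

    object SamplingSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("SamplingSketch").setMaster("local"))

        // Random data: 1000 draws from N(0, 1), and 1000 length-2 standard normal vectors.
        val normal = RandomRDDs.normalRDD(sc, 1000)
        val normalVectors = RandomRDDs.normalVectorRDD(sc, numRows = 1000, numCols = 2)
        println(s"${normal.count()} doubles, ${normalVectors.count()} vectors")

        // Plain, fixed-size, and stratified (per-key) sampling.
        val sampled = normal.sample(withReplacement = false, fraction = 0.1)
        val taken = normal.takeSample(withReplacement = false, num = 100)
        val keyed = normal.map(x => (if (x >= 0) 1 else 0, x))
        val byKey = keyed.sampleByKey(withReplacement = false, fractions = Map(0 -> 0.1, 1 -> 0.1))
        println(s"${sampled.count()} sampled, ${taken.size} taken, ${byKey.count()} sampled by key")

        sc.stop()
      }
    }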

Added sc.stop() to all examples.

CorrelationSuite.scala
* Added a test for correlations on RDDs containing only 1 value

RowMatrix.scala (see the sketch below)
* numCols(): Added check for numRows = 0, with error message.
* computeCovariance(): Added check for numRows <= 1, with error message.
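
A minimal sketch of the failure mode the numCols() check now reports cleanly,
assuming a local SparkContext (object name and master are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    object RowMatrixChecksSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("RowMatrixChecks").setMaster("local"))
        val mat = new RowMatrix(sc.parallelize(Seq.empty[Vector]))
        // numCols() on an empty rows RDD now fails with an explanatory message
        // instead of an opaque UnsupportedOperationException from RDD.first().
        try {
          mat.numCols()
        } catch {
          case e: RuntimeException => println(e.getMessage)
        }
        sc.stop()
      }
    }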

Python SparseVector (pyspark/mllib/linalg.py)
* Added toArray() method

python/run-tests script
* Added stat.py (doc test)

CC: mengxr dorx. The main changes were examples showing usage across APIs.

Author: Joseph K. Bradley <joseph.kurata.bradley@gmail.com>

Closes #1878 from jkbradley/mllib-stats-api-check and squashes the following commits:

ea5c047 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
dafebe2 [Joseph K. Bradley] Bug fixes for examples SampledRDDs.scala and sampled_rdds.py: Check for division by 0 and for missing key in maps.
8d1e555 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
60c72d9 [Joseph K. Bradley] Fixed stat.py doc test to work for Python versions printing nan or NaN.
b20d90a [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
4e5d15e [Joseph K. Bradley] Changed pyspark/mllib/stat.py doc tests to use NaN instead of nan.
32173b7 [Joseph K. Bradley] Stats examples update.
c8c20dc [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
cf70b07 [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
0b7cec3 [Joseph K. Bradley] Small updates based on code review.  Renamed statistical_summary.py to correlations.py
ab48f6e [Joseph K. Bradley] RowMatrix.scala * numCols(): Added check for numRows = 0, with error message. * computeCovariance(): Added check for numRows <= 1, with error message.
65e4ebc [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
8195c78 [Joseph K. Bradley] Added examples for random and sampled RDDs: * Scala: RandomAndSampledRDDs.scala * python: random_and_sampled_rdds.py * Both test: ** RandomRDDGenerators.normalRDD, normalVectorRDD ** RDD.sample, takeSample, sampleByKey
064985b [Joseph K. Bradley] Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check
ee918e9 [Joseph K. Bradley] Added examples for statistical summarization: * Scala: StatisticalSummary.scala ** Tests: correlation, MultivariateOnlineSummarizer * python: statistical_summary.py ** Tests: correlation (since MultivariateOnlineSummarizer has no Python API)
---
 examples/src/main/python/als.py               |   2 +
 .../src/main/python/cassandra_inputformat.py  |   2 +
 .../src/main/python/cassandra_outputformat.py |   2 +
 examples/src/main/python/hbase_inputformat.py |   2 +
 .../src/main/python/hbase_outputformat.py     |   2 +
 examples/src/main/python/kmeans.py            |   2 +
 .../src/main/python/logistic_regression.py    |   2 +
 .../src/main/python/mllib/correlations.py     |  60 +++++++++
 .../main/python/mllib/decision_tree_runner.py |   5 +
 examples/src/main/python/mllib/kmeans.py      |   1 +
 .../main/python/mllib/logistic_regression.py  |   1 +
 .../python/mllib/random_rdd_generation.py     |  55 ++++++++
 .../src/main/python/mllib/sampled_rdds.py     |  86 ++++++++++++
 examples/src/main/python/pagerank.py          |   2 +
 examples/src/main/python/pi.py                |   2 +
 examples/src/main/python/sort.py              |   2 +
 .../src/main/python/transitive_closure.py     |   2 +
 examples/src/main/python/wordcount.py         |   2 +
 .../spark/examples/mllib/Correlations.scala   |  92 +++++++++++++
 .../mllib/MultivariateSummarizer.scala        |  98 ++++++++++++++
 .../examples/mllib/RandomRDDGeneration.scala  |  60 +++++++++
 .../spark/examples/mllib/SampledRDDs.scala    | 126 ++++++++++++++++++
 .../mllib/linalg/distributed/RowMatrix.scala  |  14 +-
 .../stat/MultivariateOnlineSummarizer.scala   |   8 +-
 .../spark/mllib/stat/CorrelationSuite.scala   |  15 ++-
 .../MultivariateOnlineSummarizerSuite.scala   |   6 +-
 python/pyspark/mllib/linalg.py                |  10 ++
 python/pyspark/mllib/stat.py                  |  22 +--
 python/run-tests                              |   1 +
 29 files changed, 664 insertions(+), 20 deletions(-)
 create mode 100755 examples/src/main/python/mllib/correlations.py
 create mode 100755 examples/src/main/python/mllib/random_rdd_generation.py
 create mode 100755 examples/src/main/python/mllib/sampled_rdds.py
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala

diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py
index c862650b0a..5b1fa4d997 100755
--- a/examples/src/main/python/als.py
+++ b/examples/src/main/python/als.py
@@ -97,3 +97,5 @@ if __name__ == "__main__":
         error = rmse(R, ms, us)
         print "Iteration %d:" % i
         print "\nRMSE: %5.4f\n" % error
+
+    sc.stop()
diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py
index 39fa6b0d22..e4a897f61e 100644
--- a/examples/src/main/python/cassandra_inputformat.py
+++ b/examples/src/main/python/cassandra_inputformat.py
@@ -77,3 +77,5 @@ if __name__ == "__main__":
     output = cass_rdd.collect()
     for (k, v) in output:
         print (k, v)
+
+    sc.stop()
diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py
index 1dfbf98604..836c35b5c6 100644
--- a/examples/src/main/python/cassandra_outputformat.py
+++ b/examples/src/main/python/cassandra_outputformat.py
@@ -81,3 +81,5 @@ if __name__ == "__main__":
         conf=conf,
         keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
         valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter")
+
+    sc.stop()
diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py
index c9fa8e171c..befacee0de 100644
--- a/examples/src/main/python/hbase_inputformat.py
+++ b/examples/src/main/python/hbase_inputformat.py
@@ -71,3 +71,5 @@ if __name__ == "__main__":
     output = hbase_rdd.collect()
     for (k, v) in output:
         print (k, v)
+
+    sc.stop()
diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py
index 5e11548fd1..49bbc5aebd 100644
--- a/examples/src/main/python/hbase_outputformat.py
+++ b/examples/src/main/python/hbase_outputformat.py
@@ -63,3 +63,5 @@ if __name__ == "__main__":
         conf=conf,
         keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
         valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")
+
+    sc.stop()
diff --git a/examples/src/main/python/kmeans.py b/examples/src/main/python/kmeans.py
index 036bdf4c4f..86ef6f32c8 100755
--- a/examples/src/main/python/kmeans.py
+++ b/examples/src/main/python/kmeans.py
@@ -77,3 +77,5 @@ if __name__ == "__main__":
             kPoints[x] = y
 
     print "Final centers: " + str(kPoints)
+
+    sc.stop()
diff --git a/examples/src/main/python/logistic_regression.py b/examples/src/main/python/logistic_regression.py
index 8456b272f9..3aa56b0528 100755
--- a/examples/src/main/python/logistic_regression.py
+++ b/examples/src/main/python/logistic_regression.py
@@ -80,3 +80,5 @@ if __name__ == "__main__":
         w -= points.map(lambda m: gradient(m, w)).reduce(add)
 
     print "Final w: " + str(w)
+
+    sc.stop()
diff --git a/examples/src/main/python/mllib/correlations.py b/examples/src/main/python/mllib/correlations.py
new file mode 100755
index 0000000000..6b16a56e44
--- /dev/null
+++ b/examples/src/main/python/mllib/correlations.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Correlations using MLlib.
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.stat import Statistics
+from pyspark.mllib.util import MLUtils
+
+
+if __name__ == "__main__":
+    if len(sys.argv) not in [1, 2]:
+        print >> sys.stderr, "Usage: correlations (<file>)"
+        exit(-1)
+    sc = SparkContext(appName="PythonCorrelations")
+    if len(sys.argv) == 2:
+        filepath = sys.argv[1]
+    else:
+        filepath = 'data/mllib/sample_linear_regression_data.txt'
+    corrType = 'pearson'
+
+    points = MLUtils.loadLibSVMFile(sc, filepath)\
+        .map(lambda lp: LabeledPoint(lp.label, lp.features.toArray()))
+
+    print
+    print 'Summary of data file: ' + filepath
+    print '%d data points' % points.count()
+
+    # Statistics (correlations)
+    print
+    print 'Correlation (%s) between label and each feature' % corrType
+    print 'Feature\tCorrelation'
+    numFeatures = points.take(1)[0].features.size
+    labelRDD = points.map(lambda lp: lp.label)
+    for i in range(numFeatures):
+        featureRDD = points.map(lambda lp: lp.features[i])
+        corr = Statistics.corr(labelRDD, featureRDD, corrType)
+        print '%d\t%g' % (i, corr)
+    print
+
+    sc.stop()
diff --git a/examples/src/main/python/mllib/decision_tree_runner.py b/examples/src/main/python/mllib/decision_tree_runner.py
index db96a7cb37..6e4a4a0cb6 100755
--- a/examples/src/main/python/mllib/decision_tree_runner.py
+++ b/examples/src/main/python/mllib/decision_tree_runner.py
@@ -17,6 +17,8 @@
 
 """
 Decision tree classification and regression using MLlib.
+
+This example requires NumPy (http://www.numpy.org/).
 """
 
 import numpy, os, sys
@@ -117,6 +119,7 @@ if __name__ == "__main__":
     if len(sys.argv) == 2:
         dataPath = sys.argv[1]
     if not os.path.isfile(dataPath):
+        sc.stop()
         usage()
     points = MLUtils.loadLibSVMFile(sc, dataPath)
 
@@ -133,3 +136,5 @@ if __name__ == "__main__":
     print "  Model depth: %d\n" % model.depth()
     print "  Training accuracy: %g\n" % getAccuracy(model, reindexedData)
     print model
+
+    sc.stop()
diff --git a/examples/src/main/python/mllib/kmeans.py b/examples/src/main/python/mllib/kmeans.py
index b308132c9a..2eeb1abeeb 100755
--- a/examples/src/main/python/mllib/kmeans.py
+++ b/examples/src/main/python/mllib/kmeans.py
@@ -42,3 +42,4 @@ if __name__ == "__main__":
     k = int(sys.argv[2])
     model = KMeans.train(data, k)
     print "Final centers: " + str(model.clusterCenters)
+    sc.stop()
diff --git a/examples/src/main/python/mllib/logistic_regression.py b/examples/src/main/python/mllib/logistic_regression.py
index 9d547ff77c..8cae27fc4a 100755
--- a/examples/src/main/python/mllib/logistic_regression.py
+++ b/examples/src/main/python/mllib/logistic_regression.py
@@ -50,3 +50,4 @@ if __name__ == "__main__":
     model = LogisticRegressionWithSGD.train(points, iterations)
     print "Final weights: " + str(model.weights)
     print "Final intercept: " + str(model.intercept)
+    sc.stop()
diff --git a/examples/src/main/python/mllib/random_rdd_generation.py b/examples/src/main/python/mllib/random_rdd_generation.py
new file mode 100755
index 0000000000..b388d8d83f
--- /dev/null
+++ b/examples/src/main/python/mllib/random_rdd_generation.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Randomly generated RDDs.
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.mllib.random import RandomRDDs
+
+
+if __name__ == "__main__":
+    if len(sys.argv) not in [1, 2]:
+        print >> sys.stderr, "Usage: random_rdd_generation"
+        exit(-1)
+
+    sc = SparkContext(appName="PythonRandomRDDGeneration")
+
+    numExamples = 10000 # number of examples to generate
+    fraction = 0.1 # fraction of data to sample
+
+    # Example: RandomRDDs.normalRDD
+    normalRDD = RandomRDDs.normalRDD(sc, numExamples)
+    print 'Generated RDD of %d examples sampled from the standard normal distribution'\
+        % normalRDD.count()
+    print '  First 5 samples:'
+    for sample in normalRDD.take(5):
+        print '    ' + str(sample)
+    print
+
+    # Example: RandomRDDs.normalVectorRDD
+    normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2)
+    print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count()
+    print '  First 5 samples:'
+    for sample in normalVectorRDD.take(5):
+        print '    ' + str(sample)
+    print
+
+    sc.stop()
diff --git a/examples/src/main/python/mllib/sampled_rdds.py b/examples/src/main/python/mllib/sampled_rdds.py
new file mode 100755
index 0000000000..ec64a5978c
--- /dev/null
+++ b/examples/src/main/python/mllib/sampled_rdds.py
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Randomly sampled RDDs.
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.mllib.util import MLUtils
+
+
+if __name__ == "__main__":
+    if len(sys.argv) not in [1, 2]:
+        print >> sys.stderr, "Usage: sampled_rdds <libsvm data file>"
+        exit(-1)
+    if len(sys.argv) == 2:
+        datapath = sys.argv[1]
+    else:
+        datapath = 'data/mllib/sample_binary_classification_data.txt'
+
+    sc = SparkContext(appName="PythonSampledRDDs")
+
+    fraction = 0.1 # fraction of data to sample
+
+    examples = MLUtils.loadLibSVMFile(sc, datapath)
+    numExamples = examples.count()
+    if numExamples == 0:
+        print >> sys.stderr, "Error: Data file had no samples to load."
+        exit(1)
+    print 'Loaded data with %d examples from file: %s' % (numExamples, datapath)
+
+    # Example: RDD.sample() and RDD.takeSample()
+    expectedSampleSize = int(numExamples * fraction)
+    print 'Sampling RDD using fraction %g.  Expected sample size = %d.' \
+        % (fraction, expectedSampleSize)
+    sampledRDD = examples.sample(withReplacement = True, fraction = fraction)
+    print '  RDD.sample(): sample has %d examples' % sampledRDD.count()
+    sampledArray = examples.takeSample(withReplacement = True, num = expectedSampleSize)
+    print '  RDD.takeSample(): sample has %d examples' % len(sampledArray)
+
+    print
+
+    # Example: RDD.sampleByKey()
+    keyedRDD = examples.map(lambda lp: (int(lp.label), lp.features))
+    print '  Keyed data using label (Int) as key ==> Orig'
+    #  Count examples per label in original data.
+    keyCountsA = keyedRDD.countByKey()
+
+    #  Subsample, and count examples per label in sampled data.
+    fractions = {}
+    for k in keyCountsA.keys():
+        fractions[k] = fraction
+    sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = True, fractions = fractions)
+    keyCountsB = sampledByKeyRDD.countByKey()
+    sizeB = sum(keyCountsB.values())
+    print '  Sampled %d examples using approximate stratified sampling (by label). ==> Sample' \
+        % sizeB
+
+    #  Compare samples
+    print '   \tFractions of examples with key'
+    print 'Key\tOrig\tSample'
+    for k in sorted(keyCountsA.keys()):
+        fracA = keyCountsA[k] / float(numExamples)
+        if sizeB != 0:
+            fracB = keyCountsB.get(k, 0) / float(sizeB)
+        else:
+            fracB = 0
+        print '%d\t%g\t%g' % (k, fracA, fracB)
+
+    sc.stop()
diff --git a/examples/src/main/python/pagerank.py b/examples/src/main/python/pagerank.py
index 0b96343158..b539c4128c 100755
--- a/examples/src/main/python/pagerank.py
+++ b/examples/src/main/python/pagerank.py
@@ -68,3 +68,5 @@ if __name__ == "__main__":
     # Collects all URL ranks and dump them to console.
     for (link, rank) in ranks.collect():
         print "%s has rank: %s." % (link, rank)
+
+    sc.stop()
diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py
index 21d94a2cd4..fc37459dc7 100755
--- a/examples/src/main/python/pi.py
+++ b/examples/src/main/python/pi.py
@@ -37,3 +37,5 @@ if __name__ == "__main__":
 
     count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
     print "Pi is roughly %f" % (4.0 * count / n)
+
+    sc.stop()
diff --git a/examples/src/main/python/sort.py b/examples/src/main/python/sort.py
index 41d00c1b79..bb686f1751 100755
--- a/examples/src/main/python/sort.py
+++ b/examples/src/main/python/sort.py
@@ -34,3 +34,5 @@ if __name__ == "__main__":
     output = sortedCount.collect()
     for (num, unitcount) in output:
         print num
+
+    sc.stop()
diff --git a/examples/src/main/python/transitive_closure.py b/examples/src/main/python/transitive_closure.py
index 8698369b13..bf331b542c 100755
--- a/examples/src/main/python/transitive_closure.py
+++ b/examples/src/main/python/transitive_closure.py
@@ -64,3 +64,5 @@ if __name__ == "__main__":
             break
 
     print "TC has %i edges" % tc.count()
+
+    sc.stop()
diff --git a/examples/src/main/python/wordcount.py b/examples/src/main/python/wordcount.py
index dcc095fdd0..ae6cd13b83 100755
--- a/examples/src/main/python/wordcount.py
+++ b/examples/src/main/python/wordcount.py
@@ -33,3 +33,5 @@ if __name__ == "__main__":
     output = counts.collect()
     for (word, count) in output:
         print "%s: %i" % (word, count)
+
+    sc.stop()
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala
new file mode 100644
index 0000000000..d6b2fe430e
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import scopt.OptionParser
+
+import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.{SparkConf, SparkContext}
+
+
+/**
+ * An example app for summarizing multivariate data from a file. Run with
+ * {{{
+ * bin/run-example org.apache.spark.examples.mllib.Correlations
+ * }}}
+ * By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`.
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object Correlations {
+
+  case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")
+
+  def main(args: Array[String]) {
+
+    val defaultParams = Params()
+
+    val parser = new OptionParser[Params]("Correlations") {
+      head("Correlations: an example app for computing correlations")
+      opt[String]("input")
+        .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}")
+        .action((x, c) => c.copy(input = x))
+      note(
+        """
+        |For example, the following command runs this app on a synthetic dataset:
+        |
+        | bin/spark-submit --class org.apache.spark.examples.mllib.Correlations \
+        |  examples/target/scala-*/spark-examples-*.jar \
+        |  --input data/mllib/sample_linear_regression_data.txt
+        """.stripMargin)
+    }
+
+    parser.parse(args, defaultParams).map { params =>
+      run(params)
+    } getOrElse {
+        sys.exit(1)
+    }
+  }
+
+  def run(params: Params) {
+    val conf = new SparkConf().setAppName(s"Correlations with $params")
+    val sc = new SparkContext(conf)
+
+    val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()
+
+    println(s"Summary of data file: ${params.input}")
+    println(s"${examples.count()} data points")
+
+    // Calculate label -- feature correlations
+    val labelRDD = examples.map(_.label)
+    val numFeatures = examples.take(1)(0).features.size
+    val corrType = "pearson"
+    println()
+    println(s"Correlation ($corrType) between label and each feature")
+    println(s"Feature\tCorrelation")
+    var feature = 0
+    while (feature < numFeatures) {
+      val featureRDD = examples.map(_.features(feature))
+      val corr = Statistics.corr(labelRDD, featureRDD)
+      println(s"$feature\t$corr")
+      feature += 1
+    }
+    println()
+
+    sc.stop()
+  }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala
new file mode 100644
index 0000000000..4532512c01
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import scopt.OptionParser
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.{SparkConf, SparkContext}
+
+
+/**
+ * An example app for summarizing multivariate data from a file. Run with
+ * {{{
+ * bin/run-example org.apache.spark.examples.mllib.MultivariateSummarizer
+ * }}}
+ * By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`.
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object MultivariateSummarizer {
+
+  case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")
+
+  def main(args: Array[String]) {
+
+    val defaultParams = Params()
+
+    val parser = new OptionParser[Params]("MultivariateSummarizer") {
+      head("MultivariateSummarizer: an example app for MultivariateOnlineSummarizer")
+      opt[String]("input")
+        .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}")
+        .action((x, c) => c.copy(input = x))
+      note(
+        """
+        |For example, the following command runs this app on a synthetic dataset:
+        |
+        | bin/spark-submit --class org.apache.spark.examples.mllib.MultivariateSummarizer \
+        |  examples/target/scala-*/spark-examples-*.jar \
+        |  --input data/mllib/sample_linear_regression_data.txt
+        """.stripMargin)
+    }
+
+    parser.parse(args, defaultParams).map { params =>
+      run(params)
+    } getOrElse {
+        sys.exit(1)
+    }
+  }
+
+  def run(params: Params) {
+    val conf = new SparkConf().setAppName(s"MultivariateSummarizer with $params")
+    val sc = new SparkContext(conf)
+
+    val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()
+
+    println(s"Summary of data file: ${params.input}")
+    println(s"${examples.count()} data points")
+
+    // Summarize labels
+    val labelSummary = examples.aggregate(new MultivariateOnlineSummarizer())(
+      (summary, lp) => summary.add(Vectors.dense(lp.label)),
+      (sum1, sum2) => sum1.merge(sum2))
+
+    // Summarize features
+    val featureSummary = examples.aggregate(new MultivariateOnlineSummarizer())(
+      (summary, lp) => summary.add(lp.features),
+      (sum1, sum2) => sum1.merge(sum2))
+
+    println()
+    println(s"Summary statistics")
+    println(s"\tLabel\tFeatures")
+    println(s"mean\t${labelSummary.mean(0)}\t${featureSummary.mean.toArray.mkString("\t")}")
+    println(s"var\t${labelSummary.variance(0)}\t${featureSummary.variance.toArray.mkString("\t")}")
+    println(
+      s"nnz\t${labelSummary.numNonzeros(0)}\t${featureSummary.numNonzeros.toArray.mkString("\t")}")
+    println(s"max\t${labelSummary.max(0)}\t${featureSummary.max.toArray.mkString("\t")}")
+    println(s"min\t${labelSummary.min(0)}\t${featureSummary.min.toArray.mkString("\t")}")
+    println()
+
+    sc.stop()
+  }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala
new file mode 100644
index 0000000000..924b586e3a
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.mllib.random.RandomRDDs
+import org.apache.spark.rdd.RDD
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+/**
+ * An example app for randomly generated RDDs. Run with
+ * {{{
+ * bin/run-example org.apache.spark.examples.mllib.RandomRDDGeneration
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object RandomRDDGeneration {
+
+  def main(args: Array[String]) {
+
+    val conf = new SparkConf().setAppName(s"RandomRDDGeneration")
+    val sc = new SparkContext(conf)
+
+    val numExamples = 10000 // number of examples to generate
+    val fraction = 0.1 // fraction of data to sample
+
+    // Example: RandomRDDs.normalRDD
+    val normalRDD: RDD[Double] = RandomRDDs.normalRDD(sc, numExamples)
+    println(s"Generated RDD of ${normalRDD.count()}" +
+      " examples sampled from the standard normal distribution")
+    println("  First 5 samples:")
+    normalRDD.take(5).foreach( x => println(s"    $x") )
+
+    // Example: RandomRDDs.normalVectorRDD
+    val normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2)
+    println(s"Generated RDD of ${normalVectorRDD.count()} examples of length-2 vectors.")
+    println("  First 5 samples:")
+    normalVectorRDD.take(5).foreach( x => println(s"    $x") )
+
+    println()
+
+    sc.stop()
+  }
+
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala
new file mode 100644
index 0000000000..f01b8266e3
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.mllib.util.MLUtils
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext._
+
+/**
+ * An example app for randomly generated and sampled RDDs. Run with
+ * {{{
+ * bin/run-example org.apache.spark.examples.mllib.SampledRDDs
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
+object SampledRDDs {
+
+  case class Params(input: String = "data/mllib/sample_binary_classification_data.txt")
+
+  def main(args: Array[String]) {
+    val defaultParams = Params()
+
+    val parser = new OptionParser[Params]("SampledRDDs") {
+      head("SampledRDDs: an example app for randomly generated and sampled RDDs.")
+      opt[String]("input")
+        .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}")
+        .action((x, c) => c.copy(input = x))
+      note(
+        """
+        |For example, the following command runs this app:
+        |
+        | bin/spark-submit --class org.apache.spark.examples.mllib.SampledRDDs \
+        |  examples/target/scala-*/spark-examples-*.jar
+        """.stripMargin)
+    }
+
+    parser.parse(args, defaultParams).map { params =>
+      run(params)
+    } getOrElse {
+      sys.exit(1)
+    }
+  }
+
+  def run(params: Params) {
+    val conf = new SparkConf().setAppName(s"SampledRDDs with $params")
+    val sc = new SparkContext(conf)
+
+    val fraction = 0.1 // fraction of data to sample
+
+    val examples = MLUtils.loadLibSVMFile(sc, params.input)
+    val numExamples = examples.count()
+    if (numExamples == 0) {
+      throw new RuntimeException("Error: Data file had no samples to load.")
+    }
+    println(s"Loaded data with $numExamples examples from file: ${params.input}")
+
+    // Example: RDD.sample() and RDD.takeSample()
+    val expectedSampleSize = (numExamples * fraction).toInt
+    println(s"Sampling RDD using fraction $fraction.  Expected sample size = $expectedSampleSize.")
+    val sampledRDD = examples.sample(withReplacement = true, fraction = fraction)
+    println(s"  RDD.sample(): sample has ${sampledRDD.count()} examples")
+    val sampledArray = examples.takeSample(withReplacement = true, num = expectedSampleSize)
+    println(s"  RDD.takeSample(): sample has ${sampledArray.size} examples")
+
+    println()
+
+    // Example: RDD.sampleByKey() and RDD.sampleByKeyExact()
+    val keyedRDD = examples.map { lp => (lp.label.toInt, lp.features) }
+    println(s"  Keyed data using label (Int) as key ==> Orig")
+    //  Count examples per label in original data.
+    val keyCounts = keyedRDD.countByKey()
+
+    //  Subsample, and count examples per label in sampled data. (approximate)
+    val fractions = keyCounts.keys.map((_, fraction)).toMap
+    val sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = true, fractions = fractions)
+    val keyCountsB = sampledByKeyRDD.countByKey()
+    val sizeB = keyCountsB.values.sum
+    println(s"  Sampled $sizeB examples using approximate stratified sampling (by label)." +
+      " ==> Approx Sample")
+
+    //  Subsample, and count examples per label in sampled data. (exact)
+    val sampledByKeyRDDExact =
+      keyedRDD.sampleByKeyExact(withReplacement = true, fractions = fractions)
+    val keyCountsBExact = sampledByKeyRDDExact.countByKey()
+    val sizeBExact = keyCountsBExact.values.sum
+    println(s"  Sampled $sizeBExact examples using exact stratified sampling (by label)." +
+      " ==> Exact Sample")
+
+    //  Compare samples
+    println(s"   \tFractions of examples with key")
+    println(s"Key\tOrig\tApprox Sample\tExact Sample")
+    keyCounts.keys.toSeq.sorted.foreach { key =>
+      val origFrac = keyCounts(key) / numExamples.toDouble
+      val approxFrac = if (sizeB != 0) {
+        keyCountsB.getOrElse(key, 0L) / sizeB.toDouble
+      } else {
+        0
+      }
+      val exactFrac = if (sizeBExact != 0) {
+        keyCountsBExact.getOrElse(key, 0L) / sizeBExact.toDouble
+      } else {
+        0
+      }
+      println(s"$key\t$origFrac\t$approxFrac\t$exactFrac")
+    }
+
+    sc.stop()
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index e76bc9feff..2e414a73be 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -53,8 +53,14 @@ class RowMatrix(
   /** Gets or computes the number of columns. */
   override def numCols(): Long = {
     if (nCols <= 0) {
-      // Calling `first` will throw an exception if `rows` is empty.
-      nCols = rows.first().size
+      try {
+        // Calling `first` will throw an exception if `rows` is empty.
+        nCols = rows.first().size
+      } catch {
+        case err: UnsupportedOperationException =>
+          sys.error("Cannot determine the number of cols because it is not specified in the " +
+            "constructor and the rows RDD is empty.")
+      }
     }
     nCols
   }
@@ -293,6 +299,10 @@ class RowMatrix(
         (s1._1 + s2._1, s1._2 += s2._2)
     )
 
+    if (m <= 1) {
+      sys.error(s"RowMatrix.computeCovariance called on matrix with only $m rows." +
+        "  Cannot compute the covariance of a RowMatrix with <= 1 row.")
+    }
     updateNumRows(m)
 
     mean :/= m.toDouble
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
index 5105b5c37a..7d845c4436 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
@@ -55,8 +55,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
    */
   def add(sample: Vector): this.type = {
     if (n == 0) {
-      require(sample.toBreeze.length > 0, s"Vector should have dimension larger than zero.")
-      n = sample.toBreeze.length
+      require(sample.size > 0, s"Vector should have dimension larger than zero.")
+      n = sample.size
 
       currMean = BDV.zeros[Double](n)
       currM2n = BDV.zeros[Double](n)
@@ -65,8 +65,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
       currMin = BDV.fill(n)(Double.MaxValue)
     }
 
-    require(n == sample.toBreeze.length, s"Dimensions mismatch when adding new sample." +
-      s" Expecting $n but got ${sample.toBreeze.length}.")
+    require(n == sample.size, s"Dimensions mismatch when adding new sample." +
+      s" Expecting $n but got ${sample.size}.")
 
     sample.toBreeze.activeIterator.foreach {
       case (_, 0.0) => // Skip explicit zero elements.
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala
index a3f76f77a5..34548c86eb 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala
@@ -39,6 +39,17 @@ class CorrelationSuite extends FunSuite with LocalSparkContext {
     Vectors.dense(9.0, 0.0, 0.0, 1.0)
   )
 
+  test("corr(x, y) pearson, 1 value in data") {
+    val x = sc.parallelize(Array(1.0))
+    val y = sc.parallelize(Array(4.0))
+    intercept[RuntimeException] {
+      Statistics.corr(x, y, "pearson")
+    }
+    intercept[RuntimeException] {
+      Statistics.corr(x, y, "spearman")
+    }
+  }
+
   test("corr(x, y) default, pearson") {
     val x = sc.parallelize(xData)
     val y = sc.parallelize(yData)
@@ -58,7 +69,7 @@ class CorrelationSuite extends FunSuite with LocalSparkContext {
 
     // RDD of zero variance
     val z = sc.parallelize(zeros)
-    assert(Statistics.corr(x, z).isNaN())
+    assert(Statistics.corr(x, z).isNaN)
   }
 
   test("corr(x, y) spearman") {
@@ -78,7 +89,7 @@ class CorrelationSuite extends FunSuite with LocalSparkContext {
 
     // RDD of zero variance => zero variance in ranks
     val z = sc.parallelize(zeros)
-    assert(Statistics.corr(x, z, "spearman").isNaN())
+    assert(Statistics.corr(x, z, "spearman").isNaN)
   }
 
   test("corr(X) default, pearson") {
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizerSuite.scala
index db13f142df..1e94152491 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizerSuite.scala
@@ -139,7 +139,8 @@ class MultivariateOnlineSummarizerSuite extends FunSuite {
     assert(summarizer.numNonzeros ~== Vectors.dense(3, 5, 2) absTol 1E-5, "numNonzeros mismatch")
 
     assert(summarizer.variance ~==
-      Vectors.dense(3.857666666666, 7.0456666666666, 2.48166666666666) absTol 1E-5, "variance mismatch")
+      Vectors.dense(3.857666666666, 7.0456666666666, 2.48166666666666) absTol 1E-5,
+      "variance mismatch")
 
     assert(summarizer.count === 6)
   }
@@ -167,7 +168,8 @@ class MultivariateOnlineSummarizerSuite extends FunSuite {
     assert(summarizer.numNonzeros ~== Vectors.dense(3, 5, 2) absTol 1E-5, "numNonzeros mismatch")
 
     assert(summarizer.variance ~==
-      Vectors.dense(3.857666666666, 7.0456666666666, 2.48166666666666) absTol 1E-5, "variance mismatch")
+      Vectors.dense(3.857666666666, 7.0456666666666, 2.48166666666666) absTol 1E-5,
+      "variance mismatch")
 
     assert(summarizer.count === 6)
   }
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 9a239abfbb..f485a69db1 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -23,6 +23,7 @@ object from MLlib or pass SciPy C{scipy.sparse} column vectors if
 SciPy is available in their environment.
 """
 
+import numpy
 from numpy import array, array_equal, ndarray, float64, int32
 
 
@@ -160,6 +161,15 @@ class SparseVector(object):
                 j += 1
             return result
 
+    def toArray(self):
+        """
+        Returns a copy of this SparseVector as a 1-dimensional NumPy array.
+        """
+        arr = numpy.zeros(self.size)
+        for i in xrange(self.indices.size):
+            arr[self.indices[i]] = self.values[i]
+        return arr
+
     def __str__(self):
         inds = "[" + ",".join([str(i) for i in self.indices]) + "]"
         vals = "[" + ",".join([str(v) for v in self.values]) + "]"
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
index a73abc5ff9..feef0d16cd 100644
--- a/python/pyspark/mllib/stat.py
+++ b/python/pyspark/mllib/stat.py
@@ -118,16 +118,18 @@ class Statistics(object):
         >>> from linalg import Vectors
         >>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
         ...                       Vectors.dense([6, 7, 0,  8]), Vectors.dense([9, 0, 0, 1])])
-        >>> Statistics.corr(rdd)
-        array([[ 1.        ,  0.05564149,         nan,  0.40047142],
-               [ 0.05564149,  1.        ,         nan,  0.91359586],
-               [        nan,         nan,  1.        ,         nan],
-               [ 0.40047142,  0.91359586,         nan,  1.        ]])
-        >>> Statistics.corr(rdd, method="spearman")
-        array([[ 1.        ,  0.10540926,         nan,  0.4       ],
-               [ 0.10540926,  1.        ,         nan,  0.9486833 ],
-               [        nan,         nan,  1.        ,         nan],
-               [ 0.4       ,  0.9486833 ,         nan,  1.        ]])
+        >>> pearsonCorr = Statistics.corr(rdd)
+        >>> print str(pearsonCorr).replace('nan', 'NaN')
+        [[ 1.          0.05564149         NaN  0.40047142]
+         [ 0.05564149  1.                 NaN  0.91359586]
+         [        NaN         NaN  1.                 NaN]
+         [ 0.40047142  0.91359586         NaN  1.        ]]
+        >>> spearmanCorr = Statistics.corr(rdd, method="spearman")
+        >>> print str(spearmanCorr).replace('nan', 'NaN')
+        [[ 1.          0.10540926         NaN  0.4       ]
+         [ 0.10540926  1.                 NaN  0.9486833 ]
+         [        NaN         NaN  1.                 NaN]
+         [ 0.4         0.9486833          NaN  1.        ]]
         >>> try:
         ...     Statistics.corr(rdd, "spearman")
         ...     print "Method name as second argument without 'method=' shouldn't be allowed."
diff --git a/python/run-tests b/python/run-tests
index a6271e0cf5..b506559a5e 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -78,6 +78,7 @@ run_test "pyspark/mllib/linalg.py"
 run_test "pyspark/mllib/random.py"
 run_test "pyspark/mllib/recommendation.py"
 run_test "pyspark/mllib/regression.py"
+run_test "pyspark/mllib/stat.py"
 run_test "pyspark/mllib/tests.py"
 run_test "pyspark/mllib/tree.py"
 run_test "pyspark/mllib/util.py"
-- 
GitLab