From ee1fb97a97d5ac18cd2ad8028e84ecbd988fb811 Mon Sep 17 00:00:00 2001 From: RJ Nowling <rnowling@gmail.com> Date: Thu, 18 Dec 2014 21:00:49 -0800 Subject: [PATCH] [SPARK-4728][MLLib] Add exponential, gamma, and log normal sampling to MLlib da... ...ta generators This patch adds: * Exponential, gamma, and log normal generators that wrap Apache Commons math3 to the private API * Functions for generating exponential, gamma, and log normal RDDs and vector RDDs * Tests for the above Author: RJ Nowling <rnowling@gmail.com> Closes #3680 from rnowling/spark4728 and squashes the following commits: 455f50a [RJ Nowling] Add tests for exponential, gamma, and log normal samplers to JavaRandomRDDsSuite 3e1134a [RJ Nowling] Fix val/var, unncessary creation of Distribution objects when setting seeds, and import line longer than line wrap limits 58f5b97 [RJ Nowling] Fix bounds in tests so they scale with variance, not stdev 84fd98d [RJ Nowling] Add more values for testing distributions. 9f96232 [RJ Nowling] [SPARK-4728] Add exponential, gamma, and log normal sampling to MLlib data generators --- .../mllib/random/RandomDataGenerator.scala | 69 +++- .../spark/mllib/random/RandomRDDs.scala | 363 ++++++++++++++++++ .../mllib/random/JavaRandomRDDsSuite.java | 99 +++++ .../random/RandomDataGeneratorSuite.scala | 52 ++- .../spark/mllib/random/RandomRDDsSuite.scala | 43 +++ 5 files changed, 622 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala index 51f9b8657c..405bae62ee 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala @@ -17,7 +17,8 @@ package org.apache.spark.mllib.random -import org.apache.commons.math3.distribution.PoissonDistribution +import org.apache.commons.math3.distribution.{ExponentialDistribution, + GammaDistribution, LogNormalDistribution, PoissonDistribution} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} @@ -88,14 +89,76 @@ class StandardNormalGenerator extends RandomDataGenerator[Double] { @DeveloperApi class PoissonGenerator(val mean: Double) extends RandomDataGenerator[Double] { - private var rng = new PoissonDistribution(mean) + private val rng = new PoissonDistribution(mean) override def nextValue(): Double = rng.sample() override def setSeed(seed: Long) { - rng = new PoissonDistribution(mean) rng.reseedRandomGenerator(seed) } override def copy(): PoissonGenerator = new PoissonGenerator(mean) } + +/** + * :: DeveloperApi :: + * Generates i.i.d. samples from the exponential distribution with the given mean. + * + * @param mean mean for the exponential distribution. + */ +@DeveloperApi +class ExponentialGenerator(val mean: Double) extends RandomDataGenerator[Double] { + + private val rng = new ExponentialDistribution(mean) + + override def nextValue(): Double = rng.sample() + + override def setSeed(seed: Long) { + rng.reseedRandomGenerator(seed) + } + + override def copy(): ExponentialGenerator = new ExponentialGenerator(mean) +} + +/** + * :: DeveloperApi :: + * Generates i.i.d. samples from the gamma distribution with the given shape and scale. + * + * @param shape shape for the gamma distribution. + * @param scale scale for the gamma distribution + */ +@DeveloperApi +class GammaGenerator(val shape: Double, val scale: Double) extends RandomDataGenerator[Double] { + + private val rng = new GammaDistribution(shape, scale) + + override def nextValue(): Double = rng.sample() + + override def setSeed(seed: Long) { + rng.reseedRandomGenerator(seed) + } + + override def copy(): GammaGenerator = new GammaGenerator(shape, scale) +} + +/** + * :: DeveloperApi :: + * Generates i.i.d. samples from the log normal distribution with the + * given mean and standard deviation. + * + * @param mean mean for the log normal distribution. + * @param std standard deviation for the log normal distribution + */ +@DeveloperApi +class LogNormalGenerator(val mean: Double, val std: Double) extends RandomDataGenerator[Double] { + + private val rng = new LogNormalDistribution(mean, std) + + override def nextValue(): Double = rng.sample() + + override def setSeed(seed: Long) { + rng.reseedRandomGenerator(seed) + } + + override def copy(): LogNormalGenerator = new LogNormalGenerator(mean, std) +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala index c5f4b08432..955c593a08 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala @@ -176,6 +176,176 @@ object RandomRDDs { JavaDoubleRDD.fromRDD(poissonRDD(jsc.sc, mean, size)) } + /** + * Generates an RDD comprised of i.i.d. samples from the exponential distribution with + * the input mean. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean, or 1 / lambda, for the exponential distribution. + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). + * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). + */ + def exponentialRDD( + sc: SparkContext, + mean: Double, + size: Long, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Double] = { + val exponential = new ExponentialGenerator(mean) + randomRDD(sc, exponential, size, numPartitionsOrDefault(sc, numPartitions), seed) + } + + /** + * Java-friendly version of [[RandomRDDs#exponentialRDD]]. + */ + def exponentialJavaRDD( + jsc: JavaSparkContext, + mean: Double, + size: Long, + numPartitions: Int, + seed: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(exponentialRDD(jsc.sc, mean, size, numPartitions, seed)) + } + + /** + * [[RandomRDDs#exponentialJavaRDD]] with the default seed. + */ + def exponentialJavaRDD( + jsc: JavaSparkContext, + mean: Double, + size: Long, + numPartitions: Int): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(exponentialRDD(jsc.sc, mean, size, numPartitions)) + } + + /** + * [[RandomRDDs#exponentialJavaRDD]] with the default number of partitions and the default seed. + */ + def exponentialJavaRDD(jsc: JavaSparkContext, mean: Double, size: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(exponentialRDD(jsc.sc, mean, size)) + } + + /** + * Generates an RDD comprised of i.i.d. samples from the gamma distribution with the input + * shape and scale. + * + * @param sc SparkContext used to create the RDD. + * @param shape shape parameter (> 0) for the gamma distribution + * @param scale scale parameter (> 0) for the gamma distribution + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). + * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). + */ + def gammaRDD( + sc: SparkContext, + shape: Double, + scale: Double, + size: Long, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Double] = { + val gamma = new GammaGenerator(shape, scale) + randomRDD(sc, gamma, size, numPartitionsOrDefault(sc, numPartitions), seed) + } + + /** + * Java-friendly version of [[RandomRDDs#gammaRDD]]. + */ + def gammaJavaRDD( + jsc: JavaSparkContext, + shape: Double, + scale: Double, + size: Long, + numPartitions: Int, + seed: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(gammaRDD(jsc.sc, shape, scale, size, numPartitions, seed)) + } + + /** + * [[RandomRDDs#gammaJavaRDD]] with the default seed. + */ + def gammaJavaRDD( + jsc: JavaSparkContext, + shape: Double, + scale: Double, + size: Long, + numPartitions: Int): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(gammaRDD(jsc.sc, shape, scale, size, numPartitions)) + } + + /** + * [[RandomRDDs#gammaJavaRDD]] with the default number of partitions and the default seed. + */ + def gammaJavaRDD( + jsc: JavaSparkContext, + shape: Double, + scale: Double, + size: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(gammaRDD(jsc.sc, shape, scale, size)) + } + + /** + * Generates an RDD comprised of i.i.d. samples from the log normal distribution with the input + * mean and standard deviation + * + * @param sc SparkContext used to create the RDD. + * @param mean mean for the log normal distribution + * @param std standard deviation for the log normal distribution + * @param size Size of the RDD. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). + * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean). + */ + def logNormalRDD( + sc: SparkContext, + mean: Double, + std: Double, + size: Long, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Double] = { + val logNormal = new LogNormalGenerator(mean, std) + randomRDD(sc, logNormal, size, numPartitionsOrDefault(sc, numPartitions), seed) + } + + /** + * Java-friendly version of [[RandomRDDs#logNormalRDD]]. + */ + def logNormalJavaRDD( + jsc: JavaSparkContext, + mean: Double, + std: Double, + size: Long, + numPartitions: Int, + seed: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(logNormalRDD(jsc.sc, mean, std, size, numPartitions, seed)) + } + + /** + * [[RandomRDDs#logNormalJavaRDD]] with the default seed. + */ + def logNormalJavaRDD( + jsc: JavaSparkContext, + mean: Double, + std: Double, + size: Long, + numPartitions: Int): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(logNormalRDD(jsc.sc, mean, std, size, numPartitions)) + } + + /** + * [[RandomRDDs#logNormalJavaRDD]] with the default number of partitions and the default seed. + */ + def logNormalJavaRDD( + jsc: JavaSparkContext, + mean: Double, + std: Double, + size: Long): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(logNormalRDD(jsc.sc, mean, std, size)) + } + + /** * :: DeveloperApi :: * Generates an RDD comprised of i.i.d. samples produced by the input RandomDataGenerator. @@ -307,6 +477,72 @@ object RandomRDDs { normalVectorRDD(jsc.sc, numRows, numCols).toJavaRDD() } + /** + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from a + * log normal distribution. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean of the log normal distribution. + * @param std Standard deviation of the log normal distribution. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`). + * @param seed Random seed (default: a random long integer). + * @return RDD[Vector] with vectors containing i.i.d. samples. + */ + def logNormalVectorRDD( + sc: SparkContext, + mean: Double, + std: Double, + numRows: Long, + numCols: Int, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Vector] = { + val logNormal = new LogNormalGenerator(mean, std) + randomVectorRDD(sc, logNormal, numRows, numCols, + numPartitionsOrDefault(sc, numPartitions), seed) + } + + /** + * Java-friendly version of [[RandomRDDs#logNormalVectorRDD]]. + */ + def logNormalJavaVectorRDD( + jsc: JavaSparkContext, + mean: Double, + std: Double, + numRows: Long, + numCols: Int, + numPartitions: Int, + seed: Long): JavaRDD[Vector] = { + logNormalVectorRDD(jsc.sc, mean, std, numRows, numCols, numPartitions, seed).toJavaRDD() + } + + /** + * [[RandomRDDs#logNormalJavaVectorRDD]] with the default seed. + */ + def logNormalJavaVectorRDD( + jsc: JavaSparkContext, + mean: Double, + std: Double, + numRows: Long, + numCols: Int, + numPartitions: Int): JavaRDD[Vector] = { + logNormalVectorRDD(jsc.sc, mean, std, numRows, numCols, numPartitions).toJavaRDD() + } + + /** + * [[RandomRDDs#logNormalJavaVectorRDD]] with the default number of partitions and + * the default seed. + */ + def logNormalJavaVectorRDD( + jsc: JavaSparkContext, + mean: Double, + std: Double, + numRows: Long, + numCols: Int): JavaRDD[Vector] = { + logNormalVectorRDD(jsc.sc, mean, std, numRows, numCols).toJavaRDD() + } + /** * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the * Poisson distribution with the input mean. @@ -366,6 +602,133 @@ object RandomRDDs { poissonVectorRDD(jsc.sc, mean, numRows, numCols).toJavaRDD() } + /** + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the + * exponential distribution with the input mean. + * + * @param sc SparkContext used to create the RDD. + * @param mean Mean, or 1 / lambda, for the Exponential distribution. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`) + * @param seed Random seed (default: a random long integer). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ Exp(mean). + */ + def exponentialVectorRDD( + sc: SparkContext, + mean: Double, + numRows: Long, + numCols: Int, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Vector] = { + val exponential = new ExponentialGenerator(mean) + randomVectorRDD(sc, exponential, numRows, numCols, + numPartitionsOrDefault(sc, numPartitions), seed) + } + + /** + * Java-friendly version of [[RandomRDDs#exponentialVectorRDD]]. + */ + def exponentialJavaVectorRDD( + jsc: JavaSparkContext, + mean: Double, + numRows: Long, + numCols: Int, + numPartitions: Int, + seed: Long): JavaRDD[Vector] = { + exponentialVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions, seed).toJavaRDD() + } + + /** + * [[RandomRDDs#exponentialJavaVectorRDD]] with the default seed. + */ + def exponentialJavaVectorRDD( + jsc: JavaSparkContext, + mean: Double, + numRows: Long, + numCols: Int, + numPartitions: Int): JavaRDD[Vector] = { + exponentialVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions).toJavaRDD() + } + + /** + * [[RandomRDDs#exponentialJavaVectorRDD]] with the default number of partitions + * and the default seed. + */ + def exponentialJavaVectorRDD( + jsc: JavaSparkContext, + mean: Double, + numRows: Long, + numCols: Int): JavaRDD[Vector] = { + exponentialVectorRDD(jsc.sc, mean, numRows, numCols).toJavaRDD() + } + + + /** + * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the + * gamma distribution with the input shape and scale. + * + * @param sc SparkContext used to create the RDD. + * @param shape shape parameter (> 0) for the gamma distribution. + * @param scale scale parameter (> 0) for the gamma distribution. + * @param numRows Number of Vectors in the RDD. + * @param numCols Number of elements in each Vector. + * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`) + * @param seed Random seed (default: a random long integer). + * @return RDD[Vector] with vectors containing i.i.d. samples ~ Exp(mean). + */ + def gammaVectorRDD( + sc: SparkContext, + shape: Double, + scale: Double, + numRows: Long, + numCols: Int, + numPartitions: Int = 0, + seed: Long = Utils.random.nextLong()): RDD[Vector] = { + val gamma = new GammaGenerator(shape, scale) + randomVectorRDD(sc, gamma, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed) + } + + /** + * Java-friendly version of [[RandomRDDs#gammaVectorRDD]]. + */ + def gammaJavaVectorRDD( + jsc: JavaSparkContext, + shape: Double, + scale: Double, + numRows: Long, + numCols: Int, + numPartitions: Int, + seed: Long): JavaRDD[Vector] = { + gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols, numPartitions, seed).toJavaRDD() + } + + /** + * [[RandomRDDs#gammaJavaVectorRDD]] with the default seed. + */ + def gammaJavaVectorRDD( + jsc: JavaSparkContext, + shape: Double, + scale: Double, + numRows: Long, + numCols: Int, + numPartitions: Int): JavaRDD[Vector] = { + gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols, numPartitions).toJavaRDD() + } + + /** + * [[RandomRDDs#gammaJavaVectorRDD]] with the default number of partitions and the default seed. + */ + def gammaJavaVectorRDD( + jsc: JavaSparkContext, + shape: Double, + scale: Double, + numRows: Long, + numCols: Int): JavaRDD[Vector] = { + gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols).toJavaRDD() + } + + /** * :: DeveloperApi :: * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the diff --git a/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java index a725736ca1..fcc13c00cb 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java @@ -69,6 +69,21 @@ public class JavaRandomRDDsSuite { } } + @Test + public void testLNormalRDD() { + double mean = 4.0; + double std = 2.0; + long m = 1000L; + int p = 2; + long seed = 1L; + JavaDoubleRDD rdd1 = logNormalJavaRDD(sc, mean, std, m); + JavaDoubleRDD rdd2 = logNormalJavaRDD(sc, mean, std, m, p); + JavaDoubleRDD rdd3 = logNormalJavaRDD(sc, mean, std, m, p, seed); + for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) { + Assert.assertEquals(m, rdd.count()); + } + } + @Test public void testPoissonRDD() { double mean = 2.0; @@ -83,6 +98,36 @@ public class JavaRandomRDDsSuite { } } + @Test + public void testExponentialRDD() { + double mean = 2.0; + long m = 1000L; + int p = 2; + long seed = 1L; + JavaDoubleRDD rdd1 = exponentialJavaRDD(sc, mean, m); + JavaDoubleRDD rdd2 = exponentialJavaRDD(sc, mean, m, p); + JavaDoubleRDD rdd3 = exponentialJavaRDD(sc, mean, m, p, seed); + for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) { + Assert.assertEquals(m, rdd.count()); + } + } + + @Test + public void testGammaRDD() { + double shape = 1.0; + double scale = 2.0; + long m = 1000L; + int p = 2; + long seed = 1L; + JavaDoubleRDD rdd1 = gammaJavaRDD(sc, shape, scale, m); + JavaDoubleRDD rdd2 = gammaJavaRDD(sc, shape, scale, m, p); + JavaDoubleRDD rdd3 = gammaJavaRDD(sc, shape, scale, m, p, seed); + for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) { + Assert.assertEquals(m, rdd.count()); + } + } + + @Test @SuppressWarnings("unchecked") public void testUniformVectorRDD() { @@ -115,6 +160,24 @@ public class JavaRandomRDDsSuite { } } + @Test + @SuppressWarnings("unchecked") + public void testLogNormalVectorRDD() { + double mean = 4.0; + double std = 2.0; + long m = 100L; + int n = 10; + int p = 2; + long seed = 1L; + JavaRDD<Vector> rdd1 = logNormalJavaVectorRDD(sc, mean, std, m, n); + JavaRDD<Vector> rdd2 = logNormalJavaVectorRDD(sc, mean, std, m, n, p); + JavaRDD<Vector> rdd3 = logNormalJavaVectorRDD(sc, mean, std, m, n, p, seed); + for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) { + Assert.assertEquals(m, rdd.count()); + Assert.assertEquals(n, rdd.first().size()); + } + } + @Test @SuppressWarnings("unchecked") public void testPoissonVectorRDD() { @@ -131,4 +194,40 @@ public class JavaRandomRDDsSuite { Assert.assertEquals(n, rdd.first().size()); } } + + @Test + @SuppressWarnings("unchecked") + public void testExponentialVectorRDD() { + double mean = 2.0; + long m = 100L; + int n = 10; + int p = 2; + long seed = 1L; + JavaRDD<Vector> rdd1 = exponentialJavaVectorRDD(sc, mean, m, n); + JavaRDD<Vector> rdd2 = exponentialJavaVectorRDD(sc, mean, m, n, p); + JavaRDD<Vector> rdd3 = exponentialJavaVectorRDD(sc, mean, m, n, p, seed); + for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) { + Assert.assertEquals(m, rdd.count()); + Assert.assertEquals(n, rdd.first().size()); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testGammaVectorRDD() { + double shape = 1.0; + double scale = 2.0; + long m = 100L; + int n = 10; + int p = 2; + long seed = 1L; + JavaRDD<Vector> rdd1 = gammaJavaVectorRDD(sc, shape, scale, m, n); + JavaRDD<Vector> rdd2 = gammaJavaVectorRDD(sc, shape, scale, m, n, p); + JavaRDD<Vector> rdd3 = gammaJavaVectorRDD(sc, shape, scale, m, n, p, seed); + for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) { + Assert.assertEquals(m, rdd.count()); + Assert.assertEquals(n, rdd.first().size()); + } + } + } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomDataGeneratorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomDataGeneratorSuite.scala index 3df7c128af..b792d819fd 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomDataGeneratorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomDataGeneratorSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.mllib.random +import scala.math + import org.scalatest.FunSuite import org.apache.spark.util.StatCounter @@ -25,7 +27,6 @@ import org.apache.spark.util.StatCounter class RandomDataGeneratorSuite extends FunSuite { def apiChecks(gen: RandomDataGenerator[Double]) { - // resetting seed should generate the same sequence of random numbers gen.setSeed(42L) val array1 = (0 until 1000).map(_ => gen.nextValue()) @@ -79,6 +80,26 @@ class RandomDataGeneratorSuite extends FunSuite { distributionChecks(normal, 0.0, 1.0) } + test("LogNormalGenerator") { + List((0.0, 1.0), (0.0, 2.0), (2.0, 1.0), (2.0, 2.0)).map { + case (mean: Double, vari: Double) => + val normal = new LogNormalGenerator(mean, math.sqrt(vari)) + apiChecks(normal) + + // mean of log normal = e^(mean + var / 2) + val expectedMean = math.exp(mean + 0.5 * vari) + + // variance of log normal = (e^var - 1) * e^(2 * mean + var) + val expectedStd = math.sqrt((math.exp(vari) - 1.0) * math.exp(2.0 * mean + vari)) + + // since sampling error increases with variance, let's set + // the absolute tolerance as a percentage + val epsilon = 0.05 * expectedStd * expectedStd + + distributionChecks(normal, expectedMean, expectedStd, epsilon) + } + } + test("PoissonGenerator") { // mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced. for (mean <- List(1.0, 5.0, 100.0)) { @@ -87,4 +108,33 @@ class RandomDataGeneratorSuite extends FunSuite { distributionChecks(poisson, mean, math.sqrt(mean), 0.1) } } + + test("ExponentialGenerator") { + // mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced. + for (mean <- List(2.0, 5.0, 10.0, 50.0, 100.0)) { + val exponential = new ExponentialGenerator(mean) + apiChecks(exponential) + // var of exp = lambda^-2 = (1.0 / mean)^-2 = mean^2 + + // since sampling error increases with variance, let's set + // the absolute tolerance as a percentage + val epsilon = 0.05 * mean * mean + + distributionChecks(exponential, mean, mean, epsilon) + } + } + + test("GammaGenerator") { + // mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced. + List((1.0, 2.0), (2.0, 2.0), (3.0, 2.0), (5.0, 1.0), (9.0, 0.5)).map { + case (shape: Double, scale: Double) => + val gamma = new GammaGenerator(shape, scale) + apiChecks(gamma) + // mean of gamma = shape * scale + val expectedMean = shape * scale + // var of gamma = shape * scale^2 + val expectedStd = math.sqrt(shape * scale * scale) + distributionChecks(gamma, expectedMean, expectedStd, 0.1) + } + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDsSuite.scala index ea5889b3ec..6395188a08 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDsSuite.scala @@ -110,7 +110,19 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa test("randomRDD for different distributions") { val size = 100000L val numPartitions = 10 + + // mean of log normal = e^(mean + var / 2) + val logNormalMean = math.exp(0.5) + // variance of log normal = (e^var - 1) * e^(2 * mean + var) + val logNormalStd = math.sqrt((math.E - 1.0) * math.E) + val gammaScale = 1.0 + val gammaShape = 2.0 + // mean of gamma = shape * scale + val gammaMean = gammaShape * gammaScale + // var of gamma = shape * scale^2 + val gammaStd = math.sqrt(gammaShape * gammaScale * gammaScale) val poissonMean = 100.0 + val exponentialMean = 1.0 for (seed <- 0 until 5) { val uniform = RandomRDDs.uniformRDD(sc, size, numPartitions, seed) @@ -119,8 +131,18 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa val normal = RandomRDDs.normalRDD(sc, size, numPartitions, seed) testGeneratedRDD(normal, size, numPartitions, 0.0, 1.0) + val logNormal = RandomRDDs.logNormalRDD(sc, 0.0, 1.0, size, numPartitions, seed) + testGeneratedRDD(logNormal, size, numPartitions, logNormalMean, logNormalStd, 0.1) + val poisson = RandomRDDs.poissonRDD(sc, poissonMean, size, numPartitions, seed) testGeneratedRDD(poisson, size, numPartitions, poissonMean, math.sqrt(poissonMean), 0.1) + + val exponential = RandomRDDs.exponentialRDD(sc, exponentialMean, size, numPartitions, seed) + testGeneratedRDD(exponential, size, numPartitions, exponentialMean, exponentialMean, 0.1) + + val gamma = RandomRDDs.gammaRDD(sc, gammaShape, gammaScale, size, numPartitions, seed) + testGeneratedRDD(gamma, size, numPartitions, gammaMean, gammaStd, 0.1) + } // mock distribution to check that partitions have unique seeds @@ -132,7 +154,19 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa val rows = 1000L val cols = 100 val parts = 10 + + // mean of log normal = e^(mean + var / 2) + val logNormalMean = math.exp(0.5) + // variance of log normal = (e^var - 1) * e^(2 * mean + var) + val logNormalStd = math.sqrt((math.E - 1.0) * math.E) + val gammaScale = 1.0 + val gammaShape = 2.0 + // mean of gamma = shape * scale + val gammaMean = gammaShape * gammaScale + // var of gamma = shape * scale^2 + val gammaStd = math.sqrt(gammaShape * gammaScale * gammaScale) val poissonMean = 100.0 + val exponentialMean = 1.0 for (seed <- 0 until 5) { val uniform = RandomRDDs.uniformVectorRDD(sc, rows, cols, parts, seed) @@ -141,8 +175,17 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa val normal = RandomRDDs.normalVectorRDD(sc, rows, cols, parts, seed) testGeneratedVectorRDD(normal, rows, cols, parts, 0.0, 1.0) + val logNormal = RandomRDDs.logNormalVectorRDD(sc, 0.0, 1.0, rows, cols, parts, seed) + testGeneratedVectorRDD(logNormal, rows, cols, parts, logNormalMean, logNormalStd, 0.1) + val poisson = RandomRDDs.poissonVectorRDD(sc, poissonMean, rows, cols, parts, seed) testGeneratedVectorRDD(poisson, rows, cols, parts, poissonMean, math.sqrt(poissonMean), 0.1) + + val exponential = RandomRDDs.exponentialVectorRDD(sc, exponentialMean, rows, cols, parts, seed) + testGeneratedVectorRDD(exponential, rows, cols, parts, exponentialMean, exponentialMean, 0.1) + + val gamma = RandomRDDs.gammaVectorRDD(sc, gammaShape, gammaScale, rows, cols, parts, seed) + testGeneratedVectorRDD(gamma, rows, cols, parts, gammaMean, gammaStd, 0.1) } } } -- GitLab