Commit c9c8b219 authored by RJ Nowling, committed by Xiangrui Meng

[SPARK-4891][PySpark][MLlib] Add gamma/log normal/exp dist sampling to PySpark MLlib

This is a follow-up to PR #3680 (https://github.com/apache/spark/pull/3680).

Author: RJ Nowling <rnowling@gmail.com>

Closes #3955 from rnowling/spark4891 and squashes the following commits:

1236a01 [RJ Nowling] Fix Python style issues
7a01a78 [RJ Nowling] Fix Python style issues
174beab [RJ Nowling] [SPARK-4891][PySpark][MLlib] Add gamma/log normal/exp dist sampling to PySpark MLlib
parent a00af6be
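For reference, a minimal usage sketch of the new PySpark entry points added by this patch (it assumes a running SparkContext named `sc`; the distribution parameters and seed values below are illustrative only, not taken from the diff):

from pyspark.mllib.random import RandomRDDs

# 1000 i.i.d. draws from log N(0, 1), Exp(mean = 2.0), and Gamma(shape = 9.0, scale = 0.5)
ln_samples  = RandomRDDs.logNormalRDD(sc, 0.0, 1.0, 1000, seed=42)
exp_samples = RandomRDDs.exponentialRDD(sc, 2.0, 1000, seed=42)
gam_samples = RandomRDDs.gammaRDD(sc, 9.0, 0.5, 1000, seed=42)
print(gam_samples.stats())  # StatCounter: count, mean, stdev, min, max of the sample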
@@ -624,6 +624,21 @@ class PythonMLLibAPI extends Serializable {
    RG.normalRDD(jsc.sc, size, parts, s)
  }

  /**
   * Java stub for Python mllib RandomRDDGenerators.logNormalRDD()
   */
  def logNormalRDD(jsc: JavaSparkContext,
      mean: Double,
      std: Double,
      size: Long,
      numPartitions: java.lang.Integer,
      seed: java.lang.Long): JavaRDD[Double] = {
    val parts = getNumPartitionsOrDefault(numPartitions, jsc)
    val s = getSeedOrDefault(seed)
    RG.logNormalRDD(jsc.sc, mean, std, size, parts, s)
  }

  /**
   * Java stub for Python mllib RandomRDDGenerators.poissonRDD()
   */
@@ -637,6 +652,33 @@ class PythonMLLibAPI extends Serializable {
    RG.poissonRDD(jsc.sc, mean, size, parts, s)
  }

  /**
   * Java stub for Python mllib RandomRDDGenerators.exponentialRDD()
   */
  def exponentialRDD(jsc: JavaSparkContext,
      mean: Double,
      size: Long,
      numPartitions: java.lang.Integer,
      seed: java.lang.Long): JavaRDD[Double] = {
    val parts = getNumPartitionsOrDefault(numPartitions, jsc)
    val s = getSeedOrDefault(seed)
    RG.exponentialRDD(jsc.sc, mean, size, parts, s)
  }

  /**
   * Java stub for Python mllib RandomRDDGenerators.gammaRDD()
   */
  def gammaRDD(jsc: JavaSparkContext,
      shape: Double,
      scale: Double,
      size: Long,
      numPartitions: java.lang.Integer,
      seed: java.lang.Long): JavaRDD[Double] = {
    val parts = getNumPartitionsOrDefault(numPartitions, jsc)
    val s = getSeedOrDefault(seed)
    RG.gammaRDD(jsc.sc, shape, scale, size, parts, s)
  }

  /**
   * Java stub for Python mllib RandomRDDGenerators.uniformVectorRDD()
   */
@@ -663,6 +705,22 @@ class PythonMLLibAPI extends Serializable {
    RG.normalVectorRDD(jsc.sc, numRows, numCols, parts, s)
  }

  /**
   * Java stub for Python mllib RandomRDDGenerators.logNormalVectorRDD()
   */
  def logNormalVectorRDD(jsc: JavaSparkContext,
      mean: Double,
      std: Double,
      numRows: Long,
      numCols: Int,
      numPartitions: java.lang.Integer,
      seed: java.lang.Long): JavaRDD[Vector] = {
    val parts = getNumPartitionsOrDefault(numPartitions, jsc)
    val s = getSeedOrDefault(seed)
    RG.logNormalVectorRDD(jsc.sc, mean, std, numRows, numCols, parts, s)
  }

  /**
   * Java stub for Python mllib RandomRDDGenerators.poissonVectorRDD()
   */
@@ -677,6 +735,36 @@ class PythonMLLibAPI extends Serializable {
    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
  }

  /**
   * Java stub for Python mllib RandomRDDGenerators.exponentialVectorRDD()
   */
  def exponentialVectorRDD(jsc: JavaSparkContext,
      mean: Double,
      numRows: Long,
      numCols: Int,
      numPartitions: java.lang.Integer,
      seed: java.lang.Long): JavaRDD[Vector] = {
    val parts = getNumPartitionsOrDefault(numPartitions, jsc)
    val s = getSeedOrDefault(seed)
    RG.exponentialVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
  }

  /**
   * Java stub for Python mllib RandomRDDGenerators.gammaVectorRDD()
   */
  def gammaVectorRDD(jsc: JavaSparkContext,
      shape: Double,
      scale: Double,
      numRows: Long,
      numCols: Int,
      numPartitions: java.lang.Integer,
      seed: java.lang.Long): JavaRDD[Vector] = {
    val parts = getNumPartitionsOrDefault(numPartitions, jsc)
    val s = getSeedOrDefault(seed)
    RG.gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols, parts, s)
  }
}

/**
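Each of the Java stubs above follows the same pattern as the existing uniform/normal/Poisson ones: the Python wrappers below pass numPartitions and seed through unchanged, and getNumPartitionsOrDefault / getSeedOrDefault substitute sc.defaultParallelism and a random seed when they arrive as null. A rough sketch of the resulting behaviour from the Python side (assuming a live SparkContext `sc`; the values are illustrative):

from pyspark.mllib.random import RandomRDDs

# Explicit numPartitions and seed are honoured ...
a = RandomRDDs.exponentialRDD(sc, 2.0, 1000, numPartitions=4, seed=7)
b = RandomRDDs.exponentialRDD(sc, 2.0, 1000, numPartitions=4, seed=7)
print(a.getNumPartitions())        # 4
print(a.collect() == b.collect())  # True: the same seed should reproduce the same samples

# ... while omitting them falls back to sc.defaultParallelism and a random seed.
c = RandomRDDs.exponentialRDD(sc, 2.0, 1000)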
@@ -99,6 +99,38 @@ class RandomRDDs(object):
        """
        return callMLlibFunc("normalRDD", sc._jsc, size, numPartitions, seed)

    @staticmethod
    def logNormalRDD(sc, mean, std, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the log normal
        distribution with the input mean and standard deviation.

        :param sc: SparkContext used to create the RDD.
        :param mean: mean for the log Normal distribution
        :param std: std for the log Normal distribution
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ log N(mean, std).

        >>> from math import sqrt, exp
        >>> mean = 0.0
        >>> std = 1.0
        >>> expMean = exp(mean + 0.5 * std * std)
        >>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
        >>> x = RandomRDDs.logNormalRDD(sc, mean, std, 1000, seed=2L)
        >>> stats = x.stats()
        >>> stats.count()
        1000L
        >>> abs(stats.mean() - expMean) < 0.5
        True
        >>> abs(stats.stdev() - expStd) < 0.5
        True
        """
        return callMLlibFunc("logNormalRDD", sc._jsc, float(mean), float(std),
                             size, numPartitions, seed)

    @staticmethod
    def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
        """
@@ -125,6 +157,63 @@ class RandomRDDs(object):
        """
        return callMLlibFunc("poissonRDD", sc._jsc, float(mean), size, numPartitions, seed)

    @staticmethod
    def exponentialRDD(sc, mean, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the Exponential
        distribution with the input mean.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean, or 1 / lambda, for the Exponential distribution.
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ Exp(mean).

        >>> mean = 2.0
        >>> x = RandomRDDs.exponentialRDD(sc, mean, 1000, seed=2L)
        >>> stats = x.stats()
        >>> stats.count()
        1000L
        >>> abs(stats.mean() - mean) < 0.5
        True
        >>> abs(stats.stdev() - mean) < 0.5
        True
        """
        return callMLlibFunc("exponentialRDD", sc._jsc, float(mean), size, numPartitions, seed)

    @staticmethod
    def gammaRDD(sc, shape, scale, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the Gamma
        distribution with the input shape and scale.

        :param sc: SparkContext used to create the RDD.
        :param shape: shape (> 0) parameter for the Gamma distribution
        :param scale: scale (> 0) parameter for the Gamma distribution
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ Gamma(shape, scale).

        >>> from math import sqrt
        >>> shape = 1.0
        >>> scale = 2.0
        >>> expMean = shape * scale
        >>> expStd = sqrt(shape * scale * scale)
        >>> x = RandomRDDs.gammaRDD(sc, shape, scale, 1000, seed=2L)
        >>> stats = x.stats()
        >>> stats.count()
        1000L
        >>> abs(stats.mean() - expMean) < 0.5
        True
        >>> abs(stats.stdev() - expStd) < 0.5
        True
        """
        return callMLlibFunc("gammaRDD", sc._jsc, float(shape),
                             float(scale), size, numPartitions, seed)

    @staticmethod
    @toArray
    def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
@@ -175,6 +264,40 @@ class RandomRDDs(object):
        """
        return callMLlibFunc("normalVectorRDD", sc._jsc, numRows, numCols, numPartitions, seed)

    @staticmethod
    @toArray
    def logNormalVectorRDD(sc, mean, std, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the log normal distribution.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean of the log normal distribution
        :param std: Standard Deviation of the log normal distribution
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ log `N(mean, std)`.

        >>> import numpy as np
        >>> from math import sqrt, exp
        >>> mean = 0.0
        >>> std = 1.0
        >>> expMean = exp(mean + 0.5 * std * std)
        >>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
        >>> mat = np.matrix(RandomRDDs.logNormalVectorRDD(sc, mean, std,
        ...                                               100, 100, seed=1L).collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - expMean) < 0.1
        True
        >>> abs(mat.std() - expStd) < 0.1
        True
        """
        return callMLlibFunc("logNormalVectorRDD", sc._jsc, float(mean), float(std),
                             numRows, numCols, numPartitions, seed)

    @staticmethod
    @toArray
    def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
@@ -205,6 +328,70 @@ class RandomRDDs(object):
        return callMLlibFunc("poissonVectorRDD", sc._jsc, float(mean), numRows, numCols,
                             numPartitions, seed)

    @staticmethod
    @toArray
    def exponentialVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the Exponential distribution with the input mean.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean, or 1 / lambda, for the Exponential distribution.
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ Exp(mean).

        >>> import numpy as np
        >>> mean = 0.5
        >>> rdd = RandomRDDs.exponentialVectorRDD(sc, mean, 100, 100, seed=1L)
        >>> mat = np.mat(rdd.collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - mean) < 0.5
        True
        >>> abs(mat.std() - mean) < 0.5
        True
        """
        return callMLlibFunc("exponentialVectorRDD", sc._jsc, float(mean), numRows, numCols,
                             numPartitions, seed)

    @staticmethod
    @toArray
    def gammaVectorRDD(sc, shape, scale, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the Gamma distribution.

        :param sc: SparkContext used to create the RDD.
        :param shape: Shape (> 0) of the Gamma distribution
        :param scale: Scale (> 0) of the Gamma distribution
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ Gamma(shape, scale).

        >>> import numpy as np
        >>> from math import sqrt
        >>> shape = 1.0
        >>> scale = 2.0
        >>> expMean = shape * scale
        >>> expStd = sqrt(shape * scale * scale)
        >>> mat = np.matrix(RandomRDDs.gammaVectorRDD(sc, shape, scale,
        ...                                           100, 100, seed=1L).collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - expMean) < 0.1
        True
        >>> abs(mat.std() - expStd) < 0.1
        True
        """
        return callMLlibFunc("gammaVectorRDD", sc._jsc, float(shape), float(scale),
                             numRows, numCols, numPartitions, seed)
def _test():
    import doctest
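The vector variants are wrapped in the @toArray decorator, so the collected rows can be handed straight to numpy, which is what the doctests above rely on. A short sketch of that workflow (again assuming a SparkContext `sc`; the shape, scale, and matrix dimensions are arbitrary):

import numpy as np
from pyspark.mllib.random import RandomRDDs

rows = RandomRDDs.gammaVectorRDD(sc, 1.0, 2.0, 100, 10, seed=1)
mat = np.matrix(rows.collect())  # 100 x 10 matrix of Gamma(shape=1, scale=2) samples
print(mat.shape)                 # (100, 10)
print(mat.mean())                # should be close to shape * scale = 2.0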