diff --git a/LICENSE b/LICENSE index a7eee041129cbfc5e7580890ee5a71b976a6ae88..f1732fb47afc0a7ae79e98fa0a9fa1b677bd7627 100644 --- a/LICENSE +++ b/LICENSE @@ -712,18 +712,6 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -======================================================================== -For colt: -======================================================================== - -Copyright (c) 1999 CERN - European Organization for Nuclear Research. -Permission to use, copy, modify, distribute and sell this software and its documentation for any purpose is hereby granted without fee, provided that the above copyright notice appear in all copies and that both that copyright notice and this permission notice appear in supporting documentation. CERN makes no representations about the suitability of this software for any purpose. It is provided "as is" without expressed or implied warranty. - -Packages hep.aida.* - -Written by Pavel Binko, Dino Ferrero Merlino, Wolfgang Hoschek, Tony Johnson, Andreas Pfeiffer, and others. Check the FreeHEP home page for more info. Permission to use and/or redistribute this work is granted under the terms of the LGPL License, with the exception that any usage related to military applications is expressly forbidden. The software and documentation made available under the terms of this license are provided with no warranty. - - ======================================================================== For SnapTree: ======================================================================== diff --git a/assembly/pom.xml b/assembly/pom.xml index bfef95b8deb95b924926c6b7013150a66f3888c4..11d4bea9361ab5fdf17b43e41d40527b9b224006 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -146,6 +146,10 @@ <exclude>com/google/common/base/Present*</exclude> </excludes> </relocation> + <relocation> + <pattern>org.apache.commons.math3</pattern> + <shadedPattern>org.spark-project.commons.math3</shadedPattern> + </relocation> </relocations> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> diff --git a/core/pom.xml b/core/pom.xml index 320d1076f7c033becf6e0497ea27ea78de16010f..5cd21e18e8ca7af7f13746dc5cf47ad3ed8e5bae 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -85,8 +85,6 @@ <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-math3</artifactId> - <version>3.3</version> - <scope>test</scope> </dependency> <dependency> <groupId>com.google.code.findbugs</groupId> @@ -162,10 +160,6 @@ <artifactId>json4s-jackson_${scala.binary.version}</artifactId> <version>3.2.10</version> </dependency> - <dependency> - <groupId>colt</groupId> - <artifactId>colt</artifactId> - </dependency> <dependency> <groupId>org.apache.mesos</groupId> <artifactId>mesos</artifactId> diff --git a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala index 3155dfe165664ff9e006a058585dfef6d20e1417..637492a97551b4370af4a78ddf4fae13423cc4b7 100644 --- a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala @@ -17,7 +17,7 @@ package org.apache.spark.partial -import cern.jet.stat.Probability +import org.apache.commons.math3.distribution.NormalDistribution /** * An ApproximateEvaluator for counts. 
@@ -46,7 +46,8 @@ private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double) val mean = (sum + 1 - p) / p val variance = (sum + 1) * (1 - p) / (p * p) val stdev = math.sqrt(variance) - val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2) + val confFactor = new NormalDistribution(). + inverseCumulativeProbability(1 - (1 - confidence) / 2) val low = mean - confFactor * stdev val high = mean + confFactor * stdev new BoundedDouble(mean, confidence, low, high) diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala index 8bb78123e3c9c6f7acf26cfa42e08c5f79e2e965..3ef3cc219dec6b614f80b4ab19eb8e03ea650296 100644 --- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala @@ -24,7 +24,7 @@ import scala.collection.Map import scala.collection.mutable.HashMap import scala.reflect.ClassTag -import cern.jet.stat.Probability +import org.apache.commons.math3.distribution.NormalDistribution import org.apache.spark.util.collection.OpenHashMap @@ -55,7 +55,8 @@ private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, conf new HashMap[T, BoundedDouble] } else { val p = outputsMerged.toDouble / totalOutputs - val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2) + val confFactor = new NormalDistribution(). + inverseCumulativeProbability(1 - (1 - confidence) / 2) val result = new JHashMap[T, BoundedDouble](sums.size) sums.foreach { case (key, sum) => val mean = (sum + 1 - p) / p diff --git a/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala index d24959cba8727016c23eb87d7444d44e79f00ce9..787a21a61fdcf2088f8a4c9c2d094247a8e715d6 100644 --- a/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala @@ -17,7 +17,7 @@ package org.apache.spark.partial -import cern.jet.stat.Probability +import org.apache.commons.math3.distribution.{NormalDistribution, TDistribution} import org.apache.spark.util.StatCounter @@ -45,9 +45,10 @@ private[spark] class MeanEvaluator(totalOutputs: Int, confidence: Double) val stdev = math.sqrt(counter.sampleVariance / counter.count) val confFactor = { if (counter.count > 100) { - Probability.normalInverse(1 - (1 - confidence) / 2) + new NormalDistribution().inverseCumulativeProbability(1 - (1 - confidence) / 2) } else { - Probability.studentTInverse(1 - confidence, (counter.count - 1).toInt) + val degreesOfFreedom = (counter.count - 1).toInt + new TDistribution(degreesOfFreedom).inverseCumulativeProbability(1 - (1 - confidence) / 2) } } val low = mean - confFactor * stdev diff --git a/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala b/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala index 92915ee66d29f3a2100a6aa3ffb12fa435dffd74..828bf96c2c0bd1b353811941ff222f8b7ff9fbc2 100644 --- a/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala +++ b/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala @@ -17,7 +17,7 @@ package org.apache.spark.partial -import cern.jet.stat.Probability +import org.apache.commons.math3.distribution.{TDistribution, NormalDistribution} /** * A utility class for caching Student's T distribution values for a given confidence level @@ -25,8 +25,10 @@ import cern.jet.stat.Probability * 
confidence intervals for many keys. */ private[spark] class StudentTCacher(confidence: Double) { + val NORMAL_APPROX_SAMPLE_SIZE = 100 // For samples bigger than this, use Gaussian approximation - val normalApprox = Probability.normalInverse(1 - (1 - confidence) / 2) + + val normalApprox = new NormalDistribution().inverseCumulativeProbability(1 - (1 - confidence) / 2) val cache = Array.fill[Double](NORMAL_APPROX_SAMPLE_SIZE)(-1.0) def get(sampleSize: Long): Double = { @@ -35,7 +37,8 @@ private[spark] class StudentTCacher(confidence: Double) { } else { val size = sampleSize.toInt if (cache(size) < 0) { - cache(size) = Probability.studentTInverse(1 - confidence, size - 1) + val tDist = new TDistribution(size - 1) + cache(size) = tDist.inverseCumulativeProbability(1 - (1 - confidence) / 2) } cache(size) } diff --git a/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala index d5336284571d2b33dbe6f92a518a16760df6ae04..1753c2561b678353dbebeb234058d169e26bc122 100644 --- a/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala @@ -17,7 +17,7 @@ package org.apache.spark.partial -import cern.jet.stat.Probability +import org.apache.commons.math3.distribution.{TDistribution, NormalDistribution} import org.apache.spark.util.StatCounter @@ -55,9 +55,10 @@ private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double) val sumStdev = math.sqrt(sumVar) val confFactor = { if (counter.count > 100) { - Probability.normalInverse(1 - (1 - confidence) / 2) + new NormalDistribution().inverseCumulativeProbability(1 - (1 - confidence) / 2) } else { - Probability.studentTInverse(1 - confidence, (counter.count - 1).toInt) + val degreesOfFreedom = (counter.count - 1).toInt + new TDistribution(degreesOfFreedom).inverseCumulativeProbability(1 - (1 - confidence) / 2) } } val low = sumEstimate - confFactor * sumStdev diff --git a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala index b097c30f8c231b9fbf9df797ef31387cd3af7bdf..9e8cee5331cf8a8cde0074290ef890d1cbf51998 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala @@ -21,8 +21,7 @@ import java.util.Random import scala.reflect.ClassTag -import cern.jet.random.Poisson -import cern.jet.random.engine.DRand +import org.apache.commons.math3.distribution.PoissonDistribution import org.apache.spark.{Partition, TaskContext} @@ -53,9 +52,11 @@ private[spark] class SampledRDD[T: ClassTag]( if (withReplacement) { // For large datasets, the expected number of occurrences of each element in a sample with // replacement is Poisson(frac). We use that to get a count for each element. 
- val poisson = new Poisson(frac, new DRand(split.seed)) + val poisson = new PoissonDistribution(frac) + poisson.reseedRandomGenerator(split.seed) + firstParent[T].iterator(split.prev, context).flatMap { element => - val count = poisson.nextInt() + val count = poisson.sample() if (count == 0) { Iterator.empty // Avoid object allocation when we return 0 items, which is quite often } else { diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala index 32c5fdad75e582db661b3a1083bdcb375eaa753a..ee389def20c8cb744443837fe2b8c172dca9f74d 100644 --- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala +++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala @@ -19,8 +19,7 @@ package org.apache.spark.util.random import java.util.Random -import cern.jet.random.Poisson -import cern.jet.random.engine.DRand +import org.apache.commons.math3.distribution.PoissonDistribution import org.apache.spark.annotation.DeveloperApi @@ -87,15 +86,16 @@ class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false) @DeveloperApi class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] { - private[random] var rng = new Poisson(mean, new DRand) + private[random] var rng = new PoissonDistribution(mean) override def setSeed(seed: Long) { - rng = new Poisson(mean, new DRand(seed.toInt)) + rng = new PoissonDistribution(mean) + rng.reseedRandomGenerator(seed) } override def sample(items: Iterator[T]): Iterator[T] = { items.flatMap { item => - val count = rng.nextInt() + val count = rng.sample() if (count == 0) { Iterator.empty } else { diff --git a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala index 8f95d7c6b799b776a42f7c9b59aee9d2e9fd2528..4fa357edd6f0727beaffb9f4bd84e531284052e8 100644 --- a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala @@ -22,8 +22,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import cern.jet.random.Poisson -import cern.jet.random.engine.DRand +import org.apache.commons.math3.distribution.PoissonDistribution import org.apache.spark.Logging import org.apache.spark.SparkContext._ @@ -209,7 +208,7 @@ private[spark] object StratifiedSamplingUtils extends Logging { samplingRateByKey = computeThresholdByKey(finalResult, fractions) } (idx: Int, iter: Iterator[(K, V)]) => { - val rng = new RandomDataGenerator + val rng = new RandomDataGenerator() rng.reSeed(seed + idx) // Must use the same invoke pattern on the rng as in getSeqOp for without replacement // in order to generate the same sequence of random numbers when creating the sample @@ -245,9 +244,9 @@ private[spark] object StratifiedSamplingUtils extends Logging { // Must use the same invoke pattern on the rng as in getSeqOp for with replacement // in order to generate the same sequence of random numbers when creating the sample val copiesAccepted = if (acceptBound == 0) 0L else rng.nextPoisson(acceptBound) - val copiesWailisted = rng.nextPoisson(finalResult(key).waitListBound) + val copiesWaitlisted = rng.nextPoisson(finalResult(key).waitListBound) val copiesInSample = copiesAccepted + - (0 until copiesWailisted).count(i => rng.nextUniform() < thresholdByKey(key)) + (0 until copiesWaitlisted).count(i => 
rng.nextUniform() < thresholdByKey(key)) if (copiesInSample > 0) { Iterator.fill(copiesInSample.toInt)(item) } else { @@ -261,10 +260,10 @@ private[spark] object StratifiedSamplingUtils extends Logging { rng.reSeed(seed + idx) iter.flatMap { item => val count = rng.nextPoisson(fractions(item._1)) - if (count > 0) { - Iterator.fill(count)(item) - } else { + if (count == 0) { Iterator.empty + } else { + Iterator.fill(count)(item) } } } @@ -274,15 +273,24 @@ private[spark] object StratifiedSamplingUtils extends Logging { /** A random data generator that generates both uniform values and Poisson values. */ private class RandomDataGenerator { val uniform = new XORShiftRandom() - var poisson = new Poisson(1.0, new DRand) + // commons-math3 doesn't have a method to generate Poisson from an arbitrary mean; + // maintain a cache of Poisson(m) distributions for various m + val poissonCache = mutable.Map[Double, PoissonDistribution]() + var poissonSeed = 0L - def reSeed(seed: Long) { + def reSeed(seed: Long): Unit = { uniform.setSeed(seed) - poisson = new Poisson(1.0, new DRand(seed.toInt)) + poissonSeed = seed + poissonCache.clear() } def nextPoisson(mean: Double): Int = { - poisson.nextInt(mean) + val poisson = poissonCache.getOrElseUpdate(mean, { + val newPoisson = new PoissonDistribution(mean) + newPoisson.reseedRandomGenerator(poissonSeed) + newPoisson + }) + poisson.sample() } def nextUniform(): Double = { diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 814e40c4f77cc8a0535ad804ffd88a1666ae998e..0172876a264b87c4ee331ef92a02d337b148dfd4 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -142,7 +142,7 @@ public class JavaAPISuite implements Serializable { JavaRDD<Integer> rdd = sc.parallelize(ints); JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 11); // expected 2 but of course result varies randomly a bit - Assert.assertEquals(3, sample20.count()); + Assert.assertEquals(1, sample20.count()); JavaRDD<Integer> sample20NoReplacement = rdd.sample(false, 0.2, 11); Assert.assertEquals(2, sample20NoReplacement.count()); } diff --git a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala index 36877476e708e72800ca67cdf424c4606d74ee47..ba67d766a775a25400128945b93ad2b9558f9532 100644 --- a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala @@ -19,7 +19,8 @@ package org.apache.spark.util.random import java.util.Random -import cern.jet.random.Poisson +import org.apache.commons.math3.distribution.PoissonDistribution + import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.mock.EasyMockSugar @@ -28,11 +29,11 @@ class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar val a = List(1, 2, 3, 4, 5, 6, 7, 8, 9) var random: Random = _ - var poisson: Poisson = _ + var poisson: PoissonDistribution = _ before { random = mock[Random] - poisson = mock[Poisson] + poisson = mock[PoissonDistribution] } test("BernoulliSamplerWithRange") { @@ -101,7 +102,7 @@ class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar test("PoissonSampler") { expecting { for(x <- Seq(0, 1, 2, 0, 1, 1, 0, 0, 0)) { - poisson.nextInt().andReturn(x) + poisson.sample().andReturn(x) } } whenExecuting(poisson) { diff --git 
a/examples/pom.xml b/examples/pom.xml index eb49a0e5af22d1e60cbfeb02c92076ba9ac84acc..bc3291803c3245456d76c42887980d9800883692 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -156,6 +156,10 @@ <artifactId>algebird-core_${scala.binary.version}</artifactId> <version>0.1.11</version> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + </dependency> <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.binary.version}</artifactId> @@ -268,6 +272,10 @@ <exclude>com.google.common.base.Optional**</exclude> </excludes> </relocation> + <relocation> + <pattern>org.apache.commons.math3</pattern> + <shadedPattern>org.spark-project.commons.math3</shadedPattern> + </relocation> </relocations> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala index 1f576319b3ca8cf56626bbc07c7d61d4fcb6a2e3..3d5259463003dbdabecbcd39d5817d6016d1718c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala @@ -17,11 +17,7 @@ package org.apache.spark.examples -import scala.math.sqrt - -import cern.colt.matrix._ -import cern.colt.matrix.linalg._ -import cern.jet.math._ +import org.apache.commons.math3.linear._ /** * Alternating least squares matrix factorization. @@ -30,84 +26,70 @@ import cern.jet.math._ * please refer to org.apache.spark.mllib.recommendation.ALS */ object LocalALS { + // Parameters set through command line arguments var M = 0 // Number of movies var U = 0 // Number of users var F = 0 // Number of features var ITERATIONS = 0 - val LAMBDA = 0.01 // Regularization coefficient - // Some COLT objects - val factory2D = DoubleFactory2D.dense - val factory1D = DoubleFactory1D.dense - val algebra = Algebra.DEFAULT - val blas = SeqBlas.seqBlas - - def generateR(): DoubleMatrix2D = { - val mh = factory2D.random(M, F) - val uh = factory2D.random(U, F) - algebra.mult(mh, algebra.transpose(uh)) + def generateR(): RealMatrix = { + val mh = randomMatrix(M, F) + val uh = randomMatrix(U, F) + mh.multiply(uh.transpose()) } - def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D], - us: Array[DoubleMatrix1D]): Double = - { - val r = factory2D.make(M, U) + def rmse(targetR: RealMatrix, ms: Array[RealVector], us: Array[RealVector]): Double = { + val r = new Array2DRowRealMatrix(M, U) for (i <- 0 until M; j <- 0 until U) { - r.set(i, j, blas.ddot(ms(i), us(j))) + r.setEntry(i, j, ms(i).dotProduct(us(j))) } - blas.daxpy(-1, targetR, r) - val sumSqs = r.aggregate(Functions.plus, Functions.square) - sqrt(sumSqs / (M * U)) + val diffs = r.subtract(targetR) + var sumSqs = 0.0 + for (i <- 0 until M; j <- 0 until U) { + val diff = diffs.getEntry(i, j) + sumSqs += diff * diff + } + math.sqrt(sumSqs / (M.toDouble * U.toDouble)) } - def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], - R: DoubleMatrix2D) : DoubleMatrix1D = - { - val XtX = factory2D.make(F, F) - val Xty = factory1D.make(F) + def updateMovie(i: Int, m: RealVector, us: Array[RealVector], R: RealMatrix) : RealVector = { + var XtX: RealMatrix = new Array2DRowRealMatrix(F, F) + var Xty: RealVector = new ArrayRealVector(F) // For each user that rated the movie for (j <- 0 until U) { val u = us(j) // Add u * u^t to XtX - blas.dger(1, u, u, XtX) + XtX = 
XtX.add(u.outerProduct(u)) // Add u * rating to Xty - blas.daxpy(R.get(i, j), u, Xty) + Xty = Xty.add(u.mapMultiply(R.getEntry(i, j))) } - // Add regularization coefs to diagonal terms + // Add regularization coefficients to diagonal terms for (d <- 0 until F) { - XtX.set(d, d, XtX.get(d, d) + LAMBDA * U) + XtX.addToEntry(d, d, LAMBDA * U) } // Solve it with Cholesky - val ch = new CholeskyDecomposition(XtX) - val Xty2D = factory2D.make(Xty.toArray, F) - val solved2D = ch.solve(Xty2D) - solved2D.viewColumn(0) + new CholeskyDecomposition(XtX).getSolver.solve(Xty) } - def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D], - R: DoubleMatrix2D) : DoubleMatrix1D = - { - val XtX = factory2D.make(F, F) - val Xty = factory1D.make(F) + def updateUser(j: Int, u: RealVector, ms: Array[RealVector], R: RealMatrix) : RealVector = { + var XtX: RealMatrix = new Array2DRowRealMatrix(F, F) + var Xty: RealVector = new ArrayRealVector(F) // For each movie that the user rated for (i <- 0 until M) { val m = ms(i) // Add m * m^t to XtX - blas.dger(1, m, m, XtX) + XtX = XtX.add(m.outerProduct(m)) // Add m * rating to Xty - blas.daxpy(R.get(i, j), m, Xty) + Xty = Xty.add(m.mapMultiply(R.getEntry(i, j))) } - // Add regularization coefs to diagonal terms + // Add regularization coefficients to diagonal terms for (d <- 0 until F) { - XtX.set(d, d, XtX.get(d, d) + LAMBDA * M) + XtX.addToEntry(d, d, LAMBDA * M) } // Solve it with Cholesky - val ch = new CholeskyDecomposition(XtX) - val Xty2D = factory2D.make(Xty.toArray, F) - val solved2D = ch.solve(Xty2D) - solved2D.viewColumn(0) + new CholeskyDecomposition(XtX).getSolver.solve(Xty) } def showWarning() { @@ -135,21 +117,28 @@ object LocalALS { showWarning() - printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS) + println(s"Running with M=$M, U=$U, F=$F, iters=$ITERATIONS") val R = generateR() // Initialize m and u randomly - var ms = Array.fill(M)(factory1D.random(F)) - var us = Array.fill(U)(factory1D.random(F)) + var ms = Array.fill(M)(randomVector(F)) + var us = Array.fill(U)(randomVector(F)) // Iteratively update movies then users for (iter <- 1 to ITERATIONS) { - println("Iteration " + iter + ":") + println(s"Iteration $iter:") ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray println("RMSE = " + rmse(R, ms, us)) println() } } + + private def randomVector(n: Int): RealVector = + new ArrayRealVector(Array.fill(n)(math.random)) + + private def randomMatrix(rows: Int, cols: Int): RealMatrix = + new Array2DRowRealMatrix(Array.fill(rows, cols)(math.random)) + } diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala index fde8ffeedf8b43c634e75e20c1f0475542f10d6a..6c0ac8013ce34c7b05e5616d72ffe111ee3314b6 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala @@ -17,11 +17,7 @@ package org.apache.spark.examples -import scala.math.sqrt - -import cern.colt.matrix._ -import cern.colt.matrix.linalg._ -import cern.jet.math._ +import org.apache.commons.math3.linear._ import org.apache.spark._ @@ -32,62 +28,53 @@ import org.apache.spark._ * please refer to org.apache.spark.mllib.recommendation.ALS */ object SparkALS { + // Parameters set through command line arguments var M = 0 // Number of movies var U = 0 // Number of users var F = 0 // Number of features var ITERATIONS = 0 - val LAMBDA 
= 0.01 // Regularization coefficient - // Some COLT objects - val factory2D = DoubleFactory2D.dense - val factory1D = DoubleFactory1D.dense - val algebra = Algebra.DEFAULT - val blas = SeqBlas.seqBlas - - def generateR(): DoubleMatrix2D = { - val mh = factory2D.random(M, F) - val uh = factory2D.random(U, F) - algebra.mult(mh, algebra.transpose(uh)) + def generateR(): RealMatrix = { + val mh = randomMatrix(M, F) + val uh = randomMatrix(U, F) + mh.multiply(uh.transpose()) } - def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D], - us: Array[DoubleMatrix1D]): Double = - { - val r = factory2D.make(M, U) + def rmse(targetR: RealMatrix, ms: Array[RealVector], us: Array[RealVector]): Double = { + val r = new Array2DRowRealMatrix(M, U) for (i <- 0 until M; j <- 0 until U) { - r.set(i, j, blas.ddot(ms(i), us(j))) + r.setEntry(i, j, ms(i).dotProduct(us(j))) } - blas.daxpy(-1, targetR, r) - val sumSqs = r.aggregate(Functions.plus, Functions.square) - sqrt(sumSqs / (M * U)) + val diffs = r.subtract(targetR) + var sumSqs = 0.0 + for (i <- 0 until M; j <- 0 until U) { + val diff = diffs.getEntry(i, j) + sumSqs += diff * diff + } + math.sqrt(sumSqs / (M.toDouble * U.toDouble)) } - def update(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], - R: DoubleMatrix2D) : DoubleMatrix1D = - { + def update(i: Int, m: RealVector, us: Array[RealVector], R: RealMatrix) : RealVector = { val U = us.size - val F = us(0).size - val XtX = factory2D.make(F, F) - val Xty = factory1D.make(F) + val F = us(0).getDimension + var XtX: RealMatrix = new Array2DRowRealMatrix(F, F) + var Xty: RealVector = new ArrayRealVector(F) // For each user that rated the movie for (j <- 0 until U) { val u = us(j) // Add u * u^t to XtX - blas.dger(1, u, u, XtX) + XtX = XtX.add(u.outerProduct(u)) // Add u * rating to Xty - blas.daxpy(R.get(i, j), u, Xty) + Xty = Xty.add(u.mapMultiply(R.getEntry(i, j))) } // Add regularization coefs to diagonal terms for (d <- 0 until F) { - XtX.set(d, d, XtX.get(d, d) + LAMBDA * U) + XtX.addToEntry(d, d, LAMBDA * U) } // Solve it with Cholesky - val ch = new CholeskyDecomposition(XtX) - val Xty2D = factory2D.make(Xty.toArray, F) - val solved2D = ch.solve(Xty2D) - solved2D.viewColumn(0) + new CholeskyDecomposition(XtX).getSolver.solve(Xty) } def showWarning() { @@ -118,7 +105,7 @@ object SparkALS { showWarning() - printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS) + println(s"Running with M=$M, U=$U, F=$F, iters=$ITERATIONS") val sparkConf = new SparkConf().setAppName("SparkALS") val sc = new SparkContext(sparkConf) @@ -126,21 +113,21 @@ object SparkALS { val R = generateR() // Initialize m and u randomly - var ms = Array.fill(M)(factory1D.random(F)) - var us = Array.fill(U)(factory1D.random(F)) + var ms = Array.fill(M)(randomVector(F)) + var us = Array.fill(U)(randomVector(F)) // Iteratively update movies then users val Rc = sc.broadcast(R) var msb = sc.broadcast(ms) var usb = sc.broadcast(us) for (iter <- 1 to ITERATIONS) { - println("Iteration " + iter + ":") + println(s"Iteration $iter:") ms = sc.parallelize(0 until M, slices) .map(i => update(i, msb.value(i), usb.value, Rc.value)) .collect() msb = sc.broadcast(ms) // Re-broadcast ms because it was updated us = sc.parallelize(0 until U, slices) - .map(i => update(i, usb.value(i), msb.value, algebra.transpose(Rc.value))) + .map(i => update(i, usb.value(i), msb.value, Rc.value.transpose())) .collect() usb = sc.broadcast(us) // Re-broadcast us because it was updated println("RMSE = " + rmse(R, ms, us)) @@ -149,4 +136,11 @@ object 
SparkALS { sc.stop() } + + private def randomVector(n: Int): RealVector = + new ArrayRealVector(Array.fill(n)(math.random)) + + private def randomMatrix(rows: Int, cols: Int): RealMatrix = + new Array2DRowRealMatrix(Array.fill(rows, cols)(math.random)) + } diff --git a/mllib/pom.xml b/mllib/pom.xml index 696e9396f627c290da994f1e5e0df2c7fd192f82..de062a490159690895e73d2bf76f0d8b6cecd5ae 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -65,12 +65,12 @@ <groupId>junit</groupId> <artifactId>junit</artifactId> </exclusion> - <exclusion> - <groupId>org.apache.commons</groupId> - <artifactId>commons-math3</artifactId> - </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + </dependency> <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.binary.version}</artifactId> diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala index 28179fbc450c031ea711625bec820ec4388b1716..51f9b8657c6406951e206b7f3a8102f1a8ea3cd8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala @@ -17,8 +17,7 @@ package org.apache.spark.mllib.random -import cern.jet.random.Poisson -import cern.jet.random.engine.DRand +import org.apache.commons.math3.distribution.PoissonDistribution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom} @@ -89,12 +88,13 @@ class StandardNormalGenerator extends RandomDataGenerator[Double] { @DeveloperApi class PoissonGenerator(val mean: Double) extends RandomDataGenerator[Double] { - private var rng = new Poisson(mean, new DRand) + private var rng = new PoissonDistribution(mean) - override def nextValue(): Double = rng.nextDouble() + override def nextValue(): Double = rng.sample() override def setSeed(seed: Long) { - rng = new Poisson(mean, new DRand(seed.toInt)) + rng = new PoissonDistribution(mean) + rng.reseedRandomGenerator(seed) } override def copy(): PoissonGenerator = new PoissonGenerator(mean) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala index 0089419c2c5d478fc96b0b463438bf5a13fa6534..ea82d39b72c03e38e659f4e0209c1c2cd383e3c4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala @@ -18,7 +18,7 @@ package org.apache.spark.mllib.stat.test import breeze.linalg.{DenseMatrix => BDM} -import cern.jet.stat.Probability.chiSquareComplemented +import org.apache.commons.math3.distribution.ChiSquaredDistribution import org.apache.spark.{SparkException, Logging} import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors} @@ -33,7 +33,7 @@ import scala.collection.mutable * on an input of type `Matrix` in which independence between columns is assessed. * We also provide a method for computing the chi-squared statistic between each feature and the * label for an input `RDD[LabeledPoint]`, return an `Array[ChiSquaredTestResult]` of size = - * number of features in the inpuy RDD. + * number of features in the input RDD. 
* * Supported methods for goodness of fit: `pearson` (default) * Supported methods for independence: `pearson` (default) @@ -139,7 +139,7 @@ private[stat] object ChiSqTest extends Logging { } /* - * Pearon's goodness of fit test on the input observed and expected counts/relative frequencies. + * Pearson's goodness of fit test on the input observed and expected counts/relative frequencies. * Uniform distribution is assumed when `expected` is not passed in. */ def chiSquared(observed: Vector, @@ -188,12 +188,12 @@ private[stat] object ChiSqTest extends Logging { } } val df = size - 1 - val pValue = chiSquareComplemented(df, statistic) + val pValue = 1.0 - new ChiSquaredDistribution(df).cumulativeProbability(statistic) new ChiSqTestResult(pValue, df, statistic, PEARSON.name, NullHypothesis.goodnessOfFit.toString) } /* - * Pearon's independence test on the input contingency matrix. + * Pearson's independence test on the input contingency matrix. * TODO: optimize for SparseMatrix when it becomes supported. */ def chiSquaredMatrix(counts: Matrix, methodName:String = PEARSON.name): ChiSqTestResult = { @@ -238,7 +238,13 @@ private[stat] object ChiSqTest extends Logging { j += 1 } val df = (numCols - 1) * (numRows - 1) - val pValue = chiSquareComplemented(df, statistic) - new ChiSqTestResult(pValue, df, statistic, methodName, NullHypothesis.independence.toString) + if (df == 0) { + // 1 column or 1 row. Constant distribution is independent of anything. + // pValue = 1.0 and statistic = 0.0 in this case. + new ChiSqTestResult(1.0, 0, 0.0, methodName, NullHypothesis.independence.toString) + } else { + val pValue = 1.0 - new ChiSquaredDistribution(df).cumulativeProbability(statistic) + new ChiSqTestResult(pValue, df, statistic, methodName, NullHypothesis.independence.toString) + } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/BaggedPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/BaggedPoint.scala index 937c8a2ac5836654febea642514e88b23b2ab3be..e7a2127c5d2e7ab011be7f6d0fdd2e5662e83c1e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/BaggedPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impl/BaggedPoint.scala @@ -17,8 +17,7 @@ package org.apache.spark.mllib.tree.impl -import cern.jet.random.Poisson -import cern.jet.random.engine.DRand +import org.apache.commons.math3.distribution.PoissonDistribution import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils @@ -60,12 +59,13 @@ private[tree] object BaggedPoint { input.mapPartitionsWithIndex { (partitionIndex, instances) => // TODO: Support different sampling rates, and sampling without replacement. // Use random seed = seed + partitionIndex + 1 to make generation reproducible. 
- val poisson = new Poisson(1.0, new DRand(seed + partitionIndex + 1)) + val poisson = new PoissonDistribution(1.0) + poisson.reseedRandomGenerator(seed + partitionIndex + 1) instances.map { instance => val subsampleWeights = new Array[Double](numSubsamples) var subsampleIndex = 0 while (subsampleIndex < numSubsamples) { - subsampleWeights(subsampleIndex) = poisson.nextInt() + subsampleWeights(subsampleIndex) = poisson.sample() subsampleIndex += 1 } new BaggedPoint(instance, subsampleWeights) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/RandomForestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/RandomForestSuite.scala index 6b13765b98f41caf188dae9c7c687cd344bfef98..d3eff59aa0409ad3985ed5af7647eaf3c7c585d3 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/tree/RandomForestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/RandomForestSuite.scala @@ -187,7 +187,7 @@ class RandomForestSuite extends FunSuite with LocalSparkContext { numClassesForClassification = 3, categoricalFeaturesInfo = categoricalFeaturesInfo) val model = RandomForest.trainClassifier(input, strategy, numTrees = 2, featureSubsetStrategy = "sqrt", seed = 12345) - RandomForestSuite.validateClassifier(model, arr, 1.0) + RandomForestSuite.validateClassifier(model, arr, 0.0) } } diff --git a/pom.xml b/pom.xml index 2ebe1b8da588a36f433c8b70adfdd14b1364a01e..abcb97108c5d907bf99b7358c23118f111f50b15 100644 --- a/pom.xml +++ b/pom.xml @@ -305,7 +305,6 @@ <groupId>org.apache.commons</groupId> <artifactId>commons-math3</artifactId> <version>3.3</version> - <scope>test</scope> </dependency> <dependency> <groupId>com.google.code.findbugs</groupId> @@ -431,11 +430,6 @@ <artifactId>akka-testkit_${scala.binary.version}</artifactId> <version>${akka.version}</version> </dependency> - <dependency> - <groupId>colt</groupId> - <artifactId>colt</artifactId> - <version>1.2.0</version> - </dependency> <dependency> <groupId>org.apache.mesos</groupId> <artifactId>mesos</artifactId> diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py index 73baba4ace5f6588c72b76c358119fbcb8f2cfeb..2202c51ab9c06226a82f72c1a3cf6935807e6e42 100644 --- a/python/pyspark/mllib/random.py +++ b/python/pyspark/mllib/random.py @@ -107,7 +107,7 @@ class RandomRDDs(object): distribution with the input mean. >>> mean = 100.0 - >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=1L) + >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2L) >>> stats = x.stats() >>> stats.count() 1000L
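
The patch above applies the same small set of Colt-to-commons-math3 call mappings over and over (inverse normal/t quantiles for confidence intervals, Poisson sampling with an explicit reseed, and the complemented chi-squared CDF for p-values). As a quick reference, here is a minimal standalone Scala sketch of those mappings; it is not part of the patch, the object name and numeric inputs are illustrative only, and it assumes commons-math3 is on the classpath as the diff's pom changes arrange.

```scala
// Standalone sketch (not part of the patch) of the Colt -> commons-math3 mappings
// used throughout this diff. Object name and sample values are illustrative.
import org.apache.commons.math3.distribution.{
  ChiSquaredDistribution, NormalDistribution, PoissonDistribution, TDistribution}

object CommonsMath3MigrationSketch {
  def main(args: Array[String]): Unit = {
    val confidence = 0.95

    // Colt: Probability.normalInverse(1 - (1 - confidence) / 2)
    val zFactor = new NormalDistribution()
      .inverseCumulativeProbability(1 - (1 - confidence) / 2)

    // Colt: Probability.studentTInverse(1 - confidence, df). Colt's helper was
    // two-sided, so the replacement passes the equivalent upper-tail quantile.
    val df = 9
    val tFactor = new TDistribution(df)
      .inverseCumulativeProbability(1 - (1 - confidence) / 2)

    // Colt: new Poisson(mean, new DRand(seed)) followed by nextInt()
    val poisson = new PoissonDistribution(3.0)
    poisson.reseedRandomGenerator(42L)
    val draw = poisson.sample()

    // Colt: Probability.chiSquareComplemented(df, statistic), i.e. P(X > statistic)
    val statistic = 4.2
    val pValue = 1.0 - new ChiSquaredDistribution(df).cumulativeProbability(statistic)

    println(s"z=$zFactor t=$tFactor poissonDraw=$draw chiSqPValue=$pValue")
  }
}
```

One behavioural note visible in the diff itself: `PoissonDistribution.reseedRandomGenerator` takes a full `Long` seed, whereas the old Colt code truncated seeds to `Int` for `DRand`, so sample sequences change and the affected test expectations (e.g. in `JavaAPISuite` and the PySpark doctest) are updated accordingly.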