diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 1f5c746a3457cd6d85cac8c3b8eb02640466e548..60fb73f2b5be54b2acc9d6594a8439a6da55acbc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, BitSet}
 import scala.math.{abs, sqrt}
 import scala.util.Random
 import scala.util.Sorting
+import scala.util.hashing.byteswap32
 
 import com.esotericsoftware.kryo.Kryo
 import org.jblas.{DoubleMatrix, SimpleBlas, Solve}
@@ -32,6 +33,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.spark.SparkContext._
+import org.apache.spark.util.Utils
 
 /**
  * Out-link information for a user or product block. This includes the original user/product IDs
@@ -169,34 +171,39 @@ class ALS private (
       this.numBlocks
     }
 
-    val partitioner = new HashPartitioner(numBlocks)
+    val partitioner = new Partitioner {
+      val numPartitions = numBlocks
 
-    val ratingsByUserBlock = ratings.map{ rating => (rating.user % numBlocks, rating) }
+      def getPartition(x: Any): Int = {
+        Utils.nonNegativeMod(byteswap32(x.asInstanceOf[Int]), numPartitions)
+      }
+    }
+
+    val ratingsByUserBlock = ratings.map{ rating =>
+      (partitioner.getPartition(rating.user), rating)
+    }
     val ratingsByProductBlock = ratings.map{ rating =>
-      (rating.product % numBlocks, Rating(rating.product, rating.user, rating.rating))
+      (partitioner.getPartition(rating.product),
+        Rating(rating.product, rating.user, rating.rating))
     }
 
-    val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
-    val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock)
+    val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock, partitioner)
+    val (productInLinks, productOutLinks) =
+      makeLinkRDDs(numBlocks, ratingsByProductBlock, partitioner)
 
     // Initialize user and product factors randomly, but use a deterministic seed for each
     // partition so that fault recovery works
     val seedGen = new Random(seed)
     val seed1 = seedGen.nextInt()
     val seed2 = seedGen.nextInt()
-    // Hash an integer to propagate random bits at all positions, similar to java.util.HashTable
-    def hash(x: Int): Int = {
-      val r = x ^ (x >>> 20) ^ (x >>> 12)
-      r ^ (r >>> 7) ^ (r >>> 4)
-    }
     var users = userOutLinks.mapPartitionsWithIndex { (index, itr) =>
-      val rand = new Random(hash(seed1 ^ index))
+      val rand = new Random(byteswap32(seed1 ^ index))
       itr.map { case (x, y) =>
         (x, y.elementIds.map(_ => randomFactor(rank, rand)))
       }
     }
     var products = productOutLinks.mapPartitionsWithIndex { (index, itr) =>
-      val rand = new Random(hash(seed2 ^ index))
+      val rand = new Random(byteswap32(seed2 ^ index))
       itr.map { case (x, y) =>
         (x, y.elementIds.map(_ => randomFactor(rank, rand)))
       }
@@ -327,13 +334,14 @@ class ALS private (
    * Make the out-links table for a block of the users (or products) dataset given the list of
    * (user, product, rating) values for the users in that block (or the opposite for products).
    */
-  private def makeOutLinkBlock(numBlocks: Int, ratings: Array[Rating]): OutLinkBlock = {
+  private def makeOutLinkBlock(numBlocks: Int, ratings: Array[Rating],
+      partitioner: Partitioner): OutLinkBlock = {
     val userIds = ratings.map(_.user).distinct.sorted
     val numUsers = userIds.length
     val userIdToPos = userIds.zipWithIndex.toMap
     val shouldSend = Array.fill(numUsers)(new BitSet(numBlocks))
     for (r <- ratings) {
-      shouldSend(userIdToPos(r.user))(r.product % numBlocks) = true
+      shouldSend(userIdToPos(r.user))(partitioner.getPartition(r.product)) = true
     }
     OutLinkBlock(userIds, shouldSend)
   }
@@ -342,14 +350,15 @@ class ALS private (
    * Make the in-links table for a block of the users (or products) dataset given a list of
    * (user, product, rating) values for the users in that block (or the opposite for products).
    */
-  private def makeInLinkBlock(numBlocks: Int, ratings: Array[Rating]): InLinkBlock = {
+  private def makeInLinkBlock(numBlocks: Int, ratings: Array[Rating],
+      partitioner: Partitioner): InLinkBlock = {
     val userIds = ratings.map(_.user).distinct.sorted
     val numUsers = userIds.length
     val userIdToPos = userIds.zipWithIndex.toMap
     // Split out our ratings by product block
     val blockRatings = Array.fill(numBlocks)(new ArrayBuffer[Rating])
     for (r <- ratings) {
-      blockRatings(r.product % numBlocks) += r
+      blockRatings(partitioner.getPartition(r.product)) += r
     }
     val ratingsForBlock = new Array[Array[(Array[Int], Array[Double])]](numBlocks)
     for (productBlock <- 0 until numBlocks) {
@@ -374,14 +383,14 @@ class ALS private (
    * the users (or (blockId, (p, u, r)) for the products). We create these simultaneously to avoid
    * having to shuffle the (blockId, (u, p, r)) RDD twice, or to cache it.
    */
-  private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, Rating)])
+  private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, Rating)], partitioner: Partitioner)
     : (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) =
   {
     val grouped = ratings.partitionBy(new HashPartitioner(numBlocks))
     val links = grouped.mapPartitionsWithIndex((blockId, elements) => {
       val ratings = elements.map{_._2}.toArray
-      val inLinkBlock = makeInLinkBlock(numBlocks, ratings)
-      val outLinkBlock = makeOutLinkBlock(numBlocks, ratings)
+      val inLinkBlock = makeInLinkBlock(numBlocks, ratings, partitioner)
+      val outLinkBlock = makeOutLinkBlock(numBlocks, ratings, partitioner)
       Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
     }, true)
     val inLinks = links.mapValues(_._1)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index 5aab9aba8f9c010b0ea2a71e2ed9a04f8e7c517d..4dfcd4b52ec66599ab68b2a2e5fa77acf5f050f8 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -27,6 +27,7 @@ import org.jblas.DoubleMatrix
 
 import org.apache.spark.mllib.util.LocalSparkContext
 import org.apache.spark.SparkContext._
+import org.apache.spark.Partitioner
 
 object ALSSuite {
 
@@ -74,7 +75,6 @@
 
     (sampledRatings, trueRatings, truePrefs)
   }
-
 }
 
 
@@ -128,6 +128,25 @@ class ALSSuite extends FunSuite with LocalSparkContext {
     assert(u11 != u2)
   }
 
+  test("negative ids") {
+    val data = ALSSuite.generateRatings(50, 50, 2, 0.7, false, false)
+    val ratings = sc.parallelize(data._1.map { case Rating(u, p, r) =>
+      Rating(u - 25, p - 25, r)
+    })
+    val correct = data._2
+    val model = ALS.train(ratings, 5, 15)
+
+    val pairs = Array.tabulate(50, 50)((u, p) => (u - 25, p - 25)).flatten
+    val ans = model.predict(sc.parallelize(pairs)).collect()
+    ans.foreach { r =>
+      val u = r.user + 25
+      val p = r.product + 25
+      val v = r.rating
+      val error = v - correct.get(u, p)
+      assert(math.abs(error) < 0.4)
+    }
+  }
+
   /**
    * Test if we can correctly factorize R = U * P where U and P are of known rank.
    *
@@ -140,16 +159,19 @@ class ALSSuite extends FunSuite with LocalSparkContext {
    * @param implicitPrefs flag to test implicit feedback
    * @param bulkPredict flag to test bulk prediciton
    * @param negativeWeights whether the generated data can contain negative values
+   * @param numBlocks number of blocks to partition users and products into
    */
   def testALS(users: Int, products: Int, features: Int, iterations: Int,
     samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false,
-    bulkPredict: Boolean = false, negativeWeights: Boolean = false)
+    bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1)
   {
     val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products,
      features, samplingRate, implicitPrefs, negativeWeights)
     val model = implicitPrefs match {
-      case false => ALS.train(sc.parallelize(sampledRatings), features, iterations)
-      case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations)
+      case false => ALS.train(sc.parallelize(sampledRatings), features, iterations, 0.01,
+        numBlocks, 0L)
+      case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations, 0.01,
+        numBlocks, 1.0, 0L)
     }
     val predictedU = new DoubleMatrix(users, features)
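
Illustrative sketch (not part of the patch): the old scheme computed a block index as id % numBlocks, which is negative for negative IDs and therefore never a valid partition. The new partitioner first mixes the ID with byteswap32 and then takes a non-negative modulus. The standalone Scala below shows the difference; localNonNegativeMod is a local stand-in assumed to behave like org.apache.spark.util.Utils.nonNegativeMod, which is not reproduced here.

import scala.util.hashing.byteswap32

object PartitionerSketch {
  // Stand-in for Utils.nonNegativeMod: always returns a value in [0, mod),
  // even when x is negative.
  def localNonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  def main(args: Array[String]): Unit = {
    val numBlocks = 4
    // Old scheme: id % numBlocks can be negative for negative IDs.
    // New scheme: byteswap32 mixes the bits, then the non-negative mod
    // always lands in [0, numBlocks).
    for (id <- Seq(-25, -1, 0, 1, 25)) {
      val oldBlock = id % numBlocks
      val newBlock = localNonNegativeMod(byteswap32(id), numBlocks)
      println(s"id=$id  old=$oldBlock  new=$newBlock")
    }
  }
}

Besides handling negative IDs, mixing the bits with byteswap32 keeps structured ID sets (for example, IDs that are all multiples of numBlocks) from collapsing onto a single block, and the same byteswap32 call replaces the hand-rolled hash() that was previously used to derive per-partition random seeds.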