From e43139f40309995b1133c7ef2936ab858b7b44fc Mon Sep 17 00:00:00 2001 From: Xiangrui Meng <meng@databricks.com> Date: Wed, 25 Feb 2015 23:43:29 -0800 Subject: [PATCH] [SPARK-5976][MLLIB] Add partitioner to factors returned by ALS The model trained by ALS requires partitioning information to do quick lookup of a user/item factor for making recommendation on individual requests. In the new implementation, we didn't set partitioners in the factors returned by ALS, which would cause performance regression. srowen coderxiang Author: Xiangrui Meng <meng@databricks.com> Closes #4748 from mengxr/SPARK-5976 and squashes the following commits: 9373a09 [Xiangrui Meng] add partitioner to factors returned by ALS 260f183 [Xiangrui Meng] add a test for partitioner --- .../apache/spark/ml/recommendation/ALS.scala | 55 +++++++++++-------- .../spark/ml/recommendation/ALSSuite.scala | 32 ++++++++++- 2 files changed, 64 insertions(+), 23 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index c2ec716f08..7bb69df653 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -29,7 +29,7 @@ import com.github.fommil.netlib.LAPACK.{getInstance => lapack} import org.jblas.DoubleMatrix import org.netlib.util.intW -import org.apache.spark.{HashPartitioner, Logging, Partitioner} +import org.apache.spark.{Logging, Partitioner} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.param._ @@ -501,8 +501,8 @@ object ALS extends Logging { require(intermediateRDDStorageLevel != StorageLevel.NONE, "ALS is not designed to run without persisting intermediate RDDs.") val sc = ratings.sparkContext - val userPart = new HashPartitioner(numUserBlocks) - val itemPart = new HashPartitioner(numItemBlocks) + val userPart = new ALSPartitioner(numUserBlocks) + val itemPart = new ALSPartitioner(numItemBlocks) val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions) val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions) val solver = if (nonnegative) new NNLSSolver else new CholeskySolver @@ -550,13 +550,23 @@ object ALS extends Logging { val userIdAndFactors = userInBlocks .mapValues(_.srcIds) .join(userFactors) - .values + .mapPartitions({ items => + items.flatMap { case (_, (ids, factors)) => + ids.view.zip(factors) + } + // Preserve the partitioning because IDs are consistent with the partitioners in userInBlocks + // and userFactors. + }, preservesPartitioning = true) .setName("userFactors") .persist(finalRDDStorageLevel) val itemIdAndFactors = itemInBlocks .mapValues(_.srcIds) .join(itemFactors) - .values + .mapPartitions({ items => + items.flatMap { case (_, (ids, factors)) => + ids.view.zip(factors) + } + }, preservesPartitioning = true) .setName("itemFactors") .persist(finalRDDStorageLevel) if (finalRDDStorageLevel != StorageLevel.NONE) { @@ -569,13 +579,7 @@ object ALS extends Logging { itemOutBlocks.unpersist() blockRatings.unpersist() } - val userOutput = userIdAndFactors.flatMap { case (ids, factors) => - ids.view.zip(factors) - } - val itemOutput = itemIdAndFactors.flatMap { case (ids, factors) => - ids.view.zip(factors) - } - (userOutput, itemOutput) + (userIdAndFactors, itemIdAndFactors) } /** @@ -995,15 +999,15 @@ object ALS extends Logging { "Converting to local indices took " + (System.nanoTime() - start) / 1e9 + " seconds.") val dstLocalIndices = dstIds.map(dstIdToLocalIndex.apply) (srcBlockId, (dstBlockId, srcIds, dstLocalIndices, ratings)) - }.groupByKey(new HashPartitioner(srcPart.numPartitions)) - .mapValues { iter => - val builder = - new UncompressedInBlockBuilder[ID](new LocalIndexEncoder(dstPart.numPartitions)) - iter.foreach { case (dstBlockId, srcIds, dstLocalIndices, ratings) => - builder.add(dstBlockId, srcIds, dstLocalIndices, ratings) - } - builder.build().compress() - }.setName(prefix + "InBlocks") + }.groupByKey(new ALSPartitioner(srcPart.numPartitions)) + .mapValues { iter => + val builder = + new UncompressedInBlockBuilder[ID](new LocalIndexEncoder(dstPart.numPartitions)) + iter.foreach { case (dstBlockId, srcIds, dstLocalIndices, ratings) => + builder.add(dstBlockId, srcIds, dstLocalIndices, ratings) + } + builder.build().compress() + }.setName(prefix + "InBlocks") .persist(storageLevel) val outBlocks = inBlocks.mapValues { case InBlock(srcIds, dstPtrs, dstEncodedIndices, _) => val encoder = new LocalIndexEncoder(dstPart.numPartitions) @@ -1064,7 +1068,7 @@ object ALS extends Logging { (dstBlockId, (srcBlockId, activeIndices.map(idx => srcFactors(idx)))) } } - val merged = srcOut.groupByKey(new HashPartitioner(dstInBlocks.partitions.length)) + val merged = srcOut.groupByKey(new ALSPartitioner(dstInBlocks.partitions.length)) dstInBlocks.join(merged).mapValues { case (InBlock(dstIds, srcPtrs, srcEncodedIndices, ratings), srcFactors) => val sortedSrcFactors = new Array[FactorBlock](numSrcBlocks) @@ -1149,4 +1153,11 @@ object ALS extends Logging { encoded & localIndexMask } } + + /** + * Partitioner used by ALS. We requires that getPartition is a projection. That is, for any key k, + * we have getPartition(getPartition(k)) = getPartition(k). Since the the default HashPartitioner + * satisfies this requirement, we simply use a type alias here. + */ + private[recommendation] type ALSPartitioner = org.apache.spark.HashPartitioner } diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 376c3626f9..bb86bafc0e 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer import com.github.fommil.netlib.BLAS.{getInstance => blas} import org.scalatest.FunSuite -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkException} import org.apache.spark.ml.recommendation.ALS._ import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.util.MLlibTestSparkContext @@ -455,4 +455,34 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext with Logging { assert(isNonnegative(itemFactors)) // TODO: Validate the solution. } + + test("als partitioner is a projection") { + for (p <- Seq(1, 10, 100, 1000)) { + val part = new ALSPartitioner(p) + var k = 0 + while (k < p) { + assert(k === part.getPartition(k)) + assert(k === part.getPartition(k.toLong)) + k += 1 + } + } + } + + test("partitioner in returned factors") { + val (ratings, _) = genImplicitTestData(numUsers = 20, numItems = 40, rank = 2, noiseStd = 0.01) + val (userFactors, itemFactors) = ALS.train( + ratings, rank = 2, maxIter = 4, numUserBlocks = 3, numItemBlocks = 4) + for ((tpe, factors) <- Seq(("User", userFactors), ("Item", itemFactors))) { + assert(userFactors.partitioner.isDefined, s"$tpe factors should have partitioner.") + val part = userFactors.partitioner.get + userFactors.mapPartitionsWithIndex { (idx, items) => + items.foreach { case (id, _) => + if (part.getPartition(id) != idx) { + throw new SparkException(s"$tpe with ID $id should not be in partition $idx.") + } + } + Iterator.empty + }.count() + } + } } -- GitLab