From 88e6d75072c23fa99d4df00d087d03d8c38e8c69 Mon Sep 17 00:00:00 2001
From: Daniel Li <dan@danielyli.com>
Date: Sun, 7 May 2017 10:09:58 +0100
Subject: [PATCH] [SPARK-20484][MLLIB] Add documentation to ALS code
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What changes were proposed in this pull request?

This PR adds documentation to the ALS code.

## How was this patch tested?

Existing tests were used.

mengxr srowen

This contribution is my original work.  I have the license to work on this project under the Spark project’s open source license.

Author: Daniel Li <dan@danielyli.com>

Closes #17793 from danielyli/spark-20484.
---
 .../apache/spark/ml/recommendation/ALS.scala  | 236 +++++++++++++++---
 1 file changed, 202 insertions(+), 34 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index a20ef72446..1562bf1beb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -774,6 +774,28 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
   /**
    * :: DeveloperApi ::
    * Implementation of the ALS algorithm.
+   *
+   * This implementation of the ALS factorization algorithm partitions the two sets of factors among
+   * Spark workers so as to reduce network communication by only sending one copy of each factor
+   * vector to each Spark worker on each iteration, and only if needed.  This is achieved by
+   * precomputing some information about the ratings matrix to determine which users require which
+   * item factors and vice versa.  See the Scaladoc for `InBlock` for a detailed explanation of how
+   * the precomputation is done.
+   *
+   * In addition, since each iteration of calculating the factor matrices depends on the known
+   * ratings, which are spread across Spark partitions, a naive implementation would incur
+   * significant network communication overhead between Spark workers, as the ratings RDD would be
+   * repeatedly shuffled during each iteration.  This implementation reduces that overhead by
+   * performing the shuffling operation up front, precomputing each partition's ratings dependencies
+   * and duplicating those values to the appropriate workers before starting iterations to solve for
+   * the factor matrices.  See the Scaladoc for `OutBlock` for a detailed explanation of how the
+   * precomputation is done.
+   *
+   * Note that the term "rating block" is a bit of a misnomer, as the ratings are not partitioned by
+   * contiguous blocks from the ratings matrix but by a hash function on the rating's location in
+   * the matrix.  If it helps you to visualize the partitions, it is easier to think of the term
+   * "block" as referring to a subset of an RDD containing the ratings rather than a contiguous
+   * submatrix of the ratings matrix.
    */
   @DeveloperApi
   def train[ID: ClassTag]( // scalastyle:ignore
@@ -791,32 +813,43 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
       checkpointInterval: Int = 10,
       seed: Long = 0L)(
       implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = {
+
     require(!ratings.isEmpty(), s"No ratings available from $ratings")
     require(intermediateRDDStorageLevel != StorageLevel.NONE,
       "ALS is not designed to run without persisting intermediate RDDs.")
+
     val sc = ratings.sparkContext
+
+    // Precompute the rating dependencies of each partition
     val userPart = new ALSPartitioner(numUserBlocks)
     val itemPart = new ALSPartitioner(numItemBlocks)
-    val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
-    val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
-    val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
     val blockRatings = partitionRatings(ratings, userPart, itemPart)
       .persist(intermediateRDDStorageLevel)
     val (userInBlocks, userOutBlocks) =
       makeBlocks("user", blockRatings, userPart, itemPart, intermediateRDDStorageLevel)
-    // materialize blockRatings and user blocks
-    userOutBlocks.count()
+    userOutBlocks.count()    // materialize blockRatings and user blocks
     val swappedBlockRatings = blockRatings.map {
       case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) =>
         ((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings))
     }
     val (itemInBlocks, itemOutBlocks) =
       makeBlocks("item", swappedBlockRatings, itemPart, userPart, intermediateRDDStorageLevel)
-    // materialize item blocks
-    itemOutBlocks.count()
+    itemOutBlocks.count()    // materialize item blocks
+
+    // Encoders for storing each user/item's partition ID and index within its partition using a
+    // single integer; used as an optimization
+    val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
+    val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
+
+    // These are the user and item factor matrices that, once trained, are multiplied together to
+    // estimate the rating matrix.  The two matrices are stored in RDDs, partitioned by column such
+    // that each factor column resides on the same Spark worker as its corresponding user or item.
     val seedGen = new XORShiftRandom(seed)
     var userFactors = initialize(userInBlocks, rank, seedGen.nextLong())
     var itemFactors = initialize(itemInBlocks, rank, seedGen.nextLong())
+
+    val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
+
     var previousCheckpointFile: Option[String] = None
     val shouldCheckpoint: Int => Boolean = (iter) =>
       sc.checkpointDir.isDefined && checkpointInterval != -1 && (iter % checkpointInterval == 0)
@@ -830,6 +863,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
             logWarning(s"Cannot delete checkpoint file $file:", e)
         }
       }
+
     if (implicitPrefs) {
       for (iter <- 1 to maxIter) {
         userFactors.setName(s"userFactors-$iter").persist(intermediateRDDStorageLevel)
@@ -910,26 +944,154 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
   private type FactorBlock = Array[Array[Float]]
 
   /**
-   * Out-link block that stores, for each dst (item/user) block, which src (user/item) factors to
-   * send. For example, outLinkBlock(0) contains the local indices (not the original src IDs) of the
-   * src factors in this block to send to dst block 0.
+   * A mapping of the columns of the items factor matrix that are needed when calculating each row
+   * of the users factor matrix, and vice versa.
+   *
+   * Specifically, when calculating a user factor vector, since only those columns of the items
+   * factor matrix that correspond to the items that that user has rated are needed, we can avoid
+   * having to repeatedly copy the entire items factor matrix to each worker later in the algorithm
+   * by precomputing these dependencies for all users, storing them in an RDD of `OutBlock`s.  The
+   * items' dependencies on the columns of the users factor matrix is computed similarly.
+   *
+   * =Example=
+   *
+   * Using the example provided in the `InBlock` Scaladoc, `userOutBlocks` would look like the
+   * following:
+   *
+   * {{{
+   *     userOutBlocks.collect() == Seq(
+   *       0 -> Array(Array(0, 1), Array(0, 1)),
+   *       1 -> Array(Array(0), Array(0))
+   *     )
+   * }}}
+   *
+   * Each value in this map-like sequence is of type `Array[Array[Int]]`.  The values in the
+   * inner array are the ranks of the sorted user IDs in that partition; so in the example above,
+   * `Array(0, 1)` in partition 0 refers to user IDs 0 and 6, since when all unique user IDs in
+   * partition 0 are sorted, 0 is the first ID and 6 is the second.  The position of each inner
+   * array in its enclosing outer array denotes the partition number to which item IDs map; in the
+   * example, the first `Array(0, 1)` is in position 0 of its outer array, denoting item IDs that
+   * map to partition 0.
+   *
+   * In summary, the data structure encodes the following information:
+   *
+   *   *  There are ratings with user IDs 0 and 6 (encoded in `Array(0, 1)`, where 0 and 1 are the
+   *   indices of the user IDs 0 and 6 on partition 0) whose item IDs map to partitions 0 and 1
+   *   (represented by the fact that `Array(0, 1)` appears in both the 0th and 1st positions).
+   *
+   *   *  There are ratings with user ID 3 (encoded in `Array(0)`, where 0 is the index of the user
+   *   ID 3 on partition 1) whose item IDs map to partitions 0 and 1 (represented by the fact that
+   *   `Array(0)` appears in both the 0th and 1st positions).
    */
   private type OutBlock = Array[Array[Int]]
 
   /**
-   * In-link block for computing src (user/item) factors. This includes the original src IDs
-   * of the elements within this block as well as encoded dst (item/user) indices and corresponding
-   * ratings. The dst indices are in the form of (blockId, localIndex), which are not the original
-   * dst IDs. To compute src factors, we expect receiving dst factors that match the dst indices.
-   * For example, if we have an in-link record
+   * In-link block for computing user and item factor matrices.
+   *
+   * The ALS algorithm partitions the columns of the users factor matrix evenly among Spark workers.
+   * Since each column of the factor matrix is calculated using the known ratings of the correspond-
+   * ing user, and since the ratings don't change across iterations, the ALS algorithm preshuffles
+   * the ratings to the appropriate partitions, storing them in `InBlock` objects.
+   *
+   * The ratings shuffled by item ID are computed similarly and also stored in `InBlock` objects.
+   * Note that this means every rating is stored twice, once as shuffled by user ID and once by item
+   * ID.  This is a necessary tradeoff, since in general a rating will not be on the same worker
+   * when partitioned by user as by item.
+   *
+   * =Example=
+   *
+   * Say we have a small collection of eight items to offer the seven users in our application.  We
+   * have some known ratings given by the users, as seen in the matrix below:
+   *
+   * {{{
+   *                       Items
+   *            0   1   2   3   4   5   6   7
+   *          +---+---+---+---+---+---+---+---+
+   *        0 |   |0.1|   |   |0.4|   |   |0.7|
+   *          +---+---+---+---+---+---+---+---+
+   *        1 |   |   |   |   |   |   |   |   |
+   *          +---+---+---+---+---+---+---+---+
+   *     U  2 |   |   |   |   |   |   |   |   |
+   *     s    +---+---+---+---+---+---+---+---+
+   *     e  3 |   |3.1|   |   |3.4|   |   |3.7|
+   *     r    +---+---+---+---+---+---+---+---+
+   *     s  4 |   |   |   |   |   |   |   |   |
+   *          +---+---+---+---+---+---+---+---+
+   *        5 |   |   |   |   |   |   |   |   |
+   *          +---+---+---+---+---+---+---+---+
+   *        6 |   |6.1|   |   |6.4|   |   |6.7|
+   *          +---+---+---+---+---+---+---+---+
+   * }}}
+   *
+   * The ratings are represented as an RDD, passed to the `partitionRatings` method as the `ratings`
+   * parameter:
+   *
+   * {{{
+   *     ratings.collect() == Seq(
+   *       Rating(0, 1, 0.1f),
+   *       Rating(0, 4, 0.4f),
+   *       Rating(0, 7, 0.7f),
+   *       Rating(3, 1, 3.1f),
+   *       Rating(3, 4, 3.4f),
+   *       Rating(3, 7, 3.7f),
+   *       Rating(6, 1, 6.1f),
+   *       Rating(6, 4, 6.4f),
+   *       Rating(6, 7, 6.7f)
+   *     )
+   * }}}
    *
-   * {srcId: 0, dstBlockId: 2, dstLocalIndex: 3, rating: 5.0},
+   * Say that we are using two partitions to calculate each factor matrix:
    *
-   * and assume that the dst factors are stored as dstFactors: Map[Int, Array[Array[Float]]], which
-   * is a blockId to dst factors map, the corresponding dst factor of the record is dstFactor(2)(3).
+   * {{{
+   *     val userPart = new ALSPartitioner(2)
+   *     val itemPart = new ALSPartitioner(2)
+   *     val blockRatings = partitionRatings(ratings, userPart, itemPart)
+   * }}}
    *
-   * We use a CSC-like (compressed sparse column) format to store the in-link information. So we can
-   * compute src factors one after another using only one normal equation instance.
+   * Ratings are mapped to partitions using the user/item IDs modulo the number of partitions.  With
+   * two partitions, ratings with even-valued user IDs are shuffled to partition 0 while those with
+   * odd-valued user IDs are shuffled to partition 1:
+   *
+   * {{{
+   *     userInBlocks.collect() == Seq(
+   *       0 -> Seq(
+   *              // Internally, the class stores the ratings in a more optimized format than
+   *              // a sequence of `Rating`s, but for clarity we show it as such here.
+   *              Rating(0, 1, 0.1f),
+   *              Rating(0, 4, 0.4f),
+   *              Rating(0, 7, 0.7f),
+   *              Rating(6, 1, 6.1f),
+   *              Rating(6, 4, 6.4f),
+   *              Rating(6, 7, 6.7f)
+   *            ),
+   *       1 -> Seq(
+   *              Rating(3, 1, 3.1f),
+   *              Rating(3, 4, 3.4f),
+   *              Rating(3, 7, 3.7f)
+   *            )
+   *     )
+   * }}}
+   *
+   * Similarly, ratings with even-valued item IDs are shuffled to partition 0 while those with
+   * odd-valued item IDs are shuffled to partition 1:
+   *
+   * {{{
+   *     itemInBlocks.collect() == Seq(
+   *       0 -> Seq(
+   *              Rating(0, 4, 0.4f),
+   *              Rating(3, 4, 3.4f),
+   *              Rating(6, 4, 6.4f)
+   *            ),
+   *       1 -> Seq(
+   *              Rating(0, 1, 0.1f),
+   *              Rating(0, 7, 0.7f),
+   *              Rating(3, 1, 3.1f),
+   *              Rating(3, 7, 3.7f),
+   *              Rating(6, 1, 6.1f),
+   *              Rating(6, 7, 6.7f)
+   *            )
+   *     )
+   * }}}
    *
    * @param srcIds src ids (ordered)
    * @param dstPtrs dst pointers. Elements in range [dstPtrs(i), dstPtrs(i+1)) of dst indices and
@@ -1026,7 +1188,24 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
   }
 
   /**
-   * Partitions raw ratings into blocks.
+   * Groups an RDD of [[Rating]]s by the user partition and item partition to which each `Rating`
+   * maps according to the given partitioners.  The returned pair RDD holds the ratings, encoded in
+   * a memory-efficient format but otherwise unchanged, keyed by the (user partition ID, item
+   * partition ID) pair.
+   *
+   * Performance note: This is an expensive operation that performs an RDD shuffle.
+   *
+   * Implementation note: This implementation produces the same result as the following but
+   * generates fewer intermediate objects:
+   *
+   * {{{
+   *     ratings.map { r =>
+   *       ((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
+   *     }.aggregateByKey(new RatingBlockBuilder)(
+   *         seqOp = (b, r) => b.add(r),
+   *         combOp = (b0, b1) => b0.merge(b1.build()))
+   *       .mapValues(_.build())
+   * }}}
    *
    * @param ratings raw ratings
    * @param srcPart partitioner for src IDs
@@ -1037,17 +1216,6 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
       ratings: RDD[Rating[ID]],
       srcPart: Partitioner,
       dstPart: Partitioner): RDD[((Int, Int), RatingBlock[ID])] = {
-
-     /* The implementation produces the same result as the following but generates less objects.
-
-     ratings.map { r =>
-       ((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
-     }.aggregateByKey(new RatingBlockBuilder)(
-         seqOp = (b, r) => b.add(r),
-         combOp = (b0, b1) => b0.merge(b1.build()))
-       .mapValues(_.build())
-     */
-
     val numPartitions = srcPart.numPartitions * dstPart.numPartitions
     ratings.mapPartitions { iter =>
       val builders = Array.fill(numPartitions)(new RatingBlockBuilder[ID])
@@ -1135,8 +1303,8 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
     def length: Int = srcIds.length
 
     /**
-     * Compresses the block into an [[InBlock]]. The algorithm is the same as converting a
-     * sparse matrix from coordinate list (COO) format into compressed sparse column (CSC) format.
+     * Compresses the block into an `InBlock`. The algorithm is the same as converting a sparse
+     * matrix from coordinate list (COO) format into compressed sparse column (CSC) format.
      * Sorting is done using Spark's built-in Timsort to avoid generating too many objects.
      */
     def compress(): InBlock[ID] = {
-- 
GitLab