Commit 88e6d750 authored by Daniel Li, committed by Sean Owen

[SPARK-20484][MLLIB] Add documentation to ALS code

## What changes were proposed in this pull request?

This PR adds documentation to the ALS code.

## How was this patch tested?

Existing tests were used.

mengxr srowen

This contribution is my original work.  I have the license to work on this project under the Spark project’s open source license.

Author: Daniel Li <dan@danielyli.com>

Closes #17793 from danielyli/spark-20484.
parent 37f963ac
@@ -774,6 +774,28 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
/**
* :: DeveloperApi ::
* Implementation of the ALS algorithm.
*
* This implementation of the ALS factorization algorithm partitions the two sets of factors among
* Spark workers so as to reduce network communication by only sending one copy of each factor
* vector to each Spark worker on each iteration, and only if needed. This is achieved by
* precomputing some information about the ratings matrix to determine which users require which
* item factors and vice versa. See the Scaladoc for `OutBlock` for a detailed explanation of how
* the precomputation is done.
*
* In addition, since each iteration of calculating the factor matrices depends on the known
* ratings, which are spread across Spark partitions, a naive implementation would incur
* significant network communication overhead between Spark workers, as the ratings RDD would be
* repeatedly shuffled during each iteration. This implementation reduces that overhead by
* performing the shuffling operation up front, precomputing each partition's ratings dependencies
* and duplicating those values to the appropriate workers before starting iterations to solve for
* the factor matrices. See the Scaladoc for `InBlock` for a detailed explanation of how the
* precomputation is done.
*
* Note that the term "rating block" is a bit of a misnomer, as the ratings are not partitioned by
* contiguous blocks from the ratings matrix but by a hash function on the rating's location in
* the matrix. When visualizing the partitions, it may help to think of the term "block" as
* referring to a subset of an RDD containing the ratings rather than to a contiguous submatrix
* of the ratings matrix.
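*
* A minimal usage sketch (the parameter names here are assumed from the `train` signature below,
* and `ratings` is assumed to be an existing `RDD[Rating[Int]]`):
*
* {{{
* val (userFactors, itemFactors) = ALS.train(
*   ratings,
*   rank = 10,
*   numUserBlocks = 2,
*   numItemBlocks = 2,
*   maxIter = 10)
* }}}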
*/
@DeveloperApi
def train[ID: ClassTag]( // scalastyle:ignore
@@ -791,32 +813,43 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
checkpointInterval: Int = 10,
seed: Long = 0L)(
implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = {
require(!ratings.isEmpty(), s"No ratings available from $ratings")
require(intermediateRDDStorageLevel != StorageLevel.NONE,
"ALS is not designed to run without persisting intermediate RDDs.")
val sc = ratings.sparkContext
// Precompute the rating dependencies of each partition
val userPart = new ALSPartitioner(numUserBlocks)
val itemPart = new ALSPartitioner(numItemBlocks)
val blockRatings = partitionRatings(ratings, userPart, itemPart)
.persist(intermediateRDDStorageLevel)
val (userInBlocks, userOutBlocks) =
makeBlocks("user", blockRatings, userPart, itemPart, intermediateRDDStorageLevel)
userOutBlocks.count() // materialize blockRatings and user blocks
val swappedBlockRatings = blockRatings.map {
case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) =>
((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings))
}
val (itemInBlocks, itemOutBlocks) =
makeBlocks("item", swappedBlockRatings, itemPart, userPart, intermediateRDDStorageLevel)
itemOutBlocks.count() // materialize item blocks
// Encoders for storing each user/item's partition ID and index within its partition using a
// single integer; used as an optimization
val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
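// For example (hypothetical values; `encode`, `blockId`, and `localIndex` are the methods of
// `LocalIndexEncoder`, defined later in this file):
//   val encoded = itemLocalIndexEncoder.encode(blockId = 1, localIndex = 3)
//   itemLocalIndexEncoder.blockId(encoded)     // == 1
//   itemLocalIndexEncoder.localIndex(encoded)  // == 3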
// These are the user and item factor matrices that, once trained, are multiplied together to
// estimate the rating matrix. The two matrices are stored in RDDs, partitioned by column such
// that each factor column resides on the same Spark worker as its corresponding user or item.
val seedGen = new XORShiftRandom(seed)
var userFactors = initialize(userInBlocks, rank, seedGen.nextLong())
var itemFactors = initialize(itemInBlocks, rank, seedGen.nextLong())
val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
var previousCheckpointFile: Option[String] = None
val shouldCheckpoint: Int => Boolean = (iter) =>
sc.checkpointDir.isDefined && checkpointInterval != -1 && (iter % checkpointInterval == 0)
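// For instance, with checkpointInterval = 10 and a checkpoint directory configured, the factors
// are checkpointed at iterations 10, 20, 30, and so on; an interval of -1 disables checkpointing.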
@@ -830,6 +863,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
logWarning(s"Cannot delete checkpoint file $file:", e)
}
}
if (implicitPrefs) {
for (iter <- 1 to maxIter) {
userFactors.setName(s"userFactors-$iter").persist(intermediateRDDStorageLevel)
@@ -910,26 +944,154 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
private type FactorBlock = Array[Array[Float]]
/**
* A mapping of the columns of the items factor matrix that are needed when calculating each row
* of the users factor matrix, and vice versa.
*
* Specifically, when calculating a user factor vector, only those columns of the items factor
* matrix that correspond to the items the user has rated are needed. By precomputing these
* dependencies for all users and storing them in an RDD of `OutBlock`s, we avoid having to
* repeatedly copy the entire items factor matrix to each worker later in the algorithm. The
* items' dependencies on the columns of the users factor matrix are computed similarly.
*
* =Example=
*
* Using the example provided in the `InBlock` Scaladoc, `userOutBlocks` would look like the
* following:
*
* {{{
* userOutBlocks.collect() == Seq(
* 0 -> Array(Array(0, 1), Array(0, 1)),
* 1 -> Array(Array(0), Array(0))
* )
* }}}
*
* Each value in this map-like sequence is of type `Array[Array[Int]]`. The values in the
* inner array are the ranks of the sorted user IDs in that partition; so in the example above,
* `Array(0, 1)` in partition 0 refers to user IDs 0 and 6, since when all unique user IDs in
* partition 0 are sorted, 0 is the first ID and 6 is the second. The position of each inner
* array in its enclosing outer array denotes the partition number to which item IDs map; in the
* example, the first `Array(0, 1)` is in position 0 of its outer array, denoting item IDs that
* map to partition 0.
*
* In summary, the data structure encodes the following information:
*
* * There are ratings with user IDs 0 and 6 (encoded in `Array(0, 1)`, where 0 and 1 are the
* indices of the user IDs 0 and 6 on partition 0) whose item IDs map to partitions 0 and 1
* (represented by the fact that `Array(0, 1)` appears in both the 0th and 1st positions).
*
* * There are ratings with user ID 3 (encoded in `Array(0)`, where 0 is the index of the user
* ID 3 on partition 1) whose item IDs map to partitions 0 and 1 (represented by the fact that
* `Array(0)` appears in both the 0th and 1st positions).
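*
* As an illustrative sketch (hypothetical code, not part of the implementation) of how an
* `OutBlock` drives the factor exchange, the local indices stored for each destination block
* select which factor vectors to ship there:
*
* {{{
* // The out-block for partition 0 in the example above:
* val outBlock: OutBlock = Array(Array(0, 1), Array(0, 1))
* // Hypothetical rank-2 factor vectors for user IDs 0 and 6, stored by local index:
* val factors: Array[Array[Float]] = Array(Array(0.5f, 0.5f), Array(0.2f, 0.8f))
* outBlock.zipWithIndex.foreach { case (localIndices, dstBlockId) =>
*   val toSend = localIndices.map(factors(_))  // factor vectors to send to partition dstBlockId
* }
* }}}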
*/
private type OutBlock = Array[Array[Int]]
/**
* In-link block for computing user and item factor matrices.
*
* The ALS algorithm partitions the columns of the users factor matrix evenly among Spark workers.
* Since each column of the factor matrix is calculated using the known ratings of the
* corresponding user, and since the ratings don't change across iterations, the ALS algorithm
* preshuffles the ratings to the appropriate partitions, storing them in `InBlock` objects.
*
* The ratings shuffled by item ID are computed similarly and also stored in `InBlock` objects.
* Note that this means every rating is stored twice, once shuffled by user ID and once shuffled
* by item ID. This is a necessary tradeoff, since in general a rating will not reside on the
* same worker when partitioned by user ID as when partitioned by item ID.
*
* =Example=
*
* Say we have a small collection of eight items to offer the seven users in our application. We
* have some known ratings given by the users, as seen in the matrix below:
*
* {{{
* Items
* 0 1 2 3 4 5 6 7
* +---+---+---+---+---+---+---+---+
* 0 | |0.1| | |0.4| | |0.7|
* +---+---+---+---+---+---+---+---+
* 1 | | | | | | | | |
* +---+---+---+---+---+---+---+---+
* U 2 | | | | | | | | |
* s +---+---+---+---+---+---+---+---+
* e 3 | |3.1| | |3.4| | |3.7|
* r +---+---+---+---+---+---+---+---+
* s 4 | | | | | | | | |
* +---+---+---+---+---+---+---+---+
* 5 | | | | | | | | |
* +---+---+---+---+---+---+---+---+
* 6 | |6.1| | |6.4| | |6.7|
* +---+---+---+---+---+---+---+---+
* }}}
*
* The ratings are represented as an RDD, passed to the `partitionRatings` method as the `ratings`
* parameter:
*
* {{{
* ratings.collect() == Seq(
* Rating(0, 1, 0.1f),
* Rating(0, 4, 0.4f),
* Rating(0, 7, 0.7f),
* Rating(3, 1, 3.1f),
* Rating(3, 4, 3.4f),
* Rating(3, 7, 3.7f),
* Rating(6, 1, 6.1f),
* Rating(6, 4, 6.4f),
* Rating(6, 7, 6.7f)
* )
* }}}
*
* Say that we are using two partitions to calculate each factor matrix:
*
* {{{
* val userPart = new ALSPartitioner(2)
* val itemPart = new ALSPartitioner(2)
* val blockRatings = partitionRatings(ratings, userPart, itemPart)
* }}}
*
* Ratings are mapped to partitions using the user/item IDs modulo the number of partitions. With
* two partitions, ratings with even-valued user IDs are shuffled to partition 0 while those with
* odd-valued user IDs are shuffled to partition 1:
*
* {{{
* userInBlocks.collect() == Seq(
* 0 -> Seq(
* // Internally, the class stores the ratings in a more optimized format than
* // a sequence of `Rating`s, but for clarity we show it as such here.
* Rating(0, 1, 0.1f),
* Rating(0, 4, 0.4f),
* Rating(0, 7, 0.7f),
* Rating(6, 1, 6.1f),
* Rating(6, 4, 6.4f),
* Rating(6, 7, 6.7f)
* ),
* 1 -> Seq(
* Rating(3, 1, 3.1f),
* Rating(3, 4, 3.4f),
* Rating(3, 7, 3.7f)
* )
* )
* }}}
*
* Similarly, ratings with even-valued item IDs are shuffled to partition 0 while those with
* odd-valued item IDs are shuffled to partition 1:
*
* {{{
* itemInBlocks.collect() == Seq(
* 0 -> Seq(
* Rating(0, 4, 0.4f),
* Rating(3, 4, 3.4f),
* Rating(6, 4, 6.4f)
* ),
* 1 -> Seq(
* Rating(0, 1, 0.1f),
* Rating(0, 7, 0.7f),
* Rating(3, 1, 3.1f),
* Rating(3, 7, 3.7f),
* Rating(6, 1, 6.1f),
* Rating(6, 7, 6.7f)
* )
* )
* }}}
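*
* (Concretely, assuming `ALSPartitioner` assigns integer IDs by value modulo the partition count
* as described above, `new ALSPartitioner(2).getPartition(6)` is 0 and
* `new ALSPartitioner(2).getPartition(3)` is 1.)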
*
* @param srcIds src ids (ordered)
* @param dstPtrs dst pointers. Elements in range [dstPtrs(i), dstPtrs(i+1)) of dst indices and
@@ -1026,7 +1188,24 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
}
/**
* Groups an RDD of [[Rating]]s by the user partition and item partition to which each `Rating`
* maps according to the given partitioners. The returned pair RDD holds the ratings, encoded in
* a memory-efficient format but otherwise unchanged, keyed by the (user partition ID, item
* partition ID) pair.
*
* Performance note: This is an expensive operation that performs an RDD shuffle.
*
* Implementation note: This implementation produces the same result as the following but
* generates fewer intermediate objects:
*
* {{{
* ratings.map { r =>
* ((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
* }.aggregateByKey(new RatingBlockBuilder)(
* seqOp = (b, r) => b.add(r),
* combOp = (b0, b1) => b0.merge(b1.build()))
* .mapValues(_.build())
* }}}
*
* @param ratings raw ratings
* @param srcPart partitioner for src IDs
@@ -1037,17 +1216,6 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
ratings: RDD[Rating[ID]],
srcPart: Partitioner,
dstPart: Partitioner): RDD[((Int, Int), RatingBlock[ID])] = {
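// Flatten the (src block ID, dst block ID) key space into a single array of builders, one
// RatingBlockBuilder per block pair; each input partition aggregates its ratings locally
// before the shuffle.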
val numPartitions = srcPart.numPartitions * dstPart.numPartitions
ratings.mapPartitions { iter =>
val builders = Array.fill(numPartitions)(new RatingBlockBuilder[ID])
@@ -1135,8 +1303,8 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
def length: Int = srcIds.length
/**
* Compresses the block into an `InBlock`. The algorithm is the same as converting a sparse
* matrix from coordinate list (COO) format into compressed sparse column (CSC) format.
* Sorting is done using Spark's built-in Timsort to avoid generating too many objects.
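* For intuition, consider the partition-0 user block from the `InBlock` example above (expected
* values sketched here, not actual output): the six ratings sort into three for user 0 followed
* by three for user 6, so compression yields unique IDs `Array(0, 6)` with
* `dstPtrs == Array(0, 3, 6)`; ratings 0 through 2 belong to user 0 and ratings 3 through 5 to
* user 6.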
*/
def compress(): InBlock[ID] = {