diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 23045fa2b6863e00e348082e0e1d3f9b84bc0e55..d45866c016d91ff152d7a75f662f27c1f8197631 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -39,6 +39,7 @@ import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.BoundedPriorityQueue /** * Model representing the result of matrix factorization. @@ -274,46 +275,64 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { - val srcBlocks = blockify(rank, srcFeatures) - val dstBlocks = blockify(rank, dstFeatures) - val ratings = srcBlocks.cartesian(dstBlocks).flatMap { - case ((srcIds, srcFactors), (dstIds, dstFactors)) => - val m = srcIds.length - val n = dstIds.length - val ratings = srcFactors.transpose.multiply(dstFactors) - val output = new Array[(Int, (Int, Double))](m * n) - var k = 0 - ratings.foreachActive { (i, j, r) => - output(k) = (srcIds(i), (dstIds(j), r)) - k += 1 + val srcBlocks = blockify(srcFeatures) + val dstBlocks = blockify(dstFeatures) + /** + * The previous approach used for computing top-k recommendations aimed to group + * individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could + * be used for efficiency. However, this causes excessive GC pressure due to the large + * arrays required for intermediate result storage, as well as a high sensitivity to the + * block size used. + * The following approach still groups factors into blocks, but instead computes the + * top-k elements per block, using a simple dot product (instead of gemm) and an efficient + * [[BoundedPriorityQueue]]. This avoids any large intermediate data structures and results + * in significantly reduced GC pressure as well as shuffle data, which far outweighs + * any cost incurred from not using Level 3 BLAS operations. + */ + val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => + val m = srcIter.size + val n = math.min(dstIter.size, num) + val output = new Array[(Int, (Int, Double))](m * n) + var j = 0 + val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) + srcIter.foreach { case (srcId, srcFactor) => + dstIter.foreach { case (dstId, dstFactor) => + /* + * The below code is equivalent to + * `val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1)` + * This handwritten version is as or more efficient as BLAS calls in this case. + */ + var score: Double = 0 + var k = 0 + while (k < rank) { + score += srcFactor(k) * dstFactor(k) + k += 1 + } + pq += dstId -> score + } + val pqIter = pq.iterator + var i = 0 + while (i < n) { + output(j + i) = (srcId, pqIter.next()) + i += 1 } - output.toSeq + j += n + pq.clear() + } + output.toSeq } ratings.topByKey(num)(Ordering.by(_._2)) } /** - * Blockifies features to use Level-3 BLAS. + * Blockifies features to improve the efficiency of cartesian product + * TODO: SPARK-20443 - expose blockSize as a param? */ private def blockify( - rank: Int, - features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = { - val blockSize = 4096 // TODO: tune the block size - val blockStorage = rank * blockSize + features: RDD[(Int, Array[Double])], + blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = { features.mapPartitions { iter => - iter.grouped(blockSize).map { grouped => - val ids = mutable.ArrayBuilder.make[Int] - ids.sizeHint(blockSize) - val factors = mutable.ArrayBuilder.make[Double] - factors.sizeHint(blockStorage) - var i = 0 - grouped.foreach { case (id, factor) => - ids += id - factors ++= factor - i += 1 - } - (ids.result(), new DenseMatrix(rank, i, factors.result())) - } + iter.grouped(blockSize) } }