From fc47bb6967e0df40870413e09d37aa9b90248f43 Mon Sep 17 00:00:00 2001
From: GuoQiang Li <witgo@qq.com>
Date: Wed, 30 Jul 2014 11:00:11 -0700
Subject: [PATCH] [SPARK-2544][MLLIB] Improve ALS algorithm resource usage

Author: GuoQiang Li <witgo@qq.com>
Author: witgo <witgo@qq.com>

Closes #929 from witgo/improve_als and squashes the following commits:

ea25033 [GuoQiang Li] checkpoint products 3,6,9 ...
154dccf [GuoQiang Li] checkpoint products only
c5779ff [witgo] Improve ALS algorithm resource usage
---
 .../scala/org/apache/spark/mllib/recommendation/ALS.scala   | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 5356790cb5..d208cfb917 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -255,6 +255,9 @@ class ALS private (
           rank, lambda, alpha, YtY)
         previousProducts.unpersist()
         logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
+        if (sc.checkpointDir.isDefined && (iter % 3 == 0)) {
+          products.checkpoint()
+        }
         products.setName(s"products-$iter").persist()
         val XtX = Some(sc.broadcast(computeYtY(products)))
         val previousUsers = users
@@ -268,6 +271,9 @@ class ALS private (
         logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
         products = updateFeatures(numProductBlocks, users, userOutLinks, productInLinks,
           rank, lambda, alpha, YtY = None)
+        if (sc.checkpointDir.isDefined && (iter % 3 == 0)) {
+          products.checkpoint()
+        }
         products.setName(s"products-$iter")
         logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
         users = updateFeatures(numUserBlocks, products, productOutLinks, userInLinks,
-- 
GitLab