From 72df5a301e706d9384f3a1c17b2c58b017632b1f Mon Sep 17 00:00:00 2001
From: "Fernando Otero (ZeoS)" <fotero@gmail.com>
Date: Thu, 8 Jan 2015 12:42:54 -0800
Subject: [PATCH] SPARK-5148 [MLlib] Make usersOut/productsOut storagelevel in
 ALS configurable

Author: Fernando Otero (ZeoS) <fotero@gmail.com>

Closes #3953 from zeitos/storageLevel and squashes the following commits:

0f070b9 [Fernando Otero (ZeoS)] fix imports
6869e80 [Fernando Otero (ZeoS)] fix comment length
90c9f7e [Fernando Otero (ZeoS)] fix comment length
18a992e [Fernando Otero (ZeoS)] changing storage level
---
 .../spark/mllib/recommendation/ALS.scala      | 18 +++++++++++--
 .../spark/mllib/recommendation/ALSSuite.scala | 27 +++++++++++++++++++
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 90ac252226..bee951a2e5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -116,6 +116,7 @@ class ALS private (
 
   /** storage level for user/product in/out links */
   private var intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK
+  private var finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK
 
   /**
    * Set the number of blocks for both user blocks and product blocks to parallelize the computation
@@ -204,6 +205,19 @@ class ALS private (
     this
   }
 
+  /**
+   * :: DeveloperApi ::
+   * Sets storage level for final RDDs (user/product used in MatrixFactorizationModel). The default
+   * value is `MEMORY_AND_DISK`. Users can change it to a serialized storage, e.g. 
+   * `MEMORY_AND_DISK_SER` and set `spark.rdd.compress` to `true` to reduce the space requirement,
+   * at the cost of speed.
+   */
+  @DeveloperApi
+  def setFinalRDDStorageLevel(storageLevel: StorageLevel): this.type = {
+    this.finalRDDStorageLevel = storageLevel
+    this
+  }
+
   /**
    * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
    * Returns a MatrixFactorizationModel with feature vectors for each user and product.
@@ -307,8 +321,8 @@ class ALS private (
     val usersOut = unblockFactors(users, userOutLinks)
     val productsOut = unblockFactors(products, productOutLinks)
 
-    usersOut.setName("usersOut").persist(StorageLevel.MEMORY_AND_DISK)
-    productsOut.setName("productsOut").persist(StorageLevel.MEMORY_AND_DISK)
+    usersOut.setName("usersOut").persist(finalRDDStorageLevel)
+    productsOut.setName("productsOut").persist(finalRDDStorageLevel)
 
     // Materialize usersOut and productsOut.
     usersOut.count()
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index 603d0ad127..f3b7bfda78 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -27,6 +27,7 @@ import org.jblas.DoubleMatrix
 import org.apache.spark.SparkContext._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.recommendation.ALS.BlockStats
+import org.apache.spark.storage.StorageLevel
 
 object ALSSuite {
 
@@ -139,6 +140,32 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext {
     assert(u11 != u2)
   }
 
+  test("Storage Level for RDDs in model") {
+    val ratings = sc.parallelize(ALSSuite.generateRatings(10, 20, 5, 0.5, false, false)._1, 2)
+    var storageLevel = StorageLevel.MEMORY_ONLY
+    var model = new ALS()
+      .setRank(5)
+      .setIterations(1)
+      .setLambda(1.0)
+      .setBlocks(2)
+      .setSeed(1)
+      .setFinalRDDStorageLevel(storageLevel)
+      .run(ratings)
+    assert(model.productFeatures.getStorageLevel == storageLevel);
+    assert(model.userFeatures.getStorageLevel == storageLevel);
+    storageLevel = StorageLevel.DISK_ONLY
+    model = new ALS()
+      .setRank(5)
+      .setIterations(1)
+      .setLambda(1.0)
+      .setBlocks(2)
+      .setSeed(1)
+      .setFinalRDDStorageLevel(storageLevel)
+      .run(ratings)
+    assert(model.productFeatures.getStorageLevel == storageLevel);
+    assert(model.userFeatures.getStorageLevel == storageLevel);
+  }
+
   test("negative ids") {
     val data = ALSSuite.generateRatings(50, 50, 2, 0.7, false, false)
     val ratings = sc.parallelize(data._1.map { case Rating(u, p, r) =>
-- 
GitLab