Skip to content
Snippets Groups Projects
Commit 72df5a30 authored by Fernando Otero (ZeoS), committed by Xiangrui Meng
Browse files

SPARK-5148 [MLlib] Make usersOut/productsOut storage level in ALS configurable

Author: Fernando Otero (ZeoS) <fotero@gmail.com>

Closes #3953 from zeitos/storageLevel and squashes the following commits:

0f070b9 [Fernando Otero (ZeoS)] fix imports
6869e80 [Fernando Otero (ZeoS)] fix comment length
90c9f7e [Fernando Otero (ZeoS)] fix comment length
18a992e [Fernando Otero (ZeoS)] changing storage level
parent 538f2216
No related branches found
No related tags found
No related merge requests found
...@@ -116,6 +116,7 @@ class ALS private ( ...@@ -116,6 +116,7 @@ class ALS private (
/** storage level for user/product in/out links */ /** storage level for user/product in/out links */
private var intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK private var intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK
private var finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK
/** /**
* Set the number of blocks for both user blocks and product blocks to parallelize the computation * Set the number of blocks for both user blocks and product blocks to parallelize the computation
...@@ -204,6 +205,19 @@ class ALS private ( ...@@ -204,6 +205,19 @@ class ALS private (
this this
} }
/**
 * :: DeveloperApi ::
 * Configures the storage level applied to the final RDDs, i.e. the user and product factors
 * that end up in the MatrixFactorizationModel. When not set, `MEMORY_AND_DISK` is used.
 * Picking a serialized level such as `MEMORY_AND_DISK_SER`, combined with setting
 * `spark.rdd.compress` to `true`, lowers the space requirement at the cost of speed.
 *
 * @param storageLevel storage level to persist the final user/product RDDs with
 * @return this instance, so that setter calls can be chained
 */
@DeveloperApi
def setFinalRDDStorageLevel(storageLevel: StorageLevel): this.type = {
  finalRDDStorageLevel = storageLevel
  this
}
/** /**
* Run ALS with the configured parameters on an input RDD of (user, product, rating) triples. * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
* Returns a MatrixFactorizationModel with feature vectors for each user and product. * Returns a MatrixFactorizationModel with feature vectors for each user and product.
...@@ -307,8 +321,8 @@ class ALS private ( ...@@ -307,8 +321,8 @@ class ALS private (
val usersOut = unblockFactors(users, userOutLinks) val usersOut = unblockFactors(users, userOutLinks)
val productsOut = unblockFactors(products, productOutLinks) val productsOut = unblockFactors(products, productOutLinks)
usersOut.setName("usersOut").persist(StorageLevel.MEMORY_AND_DISK) usersOut.setName("usersOut").persist(finalRDDStorageLevel)
productsOut.setName("productsOut").persist(StorageLevel.MEMORY_AND_DISK) productsOut.setName("productsOut").persist(finalRDDStorageLevel)
// Materialize usersOut and productsOut. // Materialize usersOut and productsOut.
usersOut.count() usersOut.count()
......
...@@ -27,6 +27,7 @@ import org.jblas.DoubleMatrix ...@@ -27,6 +27,7 @@ import org.jblas.DoubleMatrix
import org.apache.spark.SparkContext._ import org.apache.spark.SparkContext._
import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.recommendation.ALS.BlockStats import org.apache.spark.mllib.recommendation.ALS.BlockStats
import org.apache.spark.storage.StorageLevel
object ALSSuite { object ALSSuite {
...@@ -139,6 +140,32 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext { ...@@ -139,6 +140,32 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext {
assert(u11 != u2) assert(u11 != u2)
} }
test("Storage Level for RDDs in model") {
  val ratings = sc.parallelize(ALSSuite.generateRatings(10, 20, 5, 0.5, false, false)._1, 2)
  // Train one model per storage level and check that both factor RDDs exposed by the model
  // report the requested level. The two levels are exercised in the same order as before:
  // MEMORY_ONLY first, then DISK_ONLY.
  Seq(StorageLevel.MEMORY_ONLY, StorageLevel.DISK_ONLY).foreach { storageLevel =>
    val model = new ALS()
      .setRank(5)
      .setIterations(1)
      .setLambda(1.0)
      .setBlocks(2)
      .setSeed(1)
      .setFinalRDDStorageLevel(storageLevel)
      .run(ratings)
    assert(model.productFeatures.getStorageLevel == storageLevel)
    assert(model.userFeatures.getStorageLevel == storageLevel)
  }
}
test("negative ids") { test("negative ids") {
val data = ALSSuite.generateRatings(50, 50, 2, 0.7, false, false) val data = ALSSuite.generateRatings(50, 50, 2, 0.7, false, false)
val ratings = sc.parallelize(data._1.map { case Rating(u, p, r) => val ratings = sc.parallelize(data._1.map { case Rating(u, p, r) =>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment