Skip to content
Snippets Groups Projects
Commit 215c13dd authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Fix code style and a nondeterministic RDD issue in ALS

parent 46ea0c1b
No related branches found
No related tags found
No related merge requests found
......@@ -123,18 +123,27 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock)
// Initialize user and product factors randomly
var users = userOutLinks.mapPartitions {itr =>
val rand = new Random()
itr.map({case (x, y) =>
(x, y.elementIds.map(u => randomFactor(rank, rand)))
})
// Initialize user and product factors randomly, but use a deterministic seed for each partition
// so that fault recovery works
val seedGen = new Random()
val seed1 = seedGen.nextInt()
val seed2 = seedGen.nextInt()
// Hash an integer to propagate random bits at all positions, similar to java.util.HashTable
def hash(x: Int): Int = {
val r = x ^ (x >>> 20) ^ (x >>> 12)
r ^ (r >>> 7) ^ (r >>> 4)
}
var products = productOutLinks.mapPartitions {itr =>
val rand = new Random()
itr.map({case (x, y) =>
(x, y.elementIds.map(u => randomFactor(rank, rand)))
})
var users = userOutLinks.mapPartitionsWithIndex { (index, itr) =>
val rand = new Random(hash(seed1 ^ index))
itr.map { case (x, y) =>
(x, y.elementIds.map(_ => randomFactor(rank, rand)))
}
}
var products = productOutLinks.mapPartitionsWithIndex { (index, itr) =>
val rand = new Random(hash(seed2 ^ index))
itr.map { case (x, y) =>
(x, y.elementIds.map(_ => randomFactor(rank, rand)))
}
}
for (iter <- 0 until iterations) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment