Commit 040f6f9f authored by tmnd1991, committed by Joseph K. Bradley

[SPARK-15740][MLLIB] Word2VecSuite "big model load / save" caused OOM in maven jenkins builds

## What changes were proposed in this pull request?
"test big model load / save" in Word2VecSuite, lately resulted into OOM.
Therefore we decided to make the partitioning adaptive (not based on spark default "spark.kryoserializer.buffer.max" conf) and then testing it using a small buffer size in order to trigger partitioning without allocating too much memory for the test.
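
To make the sizing arithmetic concrete, below is a minimal, self-contained Scala sketch of the new logic. The 15-byte average word length is the patch's own assumption; the object and method names and the sample dimensions here are illustrative, not Spark API.

```scala
object AdaptivePartitioningSketch {
  // Estimate the serialized model size and derive a partition count that
  // keeps each partition below the configured Kryo buffer ceiling.
  def nPartitions(numWords: Long, vectorSize: Long, bufferSize: Long): Int = {
    // (floatSize * vectorSize + avgStringSize) * numWords
    val approxSize = (4L * vectorSize + 15L) * numWords
    ((approxSize / bufferSize) + 1).toInt
  }

  def main(args: Array[String]): Unit = {
    // Default 64 MB buffer: a 9000-word, 1000-dim model is ~36 MB,
    // so it still fits in a single partition.
    println(nPartitions(9000L, 1000L, 64L << 20)) // 1
    // The 50-byte buffer the new test sets splits even 11 tiny vectors:
    // (4 * 10 + 15) * 11 = 605 bytes -> 605 / 50 + 1 = 13 partitions.
    println(nPartitions(11L, 10L, 50L)) // 13
  }
}
```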

## How was this patch tested?
It was tested by running the following unit test suite:
org.apache.spark.mllib.feature.Word2VecSuite
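
For reference, one way to run just this suite locally (the invocation follows Spark's developer docs; the exact module flags are an assumption and may differ across branches):

```sh
# via sbt
build/sbt "mllib/testOnly org.apache.spark.mllib.feature.Word2VecSuite"
# via Maven, matching the Jenkins builds
build/mvn test -pl mllib -Dtest=none -DwildcardSuites=org.apache.spark.mllib.feature.Word2VecSuite
```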

Author: tmnd1991 <antonio.murgia2@studio.unibo.it>

Closes #13509 from tmnd1991/SPARK-15740.
parent 4f8ceed5
@@ -629,14 +629,16 @@ object Word2VecModel extends Loader[Word2VecModel] {
           ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords)))
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
-      // We want to partition the model in partitions of size 32MB
-      val partitionSize = (1L << 25)
+      // We want to partition the model in partitions smaller than
+      // spark.kryoserializer.buffer.max
+      val bufferSize = Utils.byteStringAsBytes(
+        spark.conf.get("spark.kryoserializer.buffer.max", "64m"))
       // We calculate the approximate size of the model
-      // We only calculate the array size, not considering
-      // the string size, the formula is:
-      // floatSize * numWords * vectorSize
-      val approxSize = 4L * numWords * vectorSize
-      val nPartitions = ((approxSize / partitionSize) + 1).toInt
+      // We only calculate the array size, considering an
+      // average string size of 15 bytes, the formula is:
+      // (floatSize * vectorSize + 15) * numWords
+      val approxSize = (4L * vectorSize + 15) * numWords
+      val nPartitions = ((approxSize / bufferSize) + 1).toInt
       val dataArray = model.toSeq.map { case (w, v) => Data(w, v) }
       spark.createDataFrame(dataArray).repartition(nPartitions).write.parquet(Loader.dataPath(path))
     }
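
The hunk above leans on `Utils.byteStringAsBytes`, which parses Spark size strings ("64m", "50b", ...) into a byte count; that conversion is what lets the partition count track the configured ceiling. A toy stand-in is sketched below purely to make the units concrete; `parseSize` is hypothetical, not Spark's implementation.

```scala
// Toy stand-in for org.apache.spark.util.Utils.byteStringAsBytes,
// handling only the simple "<digits><unit>" forms used here.
def parseSize(s: String): Long = {
  val units = Map("b" -> 1L, "k" -> 1024L, "m" -> 1024L * 1024, "g" -> 1024L * 1024 * 1024)
  val (num, unit) = s.span(_.isDigit)
  num.toLong * units(unit.toLowerCase)
}

assert(parseSize("64m") == 67108864L) // the default buffer ceiling
assert(parseSize("50b") == 50L)       // the value the new test sets
```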
@@ -91,11 +91,23 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
   }
-  ignore("big model load / save") {
-    // create a model bigger than 32MB since 9000 * 1000 * 4 > 2^25
-    val word2VecMap = Map((0 to 9000).map(i => s"$i" -> Array.fill(1000)(0.1f)): _*)
+  test("big model load / save") {
+    // backing up old values
+    val oldBufferConfValue = spark.conf.get("spark.kryoserializer.buffer", "64k")
+    val oldBufferMaxConfValue = spark.conf.get("spark.kryoserializer.buffer.max", "64m")
+    // setting test values to trigger partitioning
+    spark.conf.set("spark.kryoserializer.buffer", "50b")
+    spark.conf.set("spark.kryoserializer.buffer.max", "50b")
+    // create a model bigger than 50 bytes
+    val word2VecMap = Map((0 to 10).map(i => s"$i" -> Array.fill(10)(0.1f)): _*)
     val model = new Word2VecModel(word2VecMap)
+    // estimated size of this model, given the formula
+    // (floatSize * vectorSize + 15) * numWords:
+    // (4 * 10 + 15) * 11 = 605 bytes > 50 bytes,
+    // so saving it should generate multiple partitions
     val tempDir = Utils.createTempDir()
     val path = tempDir.toURI.toString
@@ -103,9 +115,16 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
       model.save(sc, path)
       val sameModel = Word2VecModel.load(sc, path)
       assert(sameModel.getVectors.mapValues(_.toSeq) === model.getVectors.mapValues(_.toSeq))
+    }
+    catch {
+      case t: Throwable => fail("exception thrown persisting a model " +
+        "that spans over multiple partitions", t)
     } finally {
       Utils.deleteRecursively(tempDir)
+      spark.conf.set("spark.kryoserializer.buffer", oldBufferConfValue)
+      spark.conf.set("spark.kryoserializer.buffer.max", oldBufferMaxConfValue)
     }
   }
   test("test similarity for word vectors with large values is not Infinity or NaN") {
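
As a side note, the backup/override/restore dance the test performs around `spark.conf` is a reusable pattern. A hedged, generic sketch follows; `withConfValue` is a hypothetical helper, not part of Spark.

```scala
import org.apache.spark.sql.SparkSession

// Snapshot a runtime conf value, override it for the duration of `body`,
// then restore the original value even if `body` throws.
def withConfValue[T](spark: SparkSession, key: String, value: String, default: String)(body: => T): T = {
  val old = spark.conf.get(key, default)
  spark.conf.set(key, value)
  try body finally spark.conf.set(key, old)
}
```

With such a helper, the test body would reduce to a pair of nested `withConfValue` calls around the save/load assertions.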