From 040f6f9f468f153e4c4db78c26ced0299245fb6f Mon Sep 17 00:00:00 2001 From: tmnd1991 <antonio.murgia2@studio.unibo.it> Date: Wed, 6 Jul 2016 12:56:26 -0700 Subject: [PATCH] [SPARK-15740][MLLIB] Word2VecSuite "big model load / save" caused OOM in maven jenkins builds ## What changes were proposed in this pull request? "test big model load / save" in Word2VecSuite, lately resulted into OOM. Therefore we decided to make the partitioning adaptive (not based on spark default "spark.kryoserializer.buffer.max" conf) and then testing it using a small buffer size in order to trigger partitioning without allocating too much memory for the test. ## How was this patch tested? It was tested running the following unit test: org.apache.spark.mllib.feature.Word2VecSuite Author: tmnd1991 <antonio.murgia2@studio.unibo.it> Closes #13509 from tmnd1991/SPARK-15740. --- .../apache/spark/mllib/feature/Word2Vec.scala | 16 ++++++------ .../spark/mllib/feature/Word2VecSuite.scala | 25 ++++++++++++++++--- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index 2f52825c6c..f2211df3f9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -629,14 +629,16 @@ object Word2VecModel extends Loader[Word2VecModel] { ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords))) sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) - // We want to partition the model in partitions of size 32MB - val partitionSize = (1L << 25) + // We want to partition the model in partitions smaller than + // spark.kryoserializer.buffer.max + val bufferSize = Utils.byteStringAsBytes( + spark.conf.get("spark.kryoserializer.buffer.max", "64m")) // We calculate the approximate size of the model - // We only calculate the array size, not considering - // the string size, the formula is: - // floatSize * numWords * vectorSize - val approxSize = 4L * numWords * vectorSize - val nPartitions = ((approxSize / partitionSize) + 1).toInt + // We only calculate the array size, considering an + // average string size of 15 bytes, the formula is: + // (floatSize * vectorSize + 15) * numWords + val approxSize = (4L * vectorSize + 15) * numWords + val nPartitions = ((approxSize / bufferSize) + 1).toInt val dataArray = model.toSeq.map { case (w, v) => Data(w, v) } spark.createDataFrame(dataArray).repartition(nPartitions).write.parquet(Loader.dataPath(path)) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala index c9fb9768c1..22de4c4ac4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala @@ -91,11 +91,23 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { } - ignore("big model load / save") { - // create a model bigger than 32MB since 9000 * 1000 * 4 > 2^25 - val word2VecMap = Map((0 to 9000).map(i => s"$i" -> Array.fill(1000)(0.1f)): _*) + test("big model load / save") { + // backupping old values + val oldBufferConfValue = spark.conf.get("spark.kryoserializer.buffer.max", "64m") + val oldBufferMaxConfValue = spark.conf.get("spark.kryoserializer.buffer", "64k") + + // setting test values to trigger partitioning + spark.conf.set("spark.kryoserializer.buffer", "50b") + spark.conf.set("spark.kryoserializer.buffer.max", "50b") + + // create a model bigger than 50 Bytes + val word2VecMap = Map((0 to 10).map(i => s"$i" -> Array.fill(10)(0.1f)): _*) val model = new Word2VecModel(word2VecMap) + // est. size of this model, given the formula: + // (floatSize * vectorSize + 15) * numWords + // (4 * 10 + 15) * 10 = 550 + // therefore it should generate multiple partitions val tempDir = Utils.createTempDir() val path = tempDir.toURI.toString @@ -103,9 +115,16 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { model.save(sc, path) val sameModel = Word2VecModel.load(sc, path) assert(sameModel.getVectors.mapValues(_.toSeq) === model.getVectors.mapValues(_.toSeq)) + } + catch { + case t: Throwable => fail("exception thrown persisting a model " + + "that spans over multiple partitions", t) } finally { Utils.deleteRecursively(tempDir) + spark.conf.set("spark.kryoserializer.buffer", oldBufferConfValue) + spark.conf.set("spark.kryoserializer.buffer.max", oldBufferMaxConfValue) } + } test("test similarity for word vectors with large values is not Infinity or NaN") { -- GitLab