diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index a51338c0170711056bf9f8c20faa047c8becc762..c895fb32069cf165203653206268d32a66cee258 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -212,7 +212,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { } } - /** Perform shuffle cleanup, asynchronously. */ + /** Perform shuffle cleanup. */ def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = { try { logDebug("Cleaning shuffle " + shuffleId) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 541923048a3fa338350b203e0a4d3bc1a3656bff..509c944fed74cb63a9e52b5fd196d9441bdc03b1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.json4s.DefaultFormats import org.json4s.JsonDSL._ -import org.apache.spark.Partitioner +import org.apache.spark.{Dependency, Partitioner, ShuffleDependency, SparkContext} import org.apache.spark.annotation.{DeveloperApi, Experimental, Since} import org.apache.spark.internal.Logging import org.apache.spark.ml.{Estimator, Model} @@ -706,13 +706,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { previousItemFactors.unpersist() itemFactors.setName(s"itemFactors-$iter").persist(intermediateRDDStorageLevel) // TODO: Generalize PeriodicGraphCheckpointer and use it here. + val deps = itemFactors.dependencies if (shouldCheckpoint(iter)) { - itemFactors.checkpoint() // itemFactors gets materialized in computeFactors. + itemFactors.checkpoint() // itemFactors gets materialized in computeFactors } val previousUserFactors = userFactors userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, regParam, itemLocalIndexEncoder, implicitPrefs, alpha, solver) if (shouldCheckpoint(iter)) { + ALS.cleanShuffleDependencies(sc, deps) deletePreviousCheckpointFile() previousCheckpointFile = itemFactors.getCheckpointFile } @@ -723,8 +725,10 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam, userLocalIndexEncoder, solver = solver) if (shouldCheckpoint(iter)) { + val deps = itemFactors.dependencies itemFactors.checkpoint() itemFactors.count() // checkpoint item factors and cut lineage + ALS.cleanShuffleDependencies(sc, deps) deletePreviousCheckpointFile() previousCheckpointFile = itemFactors.getCheckpointFile } @@ -1355,4 +1359,31 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { * satisfies this requirement, we simply use a type alias here. */ private[recommendation] type ALSPartitioner = org.apache.spark.HashPartitioner + + /** + * Private function to clean up all of the shuffles files from the dependencies and their parents. + */ + private[spark] def cleanShuffleDependencies[T]( + sc: SparkContext, + deps: Seq[Dependency[_]], + blocking: Boolean = false): Unit = { + // If there is no reference tracking we skip clean up. + sc.cleaner.foreach { cleaner => + /** + * Clean the shuffles & all of its parents. + */ + def cleanEagerly(dep: Dependency[_]): Unit = { + if (dep.isInstanceOf[ShuffleDependency[_, _, _]]) { + val shuffleId = dep.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId + cleaner.doCleanupShuffle(shuffleId, blocking) + } + val rdd = dep.rdd + val rddDeps = rdd.dependencies + if (rdd.getStorageLevel == StorageLevel.NONE && rddDeps != null) { + rddDeps.foreach(cleanEagerly) + } + } + deps.foreach(cleanEagerly) + } + } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 4c4eb72cd16e760788ac18f5a46c227539c5983c..1704037395780bab26164c898e58d57aac0f4d00 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -17,15 +17,19 @@ package org.apache.spark.ml.recommendation +import java.io.File import java.util.Random import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ import scala.language.existentials import com.github.fommil.netlib.BLAS.{getInstance => blas} +import org.apache.commons.io.FileUtils +import org.apache.commons.io.filefilter.TrueFileFilter -import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.ml.recommendation.ALS._ import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} @@ -34,8 +38,9 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted} -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.Utils class ALSSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest with Logging { @@ -255,37 +260,7 @@ class ALSSuite rank: Int, noiseStd: Double = 0.0, seed: Long = 11L): (RDD[Rating[Int]], RDD[Rating[Int]]) = { - // The assumption of the implicit feedback model is that unobserved ratings are more likely to - // be negatives. - val positiveFraction = 0.8 - val negativeFraction = 1.0 - positiveFraction - val trainingFraction = 0.6 - val testFraction = 0.3 - val totalFraction = trainingFraction + testFraction - val random = new Random(seed) - val userFactors = genFactors(numUsers, rank, random) - val itemFactors = genFactors(numItems, rank, random) - val training = ArrayBuffer.empty[Rating[Int]] - val test = ArrayBuffer.empty[Rating[Int]] - for ((userId, userFactor) <- userFactors; (itemId, itemFactor) <- itemFactors) { - val rating = blas.sdot(rank, userFactor, 1, itemFactor, 1) - val threshold = if (rating > 0) positiveFraction else negativeFraction - val observed = random.nextDouble() < threshold - if (observed) { - val x = random.nextDouble() - if (x < totalFraction) { - if (x < trainingFraction) { - val noise = noiseStd * random.nextGaussian() - training += Rating(userId, itemId, rating + noise.toFloat) - } else { - test += Rating(userId, itemId, rating) - } - } - } - } - logInfo(s"Generated an implicit feedback dataset with ${training.size} ratings for training " + - s"and ${test.size} for test.") - (sc.parallelize(training, 2), sc.parallelize(test, 2)) + ALSSuite.genImplicitTestData(sc, numUsers, numItems, rank, noiseStd, seed) } /** @@ -304,14 +279,7 @@ class ALSSuite random: Random, a: Float = -1.0f, b: Float = 1.0f): Seq[(Int, Array[Float])] = { - require(size > 0 && size < Int.MaxValue / 3) - require(b > a) - val ids = mutable.Set.empty[Int] - while (ids.size < size) { - ids += random.nextInt() - } - val width = b - a - ids.toSeq.sorted.map(id => (id, Array.fill(rank)(a + random.nextFloat() * width))) + ALSSuite.genFactors(size, rank, random, a, b) } /** @@ -520,6 +488,79 @@ class ALSSuite } } +class ALSCleanerSuite extends SparkFunSuite { + test("ALS shuffle cleanup standalone") { + val conf = new SparkConf() + val localDir = Utils.createTempDir() + val checkpointDir = Utils.createTempDir() + def getAllFiles: Set[File] = + FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet + try { + conf.set("spark.local.dir", localDir.getAbsolutePath) + val sc = new SparkContext("local[2]", "test", conf) + try { + sc.setCheckpointDir(checkpointDir.getAbsolutePath) + // Test checkpoint and clean parents + val input = sc.parallelize(1 to 1000) + val keyed = input.map(x => (x % 20, 1)) + val shuffled = keyed.reduceByKey(_ + _) + val keysOnly = shuffled.keys + val deps = keysOnly.dependencies + keysOnly.count() + ALS.cleanShuffleDependencies(sc, deps, true) + val resultingFiles = getAllFiles + assert(resultingFiles === Set()) + // Ensure running count again works fine even if we kill the shuffle files. + keysOnly.count() + } finally { + sc.stop() + } + } finally { + Utils.deleteRecursively(localDir) + Utils.deleteRecursively(checkpointDir) + } + } + + test("ALS shuffle cleanup in algorithm") { + val conf = new SparkConf() + val localDir = Utils.createTempDir() + val checkpointDir = Utils.createTempDir() + def getAllFiles: Set[File] = + FileUtils.listFiles(localDir, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE).asScala.toSet + try { + conf.set("spark.local.dir", localDir.getAbsolutePath) + val sc = new SparkContext("local[2]", "test", conf) + try { + sc.setCheckpointDir(checkpointDir.getAbsolutePath) + // Generate test data + val (training, _) = ALSSuite.genImplicitTestData(sc, 20, 5, 1, 0.2, 0) + // Implicitly test the cleaning of parents during ALS training + val sqlContext = new SQLContext(sc) + import sqlContext.implicits._ + val als = new ALS() + .setRank(1) + .setRegParam(1e-5) + .setSeed(0) + .setCheckpointInterval(1) + .setMaxIter(7) + val model = als.fit(training.toDF()) + val resultingFiles = getAllFiles + // We expect the last shuffles files, block ratings, user factors, and item factors to be + // around but no more. + val pattern = "shuffle_(\\d+)_.+\\.data".r + val rddIds = resultingFiles.flatMap { f => + pattern.findAllIn(f.getName()).matchData.map { _.group(1) } } + assert(rddIds.toSet.size === 4) + } finally { + sc.stop() + } + } finally { + Utils.deleteRecursively(localDir) + Utils.deleteRecursively(checkpointDir) + } + } +} + class ALSStorageSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest with Logging { @@ -591,7 +632,7 @@ private class IntermediateRDDStorageListener extends SparkListener { } -object ALSSuite { +object ALSSuite extends Logging { /** * Mapping from all Params to valid settings which differ from the defaults. @@ -620,4 +661,82 @@ object ALSSuite { "intermediateStorageLevel" -> "MEMORY_ONLY", "finalStorageLevel" -> "MEMORY_AND_DISK_SER" ) + + // Helper functions to generate test data we share between ALS test suites + + /** + * Generates random user/item factors, with i.i.d. values drawn from U(a, b). + * @param size number of users/items + * @param rank number of features + * @param random random number generator + * @param a min value of the support (default: -1) + * @param b max value of the support (default: 1) + * @return a sequence of (ID, factors) pairs + */ + private def genFactors( + size: Int, + rank: Int, + random: Random, + a: Float = -1.0f, + b: Float = 1.0f): Seq[(Int, Array[Float])] = { + require(size > 0 && size < Int.MaxValue / 3) + require(b > a) + val ids = mutable.Set.empty[Int] + while (ids.size < size) { + ids += random.nextInt() + } + val width = b - a + ids.toSeq.sorted.map(id => (id, Array.fill(rank)(a + random.nextFloat() * width))) + } + + /** + * Generates an implicit feedback dataset for testing ALS. + * + * @param sc SparkContext + * @param numUsers number of users + * @param numItems number of items + * @param rank rank + * @param noiseStd the standard deviation of additive Gaussian noise on training data + * @param seed random seed + * @return (training, test) + */ + def genImplicitTestData( + sc: SparkContext, + numUsers: Int, + numItems: Int, + rank: Int, + noiseStd: Double = 0.0, + seed: Long = 11L): (RDD[Rating[Int]], RDD[Rating[Int]]) = { + // The assumption of the implicit feedback model is that unobserved ratings are more likely to + // be negatives. + val positiveFraction = 0.8 + val negativeFraction = 1.0 - positiveFraction + val trainingFraction = 0.6 + val testFraction = 0.3 + val totalFraction = trainingFraction + testFraction + val random = new Random(seed) + val userFactors = genFactors(numUsers, rank, random) + val itemFactors = genFactors(numItems, rank, random) + val training = ArrayBuffer.empty[Rating[Int]] + val test = ArrayBuffer.empty[Rating[Int]] + for ((userId, userFactor) <- userFactors; (itemId, itemFactor) <- itemFactors) { + val rating = blas.sdot(rank, userFactor, 1, itemFactor, 1) + val threshold = if (rating > 0) positiveFraction else negativeFraction + val observed = random.nextDouble() < threshold + if (observed) { + val x = random.nextDouble() + if (x < totalFraction) { + if (x < trainingFraction) { + val noise = noiseStd * random.nextGaussian() + training += Rating(userId, itemId, rating + noise.toFloat) + } else { + test += Rating(userId, itemId, rating) + } + } + } + } + logInfo(s"Generated an implicit feedback dataset with ${training.size} ratings for training " + + s"and ${test.size} for test.") + (sc.parallelize(training, 2), sc.parallelize(test, 2)) + } }