diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 58b865969f517f87fae8f642d41bce68d3b741e7..622f7985ba444ca3250674fb073934b520f83c27 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD
 import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.shuffle.ShuffleWriter
-import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId}
+import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId}
 import org.apache.spark.util.{MutablePair, Utils}
 
 abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext {
@@ -277,7 +277,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     // Delete one of the local shuffle blocks.
     val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
     val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
-    assert(hashFile.exists() || sortFile.exists())
+    val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
+    assert(hashFile.exists() || (sortFile.exists() && indexFile.exists()))
 
     if (hashFile.exists()) {
       hashFile.delete()
@@ -285,11 +286,36 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     if (sortFile.exists()) {
       sortFile.delete()
     }
+    if (indexFile.exists()) {
+      indexFile.delete()
+    }
 
     // This count should retry the execution of the previous stage and rerun shuffle.
     rdd.count()
   }
 
+  test("cannot find its local shuffle file if no execution of the stage and rerun shuffle") {
+    sc = new SparkContext("local", "test", conf.clone())
+    val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)
+
+    // Cannot find one of the local shuffle blocks.
+    val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
+    val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
+    val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
+    assert(!hashFile.exists() && !sortFile.exists() && !indexFile.exists())
+
+    rdd.count()
+
+    // Can find one of the local shuffle blocks.
+    val hashExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleBlockId(0, 0, 0))
+    val sortExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleDataBlockId(0, 0, 0))
+    val indexExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleIndexBlockId(0, 0, 0))
+    assert(hashExistsFile.exists() || (sortExistsFile.exists() && indexExistsFile.exists()))
+  }
+
   test("metrics for shuffle without aggregation") {
     sc = new SparkContext("local", "test", conf.clone())
     val numRecords = 10000
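
Background for the assertion change, as a minimal sketch rather than part of the patch: the sort-based shuffle writes one pair of files per map task, a data file holding every partition's records back to back and an index file recording each partition's byte offset, so a map output is readable only when both files are present. Assuming Spark on the classpath, the block ids used in the test resolve to that file pair roughly as follows (the .data/.index name suffixes reflect BlockId.name as I understand it):

    import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleIndexBlockId}

    // Each BlockId maps to a deterministic file name under the disk block
    // manager's local directories; the data/index pair shares the
    // shuffle_<shuffleId>_<mapId>_<reduceId> stem.
    val data = ShuffleDataBlockId(0, 0, 0)
    val index = ShuffleIndexBlockId(0, 0, 0)
    println(data.name)  // "shuffle_0_0_0.data"
    println(index.name) // "shuffle_0_0_0.index"

This is why checking sortFile.exists() alone was too weak: deleting only the index file leaves the data file in place, yet the map output can no longer be served, so the stage must still be rerun.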