Commit f398640d authored by caoxuewen, committed by Sean Owen

[SPARK-20607][CORE] Add new unit tests to ShuffleSuite

## What changes were proposed in this pull request?

This PR makes two changes:
1. Adds a new unit test: before any shuffle stage has executed, the shuffle data file and index file should not exist on disk; after the stage runs, they should. (A standalone sketch of this check follows the list.)
2. Modifies the '[SPARK-4085] rerun map stage if reduce stage cannot find its local shuffle file' unit test: the parallelism is set to 1 instead of 2, and the shuffle index file is now also checked and deleted alongside the data file.
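
For illustration, a minimal standalone sketch of the lifecycle the new test asserts. Two assumptions: the file must be compiled inside the `org.apache.spark` package (since `SparkContext.env` and the disk block manager are package-private, which is why the real check lives in ShuffleSuite), and the object/app names here are hypothetical:

```scala
package org.apache.spark

import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleIndexBlockId}

// Hypothetical demo object; not part of the PR itself.
object ShuffleFileLifecycleDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))
    val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)
    val dbm = sc.env.blockManager.diskBlockManager
    val dataFile = dbm.getFile(ShuffleDataBlockId(0, 0, 0))
    val indexFile = dbm.getFile(ShuffleIndexBlockId(0, 0, 0))
    // No action has run yet, so the shuffle map stage has not executed
    // and neither file should exist.
    assert(!dataFile.exists() && !indexFile.exists())
    rdd.count() // triggers the shuffle map stage
    // Sort-based shuffle writes one data file and one index file per map task.
    assert(dataFile.exists() && indexFile.exists())
    sc.stop()
  }
}
```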

## How was this patch tested?
The new and modified unit tests.
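
For reference, a suite like this is typically run locally through Spark's sbt build, e.g.:

```
build/sbt "core/testOnly *ShuffleSuite"
```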

Author: caoxuewen <cao.xuewen@zte.com.cn>

Closes #17868 from heary-cao/ShuffleSuite.
parent 3f2cd51e
@@ -28,7 +28,7 @@ import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD
 import org.apache.spark.scheduler.{MapStatus, MyRDD, SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.shuffle.ShuffleWriter
-import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId}
+import org.apache.spark.storage.{ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId}
 import org.apache.spark.util.{MutablePair, Utils}

 abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkContext {
@@ -277,7 +277,8 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     // Delete one of the local shuffle blocks.
     val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
     val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
-    assert(hashFile.exists() || sortFile.exists())
+    val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
+    assert(hashFile.exists() || (sortFile.exists() && indexFile.exists()))

     if (hashFile.exists()) {
       hashFile.delete()
@@ -285,11 +286,36 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     if (sortFile.exists()) {
       sortFile.delete()
     }
+    if (indexFile.exists()) {
+      indexFile.delete()
+    }

     // This count should retry the execution of the previous stage and rerun shuffle.
     rdd.count()
   }

+  test("cannot find its local shuffle file if no execution of the stage and rerun shuffle") {
+    sc = new SparkContext("local", "test", conf.clone())
+    val rdd = sc.parallelize(1 to 10, 1).map((_, 1)).reduceByKey(_ + _)
+
+    // Cannot find one of the local shuffle blocks.
+    val hashFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleBlockId(0, 0, 0))
+    val sortFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleDataBlockId(0, 0, 0))
+    val indexFile = sc.env.blockManager.diskBlockManager.getFile(new ShuffleIndexBlockId(0, 0, 0))
+    assert(!hashFile.exists() && !sortFile.exists() && !indexFile.exists())
+
+    rdd.count()
+
+    // Can find one of the local shuffle blocks.
+    val hashExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleBlockId(0, 0, 0))
+    val sortExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleDataBlockId(0, 0, 0))
+    val indexExistsFile = sc.env.blockManager.diskBlockManager
+      .getFile(new ShuffleIndexBlockId(0, 0, 0))
+    assert(hashExistsFile.exists() || (sortExistsFile.exists() && indexExistsFile.exists()))
+  }
+
   test("metrics for shuffle without aggregation") {
     sc = new SparkContext("local", "test", conf.clone())
     val numRecords = 10000
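
A side note on why the new assertions pair sortFile.exists() with indexFile.exists(): sort-based shuffle produces one data file plus one index file per map task, and the index file holds (numPartitions + 1) Longs, where entry i is the byte offset at which reduce partition i begins in the data file. As a hedged illustration (the object name is hypothetical; the format described matches what IndexShuffleBlockResolver writes via DataOutputStream, i.e. big-endian Longs), a small standalone reader:

```scala
import java.io.{DataInputStream, File, FileInputStream}

// Hypothetical helper: dump the partition offsets stored in a
// sort-shuffle index file (e.g. shuffle_0_0_0.index).
object DumpShuffleIndex {
  def main(args: Array[String]): Unit = {
    val file = new File(args(0))
    val numOffsets = (file.length() / 8).toInt // 8 bytes per Long offset
    val in = new DataInputStream(new FileInputStream(file))
    try {
      for (i <- 0 until numOffsets) {
        // offset(i) is where reduce partition i begins in the .data file;
        // the final offset equals the data file's total length.
        println(s"offset[$i] = ${in.readLong()}")
      }
    } finally {
      in.close()
    }
  }
}
```

Run against an index file left behind by a local job, this prints the partition boundaries; deleting the index file (as the modified test now does) removes exactly this offset map, which is why the data file alone is not enough for a reducer to fetch its block.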