From de953c214c025fbc7b0e94f85625d72091e7257e Mon Sep 17 00:00:00 2001 From: jinxing <jinxing6042@126.com> Date: Tue, 30 May 2017 14:02:33 -0500 Subject: [PATCH] [SPARK-20333] HashPartitioner should be compatible with num of child RDD's partitions. ## What changes were proposed in this pull request? Fix test "don't submit stage until its dependencies map outputs are registered (SPARK-5259)" , "run trivial shuffle with out-of-band executor failure and retry", "reduce tasks should be placed locally with map output" in DAGSchedulerSuite. Author: jinxing <jinxing6042@126.com> Closes #17634 from jinxing64/SPARK-20333. --- .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a10941b579..67145e7445 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1277,10 +1277,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou */ test("don't submit stage until its dependencies map outputs are registered (SPARK-5259)") { val firstRDD = new MyRDD(sc, 3, Nil) - val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2)) + val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3)) val firstShuffleId = firstShuffleDep.shuffleId val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) submit(reduceRdd, Array(0)) @@ -1583,7 +1583,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou */ test("run trivial shuffle with out-of-band executor failure and retry") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) val shuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker) submit(reduceRdd, Array(0)) @@ -1791,7 +1791,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou test("reduce tasks should be placed locally with map output") { // Create a shuffleMapRdd with 1 partition val shuffleMapRdd = new MyRDD(sc, 1, Nil) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) val shuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker) submit(reduceRdd, Array(0)) -- GitLab