diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala index dd9f2d7e91cdcd48398150ed96870c6b8dcfa7fb..a45a5efbb41f9d28f8ded083898dfbb8723f6257 100644 --- a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala @@ -17,16 +17,22 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc val listener = new SaveStageInfo sc.addSparkListener(listener) sc.addSparkListener(new StatsReportListener) + //just to make sure some of the tasks take a noticeable amount of time + val w = {i:Int => + if (i == 0) + Thread.sleep(25) + i + } - val d = sc.parallelize(1 to 1e4.toInt, 64) + val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)} d.count listener.stageInfos.size should be (1) - val d2 = d.map{i => i -> i * 2}.setName("shuffle input 1") + val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1") - val d3 = d.map{i => i -> (0 to (i % 5))}.setName("shuffle input 2") + val d3 = d.map{i => w(i) -> (0 to (i % 5))}.setName("shuffle input 2") - val d4 = d2.cogroup(d3, 64).map{case(k,(v1,v2)) => k -> (v1.size, v2.size)} + val d4 = d2.cogroup(d3, 64).map{case(k,(v1,v2)) => w(k) -> (v1.size, v2.size)} d4.setName("A Cogroup") d4.collectAsMap