diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index f1f9b69c89c69569af90e9efe8e43191d3dc0702..58d217ffef566561b7d2bd920704acfbfc518ff7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -26,10 +26,6 @@ import org.scalatest.Matchers import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} import org.apache.spark.executor.TaskMetrics -import org.apache.spark.network.buffer.ManagedBuffer -import org.apache.spark.shuffle.IndexShuffleBlockResolver -import org.apache.spark.shuffle.sort.SortShuffleManager -import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.{ResetSystemProperties, RpcUtils} class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers @@ -219,24 +215,28 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("local metrics") { - val conf = new SparkConf() - .setMaster("local").setAppName("SparkListenerSuite") - .set("spark.shuffle.manager", classOf[SlowShuffleManager].getName) - sc = new SparkContext(conf) + sc = new SparkContext("local", "SparkListenerSuite") val listener = new SaveStageAndTaskInfo sc.addSparkListener(listener) sc.addSparkListener(new StatsReportListener) + // just to make sure some of the tasks take a noticeable amount of time + val w = { i: Int => + if (i == 0) { + Thread.sleep(100) + } + i + } val numSlices = 16 - val d = sc.parallelize(0 to 1e3.toInt, numSlices) + val d = sc.parallelize(0 to 1e3.toInt, numSlices).map(w) d.count() sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) listener.stageInfos.size should be (1) - val d2 = d.map { i => i -> i * 2 }.setName("shuffle input 1") - val d3 = d.map { i => i -> (0 to (i % 5)) }.setName("shuffle input 2") + val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1") + val d3 = d.map { i => w(i) -> (0 to (i % 5)) }.setName("shuffle input 2") val d4 = d2.cogroup(d3, numSlices).map { case (k, (v1, v2)) => - k -> (v1.size, v2.size) + w(k) -> (v1.size, v2.size) } d4.setName("A Cogroup") d4.collectAsMap() @@ -255,11 +255,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match taskInfoMetrics.map(_._2.executorDeserializeTime), stageInfo + " executorDeserializeTime") + /* Test is disabled (SEE SPARK-2208) if (stageInfo.rddInfos.exists(_.name == d4.name)) { checkNonZeroAvg( taskInfoMetrics.map(_._2.shuffleReadMetrics.get.fetchWaitTime), stageInfo + " fetchWaitTime") } + */ taskInfoMetrics.foreach { case (taskInfo, taskMetrics) => taskMetrics.resultSize should be > (0L) @@ -335,7 +337,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match listener.wait(remainingWait) remainingWait = finishTime - System.currentTimeMillis } - assert(listener.startedTasks.nonEmpty) + assert(!listener.startedTasks.isEmpty) } f.cancel() @@ -474,15 +476,3 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListene var count = 0 override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1 } - -/** Slow ShuffleManager to simulate tasks that takes a noticeable amount of time */ -private class SlowShuffleManager(conf: SparkConf) extends SortShuffleManager(conf) { - - override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf) { - - override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { - Thread.sleep(10) - super.getBlockData(blockId) - } - } -}