diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala index aa1d8ac7e6b452a264b31491d381eba2a09ba6b2..d3dd3a8fa4930cdc5dcf2cd8b656d5ce03cba272 100644 --- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala @@ -105,52 +105,8 @@ class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter { masterTracker.incrementGeneration() slaveTracker.updateGeneration(masterTracker.getGeneration) intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } - } - - test("simulatenous fetch fails") { - System.clearProperty("spark.master.host") - val dummyActorSystem = ActorSystem("testDummy") - val dummyTracker = new MapOutputTracker(dummyActorSystem, true) - dummyTracker.registerShuffle(10, 1) - // val compressedSize1000 = MapOutputTracker.compressSize(1000L) - // val size100 = MapOutputTracker.decompressSize(compressedSize1000) - // dummyTracker.registerMapOutput(10, 0, new MapStatus( - // new BlockManagerId("hostA", 1000), Array(compressedSize1000))) - val serializedMessage = dummyTracker.getSerializedLocations(10) - val (actorSystem, boundPort) = - AkkaUtils.createActorSystem("test", "localhost", 0) - System.setProperty("spark.master.port", boundPort.toString) - val delayResponseLock = new java.lang.Object - val delayResponseActor = actorSystem.actorOf(Props(new Actor { - override def receive = { - case GetMapOutputStatuses(shuffleId: Int, requester: String) => - delayResponseLock.synchronized { - sender ! serializedMessage - } - } - }), name = "MapOutputTracker") - val slaveTracker = new MapOutputTracker(actorSystem, false) - var firstFailed = false - var secondFailed = false - val firstFetch = new Thread { - override def run() { - intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } - firstFailed = true - } - } - val secondFetch = new Thread { - override def run() { - intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } - secondFailed = true - } - } - delayResponseLock.synchronized { - firstFetch.start - secondFetch.start - } - firstFetch.join - secondFetch.join - assert(firstFailed && secondFailed) + // failure should be cached + intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) } } }