From 9f019c7223bb79b8d5cd52980b2723a1601d1134 Mon Sep 17 00:00:00 2001
From: zsxwing <zsxwing@gmail.com>
Date: Tue, 5 May 2015 23:25:28 -0700
Subject: [PATCH] [SPARK-7384][Core][Tests] Fix flaky tests for distributed
 mode in BroadcastSuite

Fixed the following failure: https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.3-Maven-pre-YARN/hadoop.version=1.0.4,label=centos/452/testReport/junit/org.apache.spark.broadcast/BroadcastSuite/Unpersisting_HttpBroadcast_on_executors_and_driver_in_distributed_mode/

The tests should wait until all slaves are up. Otherwise, only some of the `BlockManager`s may be registered, causing the tests to fail.

Author: zsxwing <zsxwing@gmail.com>

Closes #5925 from zsxwing/SPARK-7384 and squashes the following commits:

783cb7b [zsxwing] Add comments for _jobProgressListener and remove postfixOps
1009ef1 [zsxwing] [SPARK-7384][Core][Tests] Fix flaky tests for distributed mode in BroadcastSuite
---
 .../main/scala/org/apache/spark/SparkContext.scala |  8 +++++---
 .../apache/spark/broadcast/BroadcastSuite.scala    | 14 +++++++++++++-
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 682dec44ac..b5f040ceb1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -407,15 +407,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
 
+    // "_jobProgressListener" should be set up before creating SparkEnv because when creating
+    // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
+    _jobProgressListener = new JobProgressListener(_conf)
+    listenerBus.addListener(jobProgressListener)
+
     // Create the Spark execution environment (cache, map output tracker, etc)
     _env = createSparkEnv(_conf, isLocal, listenerBus)
     SparkEnv.set(_env)
 
     _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
 
-    _jobProgressListener = new JobProgressListener(_conf)
-    listenerBus.addListener(jobProgressListener)
-
     _statusTracker = new SparkStatusTracker(this)
 
     _progressBar =
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index c8fdfa6939..06e5f1cf6b 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.broadcast
 
+import scala.concurrent.duration._
 import scala.util.Random
 
 import org.scalatest.{Assertions, FunSuite}
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkEnv}
 import org.apache.spark.io.SnappyCompressionCodec
@@ -307,7 +309,17 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
       removeFromDriver: Boolean) {
 
     sc = if (distributed) {
-      new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
+      val _sc =
+        new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
+      // Wait until all slaves are up
+      eventually(timeout(10.seconds), interval(10.milliseconds)) {
+        _sc.jobProgressListener.synchronized {
+          val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size
+          assert(numBlockManagers == numSlaves + 1,
+            s"Expect ${numSlaves + 1} block managers, but was ${numBlockManagers}")
+        }
+      }
+      _sc
     } else {
       new SparkContext("local", "test", broadcastConf)
     }
-- 
GitLab