From acef51defb991bcdc99b76cf2a126afd6d60ec70 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Date: Wed, 25 Mar 2015 13:27:15 -0700
Subject: [PATCH] [SPARK-6537] UIWorkloadGenerator: The main thread should not
 stop SparkContext until all jobs finish

The main thread of UIWorkloadGenerator spawns sub-threads to launch jobs, but the main thread stops SparkContext without waiting for those threads to finish.

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #5187 from sarutak/SPARK-6537 and squashes the following commits:

4e9307a [Kousuke Saruta] Fixed UIWorkloadGenerator so that the main thread stops SparkContext after all jobs finish
---
 .../scala/org/apache/spark/ui/UIWorkloadGenerator.scala  | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 19ac7a826e..5fbcd6bb8a 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ui
 
+import java.util.concurrent.Semaphore
+
 import scala.util.Random
 
 import org.apache.spark.{SparkConf, SparkContext}
@@ -88,6 +90,8 @@ private[spark] object UIWorkloadGenerator {
       ("Job with delays", baseData.map(x => Thread.sleep(100)).count)
     )
 
+    val barrier = new Semaphore(-nJobSet * jobs.size + 1)
+
     (1 to nJobSet).foreach { _ =>
       for ((desc, job) <- jobs) {
         new Thread {
@@ -99,12 +103,17 @@ private[spark] object UIWorkloadGenerator {
             } catch {
               case e: Exception =>
                 println("Job Failed: " + desc)
+            } finally {
+              barrier.release()
             }
           }
         }.start
         Thread.sleep(INTER_JOB_WAIT_MS)
       }
     }
+
+    // Waiting for threads.
+    barrier.acquire()
     sc.stop()
   }
 }
-- 
GitLab