From 0d1e67ee9b29b51bccfc8a319afe9f9b4581afc8 Mon Sep 17 00:00:00 2001
From: zsxwing <zsxwing@gmail.com>
Date: Sat, 24 Jan 2015 11:00:35 -0800
Subject: [PATCH] [SPARK-5214][Test] Add a test to demonstrate EventLoop can be
 stopped in the event thread

Author: zsxwing <zsxwing@gmail.com>

Closes #4174 from zsxwing/SPARK-5214-unittest and squashes the following commits:

443e564 [zsxwing] Change the check interval to 5ms
7aaa2d7 [zsxwing] Add a test to demonstrate EventLoop can be stopped in the event thread
---
 .../apache/spark/util/EventLoopSuite.scala    | 26 ++++++++++++++++---
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
index 10541f8784..1026cb2aa7 100644
--- a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
@@ -41,7 +41,7 @@ class EventLoopSuite extends FunSuite with Timeouts {
     }
     eventLoop.start()
     (1 to 100).foreach(eventLoop.post)
-    eventually(timeout(5 seconds), interval(200 millis)) {
+    eventually(timeout(5 seconds), interval(5 millis)) {
       assert((1 to 100) === buffer.toSeq)
     }
     eventLoop.stop()
@@ -76,7 +76,7 @@ class EventLoopSuite extends FunSuite with Timeouts {
     }
     eventLoop.start()
     eventLoop.post(1)
-    eventually(timeout(5 seconds), interval(200 millis)) {
+    eventually(timeout(5 seconds), interval(5 millis)) {
       assert(e === receivedError)
     }
     eventLoop.stop()
@@ -98,7 +98,7 @@ class EventLoopSuite extends FunSuite with Timeouts {
     }
     eventLoop.start()
     eventLoop.post(1)
-    eventually(timeout(5 seconds), interval(200 millis)) {
+    eventually(timeout(5 seconds), interval(5 millis)) {
       assert(e === receivedError)
       assert(eventLoop.isActive)
     }
@@ -153,7 +153,7 @@ class EventLoopSuite extends FunSuite with Timeouts {
       }.start()
     }
 
-    eventually(timeout(5 seconds), interval(200 millis)) {
+    eventually(timeout(5 seconds), interval(5 millis)) {
       assert(threadNum * eventsFromEachThread === receivedEventsCount)
     }
     eventLoop.stop()
@@ -185,4 +185,22 @@ class EventLoopSuite extends FunSuite with Timeouts {
     }
     assert(false === eventLoop.isActive)
   }
+
+  test("EventLoop: stop in eventThread") {
+    val eventLoop = new EventLoop[Int]("test") {
+
+      override def onReceive(event: Int): Unit = {
+        stop()
+      }
+
+      override def onError(e: Throwable): Unit = {
+      }
+
+    }
+    eventLoop.start()
+    eventLoop.post(1)
+    eventually(timeout(5 seconds), interval(5 millis)) {
+      assert(!eventLoop.isActive)
+    }
+  }
 }
-- 
GitLab