Skip to content
Snippets Groups Projects
Commit 0d1e67ee authored by zsxwing's avatar zsxwing Committed by Reynold Xin
Browse files

[SPARK-5214][Test] Add a test to demonstrate EventLoop can be stopped in the event thread

Author: zsxwing <zsxwing@gmail.com>

Closes #4174 from zsxwing/SPARK-5214-unittest and squashes the following commits:

443e564 [zsxwing] Change the check interval to 5ms
7aaa2d7 [zsxwing] Add a test to demonstrate EventLoop can be stopped in the event thread
parent 09e09c54
No related branches found
No related tags found
No related merge requests found
...@@ -41,7 +41,7 @@ class EventLoopSuite extends FunSuite with Timeouts { ...@@ -41,7 +41,7 @@ class EventLoopSuite extends FunSuite with Timeouts {
} }
eventLoop.start() eventLoop.start()
(1 to 100).foreach(eventLoop.post) (1 to 100).foreach(eventLoop.post)
eventually(timeout(5 seconds), interval(200 millis)) { eventually(timeout(5 seconds), interval(5 millis)) {
assert((1 to 100) === buffer.toSeq) assert((1 to 100) === buffer.toSeq)
} }
eventLoop.stop() eventLoop.stop()
...@@ -76,7 +76,7 @@ class EventLoopSuite extends FunSuite with Timeouts { ...@@ -76,7 +76,7 @@ class EventLoopSuite extends FunSuite with Timeouts {
} }
eventLoop.start() eventLoop.start()
eventLoop.post(1) eventLoop.post(1)
eventually(timeout(5 seconds), interval(200 millis)) { eventually(timeout(5 seconds), interval(5 millis)) {
assert(e === receivedError) assert(e === receivedError)
} }
eventLoop.stop() eventLoop.stop()
...@@ -98,7 +98,7 @@ class EventLoopSuite extends FunSuite with Timeouts { ...@@ -98,7 +98,7 @@ class EventLoopSuite extends FunSuite with Timeouts {
} }
eventLoop.start() eventLoop.start()
eventLoop.post(1) eventLoop.post(1)
eventually(timeout(5 seconds), interval(200 millis)) { eventually(timeout(5 seconds), interval(5 millis)) {
assert(e === receivedError) assert(e === receivedError)
assert(eventLoop.isActive) assert(eventLoop.isActive)
} }
...@@ -153,7 +153,7 @@ class EventLoopSuite extends FunSuite with Timeouts { ...@@ -153,7 +153,7 @@ class EventLoopSuite extends FunSuite with Timeouts {
}.start() }.start()
} }
eventually(timeout(5 seconds), interval(200 millis)) { eventually(timeout(5 seconds), interval(5 millis)) {
assert(threadNum * eventsFromEachThread === receivedEventsCount) assert(threadNum * eventsFromEachThread === receivedEventsCount)
} }
eventLoop.stop() eventLoop.stop()
...@@ -185,4 +185,22 @@ class EventLoopSuite extends FunSuite with Timeouts { ...@@ -185,4 +185,22 @@ class EventLoopSuite extends FunSuite with Timeouts {
} }
assert(false === eventLoop.isActive) assert(false === eventLoop.isActive)
} }
test("EventLoop: stop in eventThread") {
val eventLoop = new EventLoop[Int]("test") {
override def onReceive(event: Int): Unit = {
stop()
}
override def onError(e: Throwable): Unit = {
}
}
eventLoop.start()
eventLoop.post(1)
eventually(timeout(5 seconds), interval(5 millis)) {
assert(!eventLoop.isActive)
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment