Skip to content
Snippets Groups Projects
Commit 4a5fe091 authored by zsxwing's avatar zsxwing Committed by Tathagata Das
Browse files

[SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when...

[SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregisterReceivering since we may reuse it later

`deregisterReceiver` should not remove `ReceiverTrackingInfo`. Otherwise, it will throw `java.util.NoSuchElementException: key not found` when restarting it.

Author: zsxwing <zsxwing@gmail.com>

Closes #8538 from zsxwing/SPARK-10369.
parent 72f6dbf7
No related branches found
No related tags found
No related merge requests found
......@@ -291,7 +291,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
ReceiverTrackingInfo(
streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo))
}
receiverTrackingInfos -= streamId
receiverTrackingInfos(streamId) = newReceiverTrackingInfo
listenerBus.post(StreamingListenerReceiverStopped(newReceiverTrackingInfo.toReceiverInfo))
val messageWithError = if (error != null && !error.isEmpty) {
s"$message - $error"
......@@ -483,7 +483,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
context.reply(true)
// Local messages
case AllReceiverIds =>
context.reply(receiverTrackingInfos.keys.toSeq)
context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
case StopAllReceivers =>
assert(isTrackerStopping || isTrackerStopped)
stopReceivers()
......
......@@ -60,6 +60,26 @@ class ReceiverTrackerSuite extends TestSuiteBase {
}
}
}
test("should restart receiver after stopping it") {
withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
@volatile var startTimes = 0
ssc.addStreamingListener(new StreamingListener {
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
startTimes += 1
}
})
val input = ssc.receiverStream(new StoppableReceiver)
val output = new TestOutputStream(input)
output.register()
ssc.start()
StoppableReceiver.shouldStop = true
eventually(timeout(10 seconds), interval(10 millis)) {
// The receiver is stopped once, so if it's restarted, it should be started twice.
assert(startTimes === 2)
}
}
}
}
/** An input DStream with for testing rate controlling */
......@@ -132,3 +152,34 @@ private[streaming] object RateTestReceiver {
def getActive(): Option[RateTestReceiver] = Option(activeReceiver)
}
/**
* A custom receiver that could be stopped via StoppableReceiver.shouldStop
*/
class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
var receivingThreadOption: Option[Thread] = None
def onStart() {
val thread = new Thread() {
override def run() {
while (!StoppableReceiver.shouldStop) {
Thread.sleep(10)
}
StoppableReceiver.this.stop("stop")
}
}
thread.start()
}
def onStop() {
StoppableReceiver.shouldStop = true
receivingThreadOption.foreach(_.join())
// Reset it so as to restart it
StoppableReceiver.shouldStop = false
}
}
object StoppableReceiver {
@volatile var shouldStop = false
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment