Skip to content
Snippets Groups Projects
Commit 44c28abf authored by zsxwing's avatar zsxwing Committed by Tathagata Das
Browse files

[SPARK-10224] [STREAMING] Fix the issue that blockIntervalTimer won't call...

[SPARK-10224] [STREAMING] Fix the issue that blockIntervalTimer won't call updateCurrentBuffer when stopping

`blockIntervalTimer.stop(interruptTimer = false)` doesn't guarantee calling `updateCurrentBuffer`. So it's possible that `blockIntervalTimer` will exit when `updateCurrentBuffer` is not empty. Then the data in `currentBuffer` will be lost.

To reproduce it, you can add `Thread.sleep(200)` in this line (https://github.com/apache/spark/blob/69c9c177160e32a2fbc9b36ecc52156077fca6fc/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala#L100) and run `StreamingContexSuite`.
I cannot write a unit test to reproduce it because I cannot find an approach to force `RecurringTimer` suspend at this line for a few milliseconds.

There was a failure in Jenkins here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/41455/console

This PR updates RecurringTimer to make sure `stop(interruptTimer = false)` will call `callback` at least once after the `stop` method is called.

Author: zsxwing <zsxwing@gmail.com>

Closes #8417 from zsxwing/SPARK-10224.
parent 5548a254
No related branches found
No related tags found
No related merge requests found
......@@ -72,8 +72,10 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
/**
* Stop the timer, and return the last time the callback was made.
* interruptTimer = true will interrupt the callback
* - interruptTimer = true will interrupt the callback
* if it is in progress (not guaranteed to give correct time in this case).
* - interruptTimer = false guarantees that there will be at least one callback after `stop` has
* been called.
*/
def stop(interruptTimer: Boolean): Long = synchronized {
if (!stopped) {
......@@ -87,18 +89,23 @@ class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name:
prevTime
}
private def triggerActionForNextInterval(): Unit = {
clock.waitTillTime(nextTime)
callback(nextTime)
prevTime = nextTime
nextTime += period
logDebug("Callback for " + name + " called at time " + prevTime)
}
/**
* Repeatedly call the callback every interval.
*/
private def loop() {
try {
while (!stopped) {
clock.waitTillTime(nextTime)
callback(nextTime)
prevTime = nextTime
nextTime += period
logDebug("Callback for " + name + " called at time " + prevTime)
triggerActionForNextInterval()
}
triggerActionForNextInterval()
} catch {
case e: InterruptedException =>
}
......
......@@ -184,9 +184,10 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
// Verify that the final data is present in the final generated block and
// pushed before complete stop
assert(blockGenerator.isStopped() === false) // generator has not stopped yet
clock.advance(blockIntervalMs) // force block generation
failAfter(1 second) {
thread.join()
eventually(timeout(10 seconds), interval(10 milliseconds)) {
// Keep calling `advance` to avoid blocking forever in `clock.waitTillTime`
clock.advance(blockIntervalMs)
assert(thread.isAlive === false)
}
assert(blockGenerator.isStopped() === true) // generator has finally been completely stopped
assert(listener.pushedData === data, "All data not pushed by stop()")
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.util
import scala.collection.mutable
import scala.concurrent.duration._
import org.scalatest.PrivateMethodTester
import org.scalatest.concurrent.Eventually._
import org.apache.spark.SparkFunSuite
import org.apache.spark.util.ManualClock
class RecurringTimerSuite extends SparkFunSuite with PrivateMethodTester {
test("basic") {
val clock = new ManualClock()
val results = new mutable.ArrayBuffer[Long]() with mutable.SynchronizedBuffer[Long]
val timer = new RecurringTimer(clock, 100, time => {
results += time
}, "RecurringTimerSuite-basic")
timer.start(0)
eventually(timeout(10.seconds), interval(10.millis)) {
assert(results === Seq(0L))
}
clock.advance(100)
eventually(timeout(10.seconds), interval(10.millis)) {
assert(results === Seq(0L, 100L))
}
clock.advance(200)
eventually(timeout(10.seconds), interval(10.millis)) {
assert(results === Seq(0L, 100L, 200L, 300L))
}
assert(timer.stop(interruptTimer = true) === 300L)
}
test("SPARK-10224: call 'callback' after stopping") {
val clock = new ManualClock()
val results = new mutable.ArrayBuffer[Long]() with mutable.SynchronizedBuffer[Long]
val timer = new RecurringTimer(clock, 100, time => {
results += time
}, "RecurringTimerSuite-SPARK-10224")
timer.start(0)
eventually(timeout(10.seconds), interval(10.millis)) {
assert(results === Seq(0L))
}
@volatile var lastTime = -1L
// Now RecurringTimer is waiting for the next interval
val thread = new Thread {
override def run(): Unit = {
lastTime = timer.stop(interruptTimer = false)
}
}
thread.start()
val stopped = PrivateMethod[RecurringTimer]('stopped)
// Make sure the `stopped` field has been changed
eventually(timeout(10.seconds), interval(10.millis)) {
assert(timer.invokePrivate(stopped()) === true)
}
clock.advance(200)
// When RecurringTimer is awake from clock.waitTillTime, it will call `callback` once.
// Then it will find `stopped` is true and exit the loop, but it should call `callback` again
// before exiting its internal thread.
thread.join()
assert(results === Seq(0L, 100L, 200L))
assert(lastTime === 200L)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment