Skip to content
Snippets Groups Projects
Commit f3bfb711 authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-9966] [STREAMING] Handle couple of corner cases in PIDRateEstimator

1. The rate estimator should not estimate any rate when there are no records in the batch, as there is no data to estimate the rate. In the current state, it estimates and sets the rate to zero. That is incorrect.

2. The rate estimator should never set the rate to zero under any circumstances. Otherwise the system will stop receiving data, and stop generating useful estimates (see reason 1). So the fix is to define a parameter that sets a lower bound on the estimated rate, so that the system always receives some data.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8199 from tdas/SPARK-9966 and squashes the following commits:

829f793 [Tathagata Das] Fixed unit test and added comments
3a994db [Tathagata Das] Added min rate and updated tests in PIDRateEstimator
parent 1150a19b
No related branches found
No related tags found
No related merge requests found
......@@ -17,6 +17,8 @@
package org.apache.spark.streaming.scheduler.rate
import org.apache.spark.Logging
/**
* Implements a proportional-integral-derivative (PID) controller which acts on
* the speed of ingestion of elements into Spark Streaming. A PID controller works
......@@ -26,7 +28,7 @@ package org.apache.spark.streaming.scheduler.rate
*
* @see https://en.wikipedia.org/wiki/PID_controller
*
* @param batchDurationMillis the batch duration, in milliseconds
* @param batchIntervalMillis the batch duration, in milliseconds
* @param proportional how much the correction should depend on the current
* error. This term usually provides the bulk of correction and should be positive or zero.
* A value too large would make the controller overshoot the setpoint, while a small value
......@@ -39,13 +41,17 @@ package org.apache.spark.streaming.scheduler.rate
* of future errors, based on current rate of change. This value should be positive or 0.
* This term is not used very often, as it impacts stability of the system. The default
* value is 0.
* @param minRate what is the minimum rate that can be estimated.
* This must be greater than zero, so that the system always receives some data for rate
* estimation to work.
*/
private[streaming] class PIDRateEstimator(
batchIntervalMillis: Long,
proportional: Double = 1D,
integral: Double = .2D,
derivative: Double = 0D)
extends RateEstimator {
proportional: Double,
integral: Double,
derivative: Double,
minRate: Double
) extends RateEstimator with Logging {
private var firstRun: Boolean = true
private var latestTime: Long = -1L
......@@ -64,16 +70,23 @@ private[streaming] class PIDRateEstimator(
require(
derivative >= 0,
s"Derivative term $derivative in PIDRateEstimator should be >= 0.")
require(
minRate > 0,
s"Minimum rate in PIDRateEstimator should be > 0")
logInfo(s"Created PIDRateEstimator with proportional = $proportional, integral = $integral, " +
s"derivative = $derivative, min rate = $minRate")
def compute(time: Long, // in milliseconds
def compute(
time: Long, // in milliseconds
numElements: Long,
processingDelay: Long, // in milliseconds
schedulingDelay: Long // in milliseconds
): Option[Double] = {
logTrace(s"\ntime = $time, # records = $numElements, " +
s"processing time = $processingDelay, scheduling delay = $schedulingDelay")
this.synchronized {
if (time > latestTime && processingDelay > 0 && batchIntervalMillis > 0) {
if (time > latestTime && numElements > 0 && processingDelay > 0) {
// in seconds, should be close to batchDuration
val delaySinceUpdate = (time - latestTime).toDouble / 1000
......@@ -104,21 +117,30 @@ private[streaming] class PIDRateEstimator(
val newRate = (latestRate - proportional * error -
integral * historicalError -
derivative * dError).max(0.0)
derivative * dError).max(minRate)
logTrace(s"""
| latestRate = $latestRate, error = $error
| latestError = $latestError, historicalError = $historicalError
| delaySinceUpdate = $delaySinceUpdate, dError = $dError
""".stripMargin)
latestTime = time
if (firstRun) {
latestRate = processingRate
latestError = 0D
firstRun = false
logTrace("First run, rate estimation skipped")
None
} else {
latestRate = newRate
latestError = error
logTrace(s"New rate = $newRate")
Some(newRate)
}
} else None
} else {
logTrace("Rate estimation skipped")
None
}
}
}
}
......@@ -18,7 +18,6 @@
package org.apache.spark.streaming.scheduler.rate
import org.apache.spark.SparkConf
import org.apache.spark.SparkException
import org.apache.spark.streaming.Duration
/**
......@@ -61,7 +60,8 @@ object RateEstimator {
val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived)
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
case estimator =>
throw new IllegalArgumentException(s"Unkown rate estimator: $estimator")
......
......@@ -36,72 +36,89 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers {
test("estimator checks ranges") {
intercept[IllegalArgumentException] {
new PIDRateEstimator(0, 1, 2, 3)
new PIDRateEstimator(batchIntervalMillis = 0, 1, 2, 3, 10)
}
intercept[IllegalArgumentException] {
new PIDRateEstimator(100, -1, 2, 3)
new PIDRateEstimator(100, proportional = -1, 2, 3, 10)
}
intercept[IllegalArgumentException] {
new PIDRateEstimator(100, 0, -1, 3)
new PIDRateEstimator(100, 0, integral = -1, 3, 10)
}
intercept[IllegalArgumentException] {
new PIDRateEstimator(100, 0, 0, -1)
new PIDRateEstimator(100, 0, 0, derivative = -1, 10)
}
intercept[IllegalArgumentException] {
new PIDRateEstimator(100, 0, 0, 0, minRate = 0)
}
intercept[IllegalArgumentException] {
new PIDRateEstimator(100, 0, 0, 0, minRate = -10)
}
}
private def createDefaultEstimator: PIDRateEstimator = {
new PIDRateEstimator(20, 1D, 0D, 0D)
}
test("first bound is None") {
val p = createDefaultEstimator
test("first estimate is None") {
val p = createDefaultEstimator()
p.compute(0, 10, 10, 0) should equal(None)
}
test("second bound is rate") {
val p = createDefaultEstimator
test("second estimate is not None") {
val p = createDefaultEstimator()
p.compute(0, 10, 10, 0)
// 1000 elements / s
p.compute(10, 10, 10, 0) should equal(Some(1000))
}
test("works even with no time between updates") {
val p = createDefaultEstimator
test("no estimate when no time difference between successive calls") {
val p = createDefaultEstimator()
p.compute(0, 10, 10, 0)
p.compute(time = 10, 10, 10, 0) shouldNot equal(None)
p.compute(time = 10, 10, 10, 0) should equal(None)
}
test("no estimate when no records in previous batch") {
val p = createDefaultEstimator()
p.compute(0, 10, 10, 0)
p.compute(10, 10, 10, 0)
p.compute(10, 10, 10, 0) should equal(None)
p.compute(10, numElements = 0, 10, 0) should equal(None)
p.compute(20, numElements = -10, 10, 0) should equal(None)
}
test("bound is never negative") {
val p = new PIDRateEstimator(20, 1D, 1D, 0D)
test("no estimate when there is no processing delay") {
val p = createDefaultEstimator()
p.compute(0, 10, 10, 0)
p.compute(10, 10, processingDelay = 0, 0) should equal(None)
p.compute(20, 10, processingDelay = -10, 0) should equal(None)
}
test("estimate is never less than min rate") {
val minRate = 5D
val p = new PIDRateEstimator(20, 1D, 1D, 0D, minRate)
// prepare a series of batch updates, one every 20ms, 0 processed elements, 2ms of processing
// this might point the estimator to try and decrease the bound, but we test it never
// goes below zero, which would be nonsensical.
// goes below the min rate, which would be nonsensical.
val times = List.tabulate(50)(x => x * 20) // every 20ms
val elements = List.fill(50)(0) // no processing
val elements = List.fill(50)(1) // no processing
val proc = List.fill(50)(20) // 20ms of processing
val sched = List.fill(50)(100) // strictly positive accumulation
val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i))
res.head should equal(None)
res.tail should equal(List.fill(49)(Some(0D)))
res.tail should equal(List.fill(49)(Some(minRate)))
}
test("with no accumulated or positive error, |I| > 0, follow the processing speed") {
val p = new PIDRateEstimator(20, 1D, 1D, 0D)
val p = new PIDRateEstimator(20, 1D, 1D, 0D, 10)
// prepare a series of batch updates, one every 20ms with an increasing number of processed
// elements in each batch, but constant processing time, and no accumulated error. Even though
// the integral part is non-zero, the estimated rate should follow only the proportional term
val times = List.tabulate(50)(x => x * 20) // every 20ms
val elements = List.tabulate(50)(x => x * 20) // increasing
val elements = List.tabulate(50)(x => (x + 1) * 20) // increasing
val proc = List.fill(50)(20) // 20ms of processing
val sched = List.fill(50)(0)
val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i))
res.head should equal(None)
res.tail should equal(List.tabulate(50)(x => Some(x * 1000D)).tail)
res.tail should equal(List.tabulate(50)(x => Some((x + 1) * 1000D)).tail)
}
test("with no accumulated but some positive error, |I| > 0, follow the processing speed") {
val p = new PIDRateEstimator(20, 1D, 1D, 0D)
val p = new PIDRateEstimator(20, 1D, 1D, 0D, 10)
// prepare a series of batch updates, one every 20ms with an decreasing number of processed
// elements in each batch, but constant processing time, and no accumulated error. Even though
// the integral part is non-zero, the estimated rate should follow only the proportional term,
......@@ -116,13 +133,14 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers {
}
test("with some accumulated and some positive error, |I| > 0, stay below the processing speed") {
val p = new PIDRateEstimator(20, 1D, .01D, 0D)
val minRate = 10D
val p = new PIDRateEstimator(20, 1D, .01D, 0D, minRate)
val times = List.tabulate(50)(x => x * 20) // every 20ms
val rng = new Random()
val elements = List.tabulate(50)(x => rng.nextInt(1000))
val elements = List.tabulate(50)(x => rng.nextInt(1000) + 1000)
val procDelayMs = 20
val proc = List.fill(50)(procDelayMs) // 20ms of processing
val sched = List.tabulate(50)(x => rng.nextInt(19)) // random wait
val sched = List.tabulate(50)(x => rng.nextInt(19) + 1) // random wait
val speeds = elements map ((x) => x.toDouble / procDelayMs * 1000)
val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i))
......@@ -131,7 +149,12 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers {
res(n) should not be None
if (res(n).get > 0 && sched(n) > 0) {
res(n).get should be < speeds(n)
res(n).get should be >= minRate
}
}
}
private def createDefaultEstimator(): PIDRateEstimator = {
new PIDRateEstimator(20, 1D, 0D, 0D, 10)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment