Skip to content
Snippets Groups Projects
Commit d70a0768 authored by Wesley Miao's avatar Wesley Miao Committed by Sean Owen
Browse files

[SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time

tdas

https://issues.apache.org/jira/browse/SPARK-7326

The problem most likely resides in DStream.slice() implementation, as shown below.

  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
    if (!isInitialized) {
      throw new SparkException(this + " has not been initialized")
    }
    if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
      logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
    }
    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

    logInfo("Slicing from " + fromTime + " to " + toTime +
      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
      if (time >= zeroTime) getOrCompute(time) else None
    })
  }

Here after performing floor() on both fromTime and toTime, the result (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiple of the slidingDuration, thus making isTimeValid() check failed for all the remaining computation.

The fix is to add a new floor() function in Time.scala to respect the zeroTime while performing the floor :

  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }

And then change the DStream.slice to call this new floor function by passing in its zeroTime.

    val alignedToTime = toTime.floor(slideDuration, zeroTime)
    val alignedFromTime = fromTime.floor(slideDuration, zeroTime)

This way the alignedToTime and alignedFromTime are *really* aligned in respect to zeroTime whose value is not really a 0.

Author: Wesley Miao <wesley.miao@gmail.com>
Author: Wesley <wesley.miao@autodesk.com>

Closes #5871 from wesleymiao/spark-7326 and squashes the following commits:

82a4d8c [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream dosen't work all the time
48b4dc0 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
6ade399 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
2611745 [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time
parent 2242ab31
No related branches found
No related tags found
No related merge requests found
...@@ -63,6 +63,11 @@ case class Time(private val millis: Long) { ...@@ -63,6 +63,11 @@ case class Time(private val millis: Long) {
new Time((this.millis / t) * t) new Time((this.millis / t) * t)
} }
def floor(that: Duration, zeroTime: Time): Time = {
val t = that.milliseconds
new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
}
def isMultipleOf(that: Duration): Boolean = def isMultipleOf(that: Duration): Boolean =
(this.millis % that.milliseconds == 0) (this.millis % that.milliseconds == 0)
......
...@@ -763,16 +763,22 @@ abstract class DStream[T: ClassTag] ( ...@@ -763,16 +763,22 @@ abstract class DStream[T: ClassTag] (
if (!isInitialized) { if (!isInitialized) {
throw new SparkException(this + " has not been initialized") throw new SparkException(this + " has not been initialized")
} }
if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" val alignedToTime = if ((toTime - zeroTime).isMultipleOf(slideDuration)) {
+ slideDuration + ")") toTime
} else {
logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
toTime.floor(slideDuration, zeroTime)
} }
if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" val alignedFromTime = if ((fromTime - zeroTime).isMultipleOf(slideDuration)) {
+ slideDuration + ")") fromTime
} else {
logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
+ slideDuration + ")")
fromTime.floor(slideDuration, zeroTime)
} }
val alignedToTime = toTime.floor(slideDuration)
val alignedFromTime = fromTime.floor(slideDuration)
logInfo("Slicing from " + fromTime + " to " + toTime + logInfo("Slicing from " + fromTime + " to " + toTime +
" (aligned to " + alignedFromTime + " and " + alignedToTime + ")") " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
......
...@@ -69,6 +69,9 @@ class TimeSuite extends TestSuiteBase { ...@@ -69,6 +69,9 @@ class TimeSuite extends TestSuiteBase {
assert(new Time(1200).floor(new Duration(200)) == new Time(1200)) assert(new Time(1200).floor(new Duration(200)) == new Time(1200))
assert(new Time(199).floor(new Duration(200)) == new Time(0)) assert(new Time(199).floor(new Duration(200)) == new Time(0))
assert(new Time(1).floor(new Duration(1)) == new Time(1)) assert(new Time(1).floor(new Duration(1)) == new Time(1))
assert(new Time(1350).floor(new Duration(200), new Time(50)) == new Time(1250))
assert(new Time(1350).floor(new Duration(200), new Time(150)) == new Time(1350))
assert(new Time(1350).floor(new Duration(200), new Time(200)) == new Time(1200))
} }
test("isMultipleOf") { test("isMultipleOf") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment