Commit 12b020b6 authored by Tathagata Das


Added filter functionality to reduceByKeyAndWindow with inverse. Consolidated reduceByKeyAndWindow's many functions into a smaller number of functions with optional parameters.
parent 39addd38
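For orientation, the consolidated Scala API can be exercised roughly as follows. This is an illustrative sketch only, not part of the commit: the context setup, stream source, durations, and filter predicate are assumptions, and the checkpoint configuration required by the inverse form is omitted.

    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.StreamingContext._   // implicit pair-DStream operations

    // Hypothetical setup (not from this patch): local context, 10-second batches.
    val ssc = new StreamingContext("local[2]", "WindowedCounts", Seconds(10))
    val wordCounts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // One call now covers what previously needed several overloads:
    // slideDuration, numPartitions and filterFunc are optional parameters.
    val windowedCounts = wordCounts.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,      // reduce: add counts entering the window
      (a: Int, b: Int) => a - b,      // inverse reduce: subtract counts leaving the window
      Seconds(30),                    // window duration
      Seconds(10),                    // slide duration
      filterFunc = (kv: (String, Int)) => kv._2 != 0  // drop keys whose count has fallen to 0
    )
    windowedCounts.print()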
@@ -531,7 +531,7 @@ abstract class DStream[T: ClassManifest] (
       windowDuration: Duration,
       slideDuration: Duration
     ): DStream[T] = {
-    this.window(windowDuration, slideDuration).reduce(reduceFunc)
+    this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
   }
   def reduceByWindow(
......
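The change above replaces a plain windowed reduce with a per-batch pre-reduction before windowing, which appears intended to shrink the data the window operation has to keep. A minimal sketch of why the two forms agree for an associative reduce function (plain collections with made-up values, not the DStream API):

    val reduceFunc: (Int, Int) => Int = _ + _
    val batchesInWindow = Seq(Seq(1, 2, 3), Seq(4, 5))        // two hypothetical batches in one window
    val direct = batchesInWindow.flatten.reduce(reduceFunc)   // reduce the raw window: 15
    val staged = batchesInWindow.map(_.reduce(reduceFunc)).reduce(reduceFunc)  // pre-reduce each batch, then combine: 15
    assert(direct == staged)  // associativity makes the results equal, with one value kept per batch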
@@ -137,7 +137,8 @@ extends Serializable {
    * @param slideDuration sliding interval of the window (i.e., the interval after which
    *                      the new DStream will generate RDDs); must be a multiple of this
    *                      DStream's batching interval
-   * @param numPartitions Number of partitions of each RDD in the new DStream.
+   * @param numPartitions number of partitions of each RDD in the new DStream; if not specified
+   *                      then Spark's default number of partitions will be used
    */
   def groupByKeyAndWindow(
       windowDuration: Duration,

@@ -155,7 +156,7 @@ extends Serializable {
    * @param slideDuration sliding interval of the window (i.e., the interval after which
    *                      the new DStream will generate RDDs); must be a multiple of this
    *                      DStream's batching interval
-   * @param partitioner   Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner   partitioner for controlling the partitioning of each RDD in the new DStream.
    */
   def groupByKeyAndWindow(
       windowDuration: Duration,

@@ -213,7 +214,7 @@ extends Serializable {
    * @param numPartitions Number of partitions of each RDD in the new DStream.
    */
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
       windowDuration: Duration,
       slideDuration: Duration,
       numPartitions: Int

@@ -230,7 +231,8 @@ extends Serializable {
    * @param slideDuration sliding interval of the window (i.e., the interval after which
    *                      the new DStream will generate RDDs); must be a multiple of this
    *                      DStream's batching interval
-   * @param partitioner   Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner   partitioner for controlling the partitioning of each RDD
+   *                      in the new DStream.
    */
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,

@@ -245,7 +247,7 @@ extends Serializable {
   }
   /**
-   * Create a new DStream by reducing over a using incremental computation.
+   * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
    * The reduced value of over a new window is calculated using the old window's reduce value :
    *  1. reduce the new values that entered the window (e.g., adding new counts)
    *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)

@@ -253,81 +255,64 @@ extends Serializable {
    * However, it is applicable to only "invertible reduce functions".
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
    * @param reduceFunc associative reduce function
-   * @param invReduceFunc inverse function
-   * @param windowDuration width of the window; must be a multiple of this DStream's
-   *                       batching interval
-   * @param slideDuration  sliding interval of the window (i.e., the interval after which
-   *                       the new DStream will generate RDDs); must be a multiple of this
-   *                       DStream's batching interval
-   */
-  def reduceByKeyAndWindow(
-      reduceFunc: (V, V) => V,
-      invReduceFunc: (V, V) => V,
-      windowDuration: Duration,
-      slideDuration: Duration
-    ): DStream[(K, V)] = {
-    reduceByKeyAndWindow(
-      reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner())
-  }
-  /**
-   * Create a new DStream by reducing over a using incremental computation.
-   * The reduced value of over a new window is calculated using the old window's reduce value :
-   *  1. reduce the new values that entered the window (e.g., adding new counts)
-   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
-   * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
-   * However, it is applicable to only "invertible reduce functions".
-   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
-   * @param reduceFunc associative reduce function
-   * @param invReduceFunc inverse function
+   * @param invReduceFunc inverse reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
-   * @param numPartitions Number of partitions of each RDD in the new DStream.
+   * @param filterFunc     Optional function to filter expired key-value pairs;
+   *                       only pairs that satisfy the function are retained
    */
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
       invReduceFunc: (V, V) => V,
       windowDuration: Duration,
-      slideDuration: Duration,
-      numPartitions: Int
+      slideDuration: Duration = self.slideDuration,
+      numPartitions: Int = ssc.sc.defaultParallelism,
+      filterFunc: ((K, V)) => Boolean = null
     ): DStream[(K, V)] = {
     reduceByKeyAndWindow(
-      reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
+      reduceFunc, invReduceFunc, windowDuration,
+      slideDuration, defaultPartitioner(numPartitions), filterFunc
+    )
   }
   /**
-   * Create a new DStream by reducing over a using incremental computation.
+   * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
    * The reduced value of over a new window is calculated using the old window's reduce value :
    *  1. reduce the new values that entered the window (e.g., adding new counts)
    *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
    * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
    * However, it is applicable to only "invertible reduce functions".
    * @param reduceFunc associative reduce function
-   * @param invReduceFunc inverse function
+   * @param invReduceFunc inverse reduce function
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
-   * @param partitioner    Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param filterFunc     Optional function to filter expired key-value pairs;
+   *                       only pairs that satisfy the function are retained
    */
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
       invReduceFunc: (V, V) => V,
       windowDuration: Duration,
       slideDuration: Duration,
-      partitioner: Partitioner
+      partitioner: Partitioner,
+      filterFunc: ((K, V)) => Boolean
     ): DStream[(K, V)] = {
     val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
     val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
+    val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
     new ReducedWindowedDStream[K, V](
-      self, cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, partitioner)
+      self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
+      windowDuration, slideDuration, partitioner
+    )
   }
   /**
......
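The scaladoc above describes the incremental update in two steps: reduce the values entering the window and "inverse reduce" the values leaving it. A small illustration of that arithmetic for a single key, using plain Scala collections with made-up numbers rather than the DStream API:

    val reduce:    (Int, Int) => Int = _ + _   // associative reduce
    val invReduce: (Int, Int) => Int = _ - _   // its inverse
    val previousWindowValue = 7                // reduced value of the old window for some key
    val valuesEnteringWindow = Seq(2, 1)       // per-batch reduced values that slid into the window
    val valuesLeavingWindow  = Seq(3)          // per-batch reduced values that slid out of it
    val newWindowValue =
      valuesLeavingWindow.foldLeft(valuesEnteringWindow.foldLeft(previousWindowValue)(reduce))(invReduce)
    // 7 + 2 + 1 - 3 == 7; recomputing the whole window from scratch would give the same value,
    // but only the entering and leaving batches had to be touched.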
@@ -328,7 +328,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
   /**
-   * Create a new DStream by reducing over a using incremental computation.
+   * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
    * The reduced value of over a new window is calculated using the old window's reduce value :
    *  1. reduce the new values that entered the window (e.g., adding new counts)
    *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)

@@ -342,25 +342,31 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * @param slideDuration sliding interval of the window (i.e., the interval after which
    *                      the new DStream will generate RDDs); must be a multiple of this
    *                      DStream's batching interval
-   * @param numPartitions Number of partitions of each RDD in the new DStream.
+   * @param numPartitions number of partitions of each RDD in the new DStream.
+   * @param filterFunc    function to filter expired key-value pairs;
+   *                      only pairs that satisfy the function are retained
+   *                      set this to null if you do not want to filter
    */
   def reduceByKeyAndWindow(
       reduceFunc: Function2[V, V, V],
       invReduceFunc: Function2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration,
-      numPartitions: Int
+      numPartitions: Int,
+      filterFunc: JFunction[(K, V), java.lang.Boolean]
     ): JavaPairDStream[K, V] = {
     dstream.reduceByKeyAndWindow(
         reduceFunc,
         invReduceFunc,
         windowDuration,
         slideDuration,
-        numPartitions)
+        numPartitions,
+        (p: (K, V)) => filterFunc(p).booleanValue()
+    )
   }
   /**
-   * Create a new DStream by reducing over a using incremental computation.
+   * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
    * The reduced value of over a new window is calculated using the old window's reduce value :
    *  1. reduce the new values that entered the window (e.g., adding new counts)
    *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)

@@ -374,20 +380,26 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *                      the new DStream will generate RDDs); must be a multiple of this
    *                      DStream's batching interval
    * @param partitioner   Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param filterFunc    function to filter expired key-value pairs;
+   *                      only pairs that satisfy the function are retained
+   *                      set this to null if you do not want to filter
    */
   def reduceByKeyAndWindow(
       reduceFunc: Function2[V, V, V],
       invReduceFunc: Function2[V, V, V],
       windowDuration: Duration,
       slideDuration: Duration,
-      partitioner: Partitioner
+      partitioner: Partitioner,
+      filterFunc: JFunction[(K, V), java.lang.Boolean]
     ): JavaPairDStream[K, V] = {
     dstream.reduceByKeyAndWindow(
         reduceFunc,
         invReduceFunc,
         windowDuration,
         slideDuration,
-        partitioner)
+        partitioner,
+        (p: (K, V)) => filterFunc(p).booleanValue()
+    )
   }
   /**
......
@@ -3,7 +3,7 @@ package spark.streaming.dstream
 import spark.streaming.StreamingContext._
 import spark.RDD
-import spark.rdd.CoGroupedRDD
+import spark.rdd.{CoGroupedRDD, MapPartitionsRDD}
 import spark.Partitioner
 import spark.SparkContext._
 import spark.storage.StorageLevel

@@ -15,7 +15,8 @@ private[streaming]
 class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     parent: DStream[(K, V)],
     reduceFunc: (V, V) => V,
     invReduceFunc: (V, V) => V,
+    filterFunc: Option[((K, V)) => Boolean],
     _windowDuration: Duration,
     _slideDuration: Duration,
     partitioner: Partitioner

@@ -87,22 +88,25 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     //
     // Get the RDDs of the reduced values in "old time steps"
-    val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
+    val oldRDDs =
+      reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
     logDebug("# old RDDs = " + oldRDDs.size)
     // Get the RDDs of the reduced values in "new time steps"
-    val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
+    val newRDDs =
+      reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
     logDebug("# new RDDs = " + newRDDs.size)
     // Get the RDD of the reduced value of the previous window
-    val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
+    val previousWindowRDD =
+      getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
     // Make the list of RDDs that needs to cogrouped together for reducing their reduced values
     val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
     // Cogroup the reduced RDDs and merge the reduced values
-    val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
-    //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
+    val cogroupedRDD =
+      new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
     val numOldValues = oldRDDs.size
     val numNewValues = newRDDs.size

@@ -114,7 +118,9 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
       // Getting reduced values "old time steps" that will be removed from current window
       val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
       // Getting reduced values "new time steps"
-      val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+      val newValues =
+        (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
       if (seqOfValues(0).isEmpty) {
         // If previous window's reduce value does not exist, then at least new values should exist
         if (newValues.isEmpty) {

@@ -140,10 +146,12 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)
-    Some(mergedValuesRDD)
+    if (filterFunc.isDefined) {
+      Some(mergedValuesRDD.filter(filterFunc.get))
+    } else {
+      Some(mergedValuesRDD)
+    }
   }
 }
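The cogroup-and-merge logic in the hunks above is easier to follow for a single key. The following is a paraphrased sketch of the per-key merge that the comments describe, not the actual method body; the names, the Int value type, and the reduce functions are illustrative only:

    // Position 0 of the cogrouped values holds the previous window's value (if any); the next
    // numOldValues slots hold values leaving the window; the remaining slots hold values entering it.
    def mergeValuesSketch(
        previous: Option[Int],   // previous window's reduced value for this key, if present
        oldValues: Seq[Int],     // reduced values of batches that slid out of the window
        newValues: Seq[Int]      // reduced values of batches that slid into the window
      ): Int = {
      val reduce:    (Int, Int) => Int = _ + _
      val invReduce: (Int, Int) => Int = _ - _
      previous match {
        case None =>
          // No prior value: the key must have arrived with the new batches
          newValues.reduce(reduce)
        case Some(prev) =>
          val withoutOld = oldValues.foldLeft(prev)(invReduce)   // "inverse reduce" departing values
          newValues.foldLeft(withoutOld)(reduce)                 // reduce arriving values
      }
    }

With a filter function supplied, keys whose merged value no longer matters (for example a count of 0) can then be dropped from the resulting RDD instead of lingering in every future window.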
@@ -291,7 +291,6 @@ class TestOutputStream[T: ClassManifest](
     (rdd: RDD[T], t: Time) => {
       val collected = rdd.collect()
       output += collected
-      println(t + ": " + collected.mkString("[", ",", "]"))
     }
   ) {
......
 # Set everything to be logged to the file streaming/target/unit-tests.log
 log4j.rootCategory=WARN, file
 # log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file=org.apache.log4j.ConsoleAppender
+log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
 log4j.appender.file.file=streaming/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
......
@@ -84,12 +84,9 @@ class WindowOperationsSuite extends TestSuiteBase {
   )
   /*
-  The output of the reduceByKeyAndWindow with inverse reduce function is
-  different from the naive reduceByKeyAndWindow. Even if the count of a
-  particular key is 0, the key does not get eliminated from the RDDs of
-  ReducedWindowedDStream. This causes the number of keys in these RDDs to
-  increase forever. A more generalized version that allows elimination of
-  keys should be considered.
+  The output of the reduceByKeyAndWindow with inverse function but without a filter
+  function will be different from the naive reduceByKeyAndWindow, as no keys get
+  eliminated from the ReducedWindowedDStream even if the value of a key becomes 0.
   */
   val bigReduceInvOutput = Seq(

@@ -177,31 +174,31 @@ class WindowOperationsSuite extends TestSuiteBase {
   // Testing reduceByKeyAndWindow (with invertible reduce function)
-  testReduceByKeyAndWindowInv(
+  testReduceByKeyAndWindowWithInverse(
     "basic reduction",
     Seq(Seq(("a", 1), ("a", 3)) ),
     Seq(Seq(("a", 4)) )
   )
-  testReduceByKeyAndWindowInv(
+  testReduceByKeyAndWindowWithInverse(
     "key already in window and new value added into window",
     Seq( Seq(("a", 1)), Seq(("a", 1)) ),
     Seq( Seq(("a", 1)), Seq(("a", 2)) )
   )
-  testReduceByKeyAndWindowInv(
+  testReduceByKeyAndWindowWithInverse(
     "new key added into window",
     Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
     Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
   )
-  testReduceByKeyAndWindowInv(
+  testReduceByKeyAndWindowWithInverse(
     "key removed from window",
     Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
    Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) )
   )
-  testReduceByKeyAndWindowInv(
+  testReduceByKeyAndWindowWithInverse(
     "larger slide time",
     largerSlideInput,
     largerSlideReduceOutput,

@@ -209,7 +206,9 @@ class WindowOperationsSuite extends TestSuiteBase {
     Seconds(2)
   )
-  testReduceByKeyAndWindowInv("big test", bigInput, bigReduceInvOutput)
+  testReduceByKeyAndWindowWithInverse("big test", bigInput, bigReduceInvOutput)
+
+  testReduceByKeyAndWindowWithFilteredInverse("big test", bigInput, bigReduceOutput)

   test("groupByKeyAndWindow") {
     val input = bigInput

@@ -276,27 +275,45 @@ class WindowOperationsSuite extends TestSuiteBase {
     test("reduceByKeyAndWindow - " + name) {
       val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
       val operation = (s: DStream[(String, Int)]) => {
-        s.reduceByKeyAndWindow(_ + _, windowDuration, slideDuration).persist()
+        s.reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowDuration, slideDuration)
       }
       testOperation(input, operation, expectedOutput, numBatches, true)
     }
   }

-  def testReduceByKeyAndWindowInv(
+  def testReduceByKeyAndWindowWithInverse(
       name: String,
       input: Seq[Seq[(String, Int)]],
       expectedOutput: Seq[Seq[(String, Int)]],
       windowDuration: Duration = Seconds(2),
       slideDuration: Duration = Seconds(1)
     ) {
-    test("reduceByKeyAndWindowInv - " + name) {
+    test("ReduceByKeyAndWindow with inverse function - " + name) {
       val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
       val operation = (s: DStream[(String, Int)]) => {
         s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration)
-         .persist()
          .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
       }
       testOperation(input, operation, expectedOutput, numBatches, true)
     }
   }

+  def testReduceByKeyAndWindowWithFilteredInverse(
+      name: String,
+      input: Seq[Seq[(String, Int)]],
+      expectedOutput: Seq[Seq[(String, Int)]],
+      windowDuration: Duration = Seconds(2),
+      slideDuration: Duration = Seconds(1)
+    ) {
+    test("reduceByKeyAndWindow with inverse and filter functions - " + name) {
+      val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
+      val filterFunc = (p: (String, Int)) => p._2 != 0
+      val operation = (s: DStream[(String, Int)]) => {
+        s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration, filterFunc = filterFunc)
+         .persist()
+         .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
+      }
+      testOperation(input, operation, expectedOutput, numBatches, true)
+    }
+  }
 }