Skip to content
Snippets Groups Projects
Commit e0f7fb7f authored by jerryshao's avatar jerryshao Committed by Tathagata Das
Browse files

[SPARK-5315][Streaming] Fix reduceByWindow Java API not work bug

`reduceByWindow` for Java API is actually not Java compatible, change to make it Java compatible.

Current solution is to deprecate the old one and add a new API, but since old API actually is not correct, so is keeping the old one meaningful? just to keep the binary compatible? Also even adding new API still need to add to Mima exclusion, I'm not sure to change the API, or deprecate the old API and add a new one, which is the best solution?

Author: jerryshao <saisai.shao@intel.com>

Closes #4104 from jerryshao/SPARK-5315 and squashes the following commits:

5bc8987 [jerryshao] Address the comment
c7aa1b4 [jerryshao] Deprecate the old one to keep binary compatible
8e9dc67 [jerryshao] Fix JavaDStream reduceByWindow signature error
parent 3c3fa632
No related branches found
No related tags found
No related merge requests found
...@@ -90,6 +90,10 @@ object MimaExcludes { ...@@ -90,6 +90,10 @@ object MimaExcludes {
// SPARK-5297 Java FileStream do not work with custom key/values // SPARK-5297 Java FileStream do not work with custom key/values
ProblemFilters.exclude[MissingMethodProblem]( ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream") "org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream")
) ++ Seq(
// SPARK-5315 Spark Streaming Java API returns Scala DStream
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow")
) )
case v if v.startsWith("1.2") => case v if v.startsWith("1.2") =>
......
...@@ -211,7 +211,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T ...@@ -211,7 +211,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* @param slideDuration sliding interval of the window (i.e., the interval after which * @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this * the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval * DStream's batching interval
* @deprecated As this API is not Java compatible.
*/ */
@deprecated("Use Java-compatible version of reduceByWindow", "1.3.0")
def reduceByWindow( def reduceByWindow(
reduceFunc: (T, T) => T, reduceFunc: (T, T) => T,
windowDuration: Duration, windowDuration: Duration,
...@@ -220,6 +222,24 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T ...@@ -220,6 +222,24 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration) dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
} }
/**
* Return a new DStream in which each RDD has a single element generated by reducing all
* elements in a sliding window over this DStream.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
def reduceByWindow(
reduceFunc: JFunction2[T, T, T],
windowDuration: Duration,
slideDuration: Duration
): JavaDStream[T] = {
dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
}
/** /**
* Return a new DStream in which each RDD has a single element generated by reducing all * Return a new DStream in which each RDD has a single element generated by reducing all
* elements in a sliding window over this DStream. However, the reduction is done incrementally * elements in a sliding window over this DStream. However, the reduction is done incrementally
......
...@@ -306,7 +306,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ...@@ -306,7 +306,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void testReduceByWindow() { public void testReduceByWindowWithInverse() {
testReduceByWindow(true);
}
@SuppressWarnings("unchecked")
@Test
public void testReduceByWindowWithoutInverse() {
testReduceByWindow(false);
}
private void testReduceByWindow(boolean withInverse) {
List<List<Integer>> inputData = Arrays.asList( List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,2,3), Arrays.asList(1,2,3),
Arrays.asList(4,5,6), Arrays.asList(4,5,6),
...@@ -319,8 +329,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ...@@ -319,8 +329,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(24)); Arrays.asList(24));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(new IntegerSum(), JavaDStream<Integer> reducedWindowed = null;
if (withInverse) {
reducedWindowed = stream.reduceByWindow(new IntegerSum(),
new IntegerDifference(), new Duration(2000), new Duration(1000)); new IntegerDifference(), new Duration(2000), new Duration(1000));
} else {
reducedWindowed = stream.reduceByWindow(new IntegerSum(),
new Duration(2000), new Duration(1000));
}
JavaTestUtils.attachTestOutputStream(reducedWindowed); JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment