From 439e361010e51d2213c92ccabed5093be92a72ee Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fran=C3=A7ois=20Garillot?= <francois@garillot.net>
Date: Tue, 3 May 2016 11:42:47 -0700
Subject: [PATCH] [SPARK-9819][STREAMING][DOCUMENTATION] Clarify doc for
 invReduceFunc in incremental versions of reduceByWindow
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- that reduceFunc and invReduceFunc should be associative
- that the intermediate result in iterated applications of inverseReduceFunc
  is its first argument

Author: François Garillot <francois@garillot.net>

Closes #8103 from huitseeker/issue/invReduceFuncDoc.
---
 python/pyspark/streaming/dstream.py                           | 4 +++-
 .../org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 3 ++-
 .../org/apache/spark/streaming/api/java/JavaPairDStream.scala | 3 ++-
 .../scala/org/apache/spark/streaming/dstream/DStream.scala    | 3 ++-
 .../apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
 5 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 2056663872..67a0819601 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -454,7 +454,9 @@ class DStream(object):
         This is more efficient than `invReduceFunc` is None.
 
         @param reduceFunc:     associative and commutative reduce function
-        @param invReduceFunc:  inverse reduce function of `reduceFunc`
+        @param invReduceFunc:  inverse reduce function of `reduceFunc`; such that for all y,
+                               and invertible x:
+                               `invReduceFunc(reduceFunc(x, y), x) = y`
         @param windowDuration: width of the window; must be a multiple of this DStream's
                                batching interval
         @param slideDuration:  sliding interval of the window (i.e., the interval after which
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 43632f37cc..a0a40fcee2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -240,7 +240,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    *  This is more efficient than reduceByWindow without "inverse reduce" function.
    *  However, it is applicable to only "invertible reduce functions".
    * @param reduceFunc associative and commutative reduce function
-   * @param invReduceFunc inverse reduce function
+   * @param invReduceFunc inverse reduce function; such that for all y, invertible x:
+   *                      `invReduceFunc(reduceFunc(x, y), x) = y`
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 2a80cf4466..dec983165f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -336,7 +336,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * However, it is applicable to only "invertible reduce functions".
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
    * @param reduceFunc associative and commutative reduce function
-   * @param invReduceFunc inverse function
+   * @param invReduceFunc inverse function; such that for all y, invertible x:
+   *                      `invReduceFunc(reduceFunc(x, y), x) = y`
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 583f5a48d1..01dcfcf24b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -793,7 +793,8 @@ abstract class DStream[T: ClassTag] (
    *  This is more efficient than reduceByWindow without "inverse reduce" function.
    *  However, it is applicable to only "invertible reduce functions".
    * @param reduceFunc associative and commutative reduce function
-   * @param invReduceFunc inverse reduce function
+   * @param invReduceFunc inverse reduce function; such that for all y, invertible x:
+   *                      `invReduceFunc(reduceFunc(x, y), x) = y`
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index b6394e36b5..2f2a6d13dd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -290,7 +290,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
    * However, it is applicable to only "invertible reduce functions".
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
    * @param reduceFunc associative and commutative reduce function
-   * @param invReduceFunc inverse reduce function
+   * @param invReduceFunc inverse reduce function; such that for all y, invertible x:
+   *                      `invReduceFunc(reduceFunc(x, y), x) = y`
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    * @param slideDuration  sliding interval of the window (i.e., the interval after which
-- 
GitLab