Commit 599a8c6e authored by David Tolpin, committed by Tathagata Das

[SPARK-11812][PYSPARK] invFunc=None works properly with python's reduceByKeyAndWindow

invFunc is optional and can be None. The code checked invReduceFunc (a local function) for truthiness instead of invFunc (the parameter). Since a locally defined function is never None, the case of invFunc=None (a common one when no inverse reduction is defined) was handled incorrectly, resulting in loss of data.
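For illustration, a minimal standalone sketch of the truthiness bug (hypothetical names; not the actual PySpark source):

def reduce_by_key_and_window(func, inv_func=None):
    # Hypothetical, simplified stand-in for DStream.reduceByKeyAndWindow.
    def inv_reduce_func(a, b):
        # Local wrapper around the optional inverse function.
        return inv_func(a, b)

    # Buggy check: a locally defined function object is always truthy,
    # so this is True even when inv_func is None.
    buggy_uses_inverse = bool(inv_reduce_func)

    # Fixed check: test the optional parameter itself.
    fixed_uses_inverse = inv_func is not None

    return buggy_uses_inverse, fixed_uses_inverse

# With no inverse function supplied, the buggy check still takes the
# inverse-reduction path, while the fixed check correctly does not.
assert reduce_by_key_and_window(max) == (True, False)
assert reduce_by_key_and_window(max, min) == (True, True)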

In addition, the docstring used the wrong parameter names; this is also fixed.

Author: David Tolpin <david.tolpin@gmail.com>

Closes #9775 from dtolpin/master.
parent 47000745
@@ -524,8 +524,8 @@ class DStream(object):
         `invFunc` can be None, then it will reduce all the RDDs in window, could be slower
         than having `invFunc`.
 
-        @param reduceFunc: associative reduce function
-        @param invReduceFunc: inverse function of `reduceFunc`
+        @param func: associative reduce function
+        @param invFunc: inverse function of `reduceFunc`
         @param windowDuration: width of the window; must be a multiple of this DStream's
                                batching interval
         @param slideDuration: sliding interval of the window (i.e., the interval after which
@@ -556,7 +556,7 @@ class DStream(object):
                                     if kv[1] is not None else kv[0])
 
         jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
-        if invReduceFunc:
+        if invFunc:
             jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer)
         else:
             jinvReduceFunc = None
@@ -582,6 +582,17 @@ class WindowFunctionTests(PySparkStreamingTestCase):
         self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 0.1, 0.1))
         self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 1, 0.1))
 
+    def test_reduce_by_key_and_window_with_none_invFunc(self):
+        input = [range(1), range(2), range(3), range(4), range(5), range(6)]
+
+        def func(dstream):
+            return dstream.map(lambda x: (x, 1))\
+                .reduceByKeyAndWindow(operator.add, None, 5, 1)\
+                .filter(lambda kv: kv[1] > 0).count()
+
+        expected = [[2], [4], [6], [6], [6], [6]]
+        self._test_func(input, func, expected)
+
 
 class StreamingContextTests(PySparkStreamingTestCase):
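For context, a rough usage sketch of the fixed behavior with invFunc=None, mirroring the new test above. This assumes a local Spark installation; the app name, queueStream input, and durations are illustrative, not taken from the patch:

import operator
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Illustrative local setup.
sc = SparkContext("local[2]", "none-invfunc-demo")
ssc = StreamingContext(sc, batchDuration=1)

# Six micro-batches shaped like the new test: range(1), range(2), ..., range(6).
batches = [sc.parallelize(range(i)) for i in range(1, 7)]
stream = ssc.queueStream(batches)

# invFunc=None: no inverse reduction, so the whole window is re-reduced on each slide.
counts = (stream.map(lambda x: (x, 1))
          .reduceByKeyAndWindow(operator.add, None, windowDuration=5, slideDuration=1)
          .filter(lambda kv: kv[1] > 0)
          .count())
counts.pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(10)
ssc.stop()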