From 437583f692e30b8dc03b339a34e92595d7b992ba Mon Sep 17 00:00:00 2001 From: David Tolpin <david.tolpin@gmail.com> Date: Wed, 16 Dec 2015 22:10:24 -0800 Subject: [PATCH] [SPARK-11904][PYSPARK] reduceByKeyAndWindow does not require checkpointing when invFunc is None when invFunc is None, `reduceByKeyAndWindow(func, None, winsize, slidesize)` is equivalent to reduceByKey(func).window(winsize, slidesize).reduceByKey(winsize, slidesize) and no checkpoint is necessary. The corresponding Scala code does exactly that, but Python code always creates a windowed stream with obligatory checkpointing. The patch fixes this. I do not know how to unit-test this. Author: David Tolpin <david.tolpin@gmail.com> Closes #9888 from dtolpin/master. --- python/pyspark/streaming/dstream.py | 45 +++++++++++++++-------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index f61137cb88..b994a53bf2 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -542,31 +542,32 @@ class DStream(object): reduced = self.reduceByKey(func, numPartitions) - def reduceFunc(t, a, b): - b = b.reduceByKey(func, numPartitions) - r = a.union(b).reduceByKey(func, numPartitions) if a else b - if filterFunc: - r = r.filter(filterFunc) - return r - - def invReduceFunc(t, a, b): - b = b.reduceByKey(func, numPartitions) - joined = a.leftOuterJoin(b, numPartitions) - return joined.mapValues(lambda kv: invFunc(kv[0], kv[1]) - if kv[1] is not None else kv[0]) - - jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer) if invFunc: + def reduceFunc(t, a, b): + b = b.reduceByKey(func, numPartitions) + r = a.union(b).reduceByKey(func, numPartitions) if a else b + if filterFunc: + r = r.filter(filterFunc) + return r + + def invReduceFunc(t, a, b): + b = b.reduceByKey(func, numPartitions) + joined = a.leftOuterJoin(b, numPartitions) + return joined.mapValues(lambda kv: invFunc(kv[0], kv[1]) + if kv[1] is not None else kv[0]) + + jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer) jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer) + if slideDuration is None: + slideDuration = self._slideDuration + dstream = self._sc._jvm.PythonReducedWindowedDStream( + reduced._jdstream.dstream(), + jreduceFunc, jinvReduceFunc, + self._ssc._jduration(windowDuration), + self._ssc._jduration(slideDuration)) + return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer) else: - jinvReduceFunc = None - if slideDuration is None: - slideDuration = self._slideDuration - dstream = self._sc._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(), - jreduceFunc, jinvReduceFunc, - self._ssc._jduration(windowDuration), - self._ssc._jduration(slideDuration)) - return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer) + return reduced.window(windowDuration, slideDuration).reduceByKey(func, numPartitions) def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None): """ -- GitLab