diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 5931e923c2e36e2ca6c699e9e6017ee97760d4b4..10a7ccd5020008be9834700ffdc1ff4096241a5e 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -478,13 +478,21 @@ class ExternalSorter(object):
             os.makedirs(d)
         return os.path.join(d, str(n))
 
+    def _next_limit(self):
+        """
+        Return the next memory limit. If the memory is not released
+        after spilling, it will dump the data only when the used memory
+        starts to increase.
+        """
+        return max(self.memory_limit, get_used_memory() * 1.05)
+
     def sorted(self, iterator, key=None, reverse=False):
         """
         Sort the elements in iterator, do external sort when the memory
         goes above the limit.
         """
         global MemoryBytesSpilled, DiskBytesSpilled
-        batch = 100
+        batch, limit = 100, self._next_limit()
         chunks, current_chunk = [], []
         iterator = iter(iterator)
         while True:
@@ -504,6 +512,7 @@ class ExternalSorter(object):
                 chunks.append(self.serializer.load_stream(open(path)))
                 current_chunk = []
                 gc.collect()
+                limit = self._next_limit()
                 MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
                 DiskBytesSpilled += os.path.getsize(path)
 
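For reference, below is a minimal, self-contained sketch of the adaptive spill threshold this patch introduces: after each spill, the limit is recomputed as max(memory_limit, used * 1.05), so that if gc does not actually return memory to the OS, the sorter only spills again once usage grows about 5% past the post-spill level instead of spilling on every batch. The names SpillingSorter, used_memory, and memory_limit_mb are hypothetical stand-ins for illustration; the real ExternalSorter measures process RSS with pyspark.shuffle.get_used_memory() and streams spilled chunks back with self.serializer.load_stream rather than re-pickling whole lists.

# Sketch only, not the Spark implementation. `used_memory` and
# `SpillingSorter` are hypothetical names invented for this example.
import heapq
import itertools
import pickle
import tempfile


def used_memory():
    # Rough stand-in for pyspark.shuffle.get_used_memory(): peak RSS in MB
    # on Linux (ru_maxrss is in KB there); returns 0 where the `resource`
    # module is unavailable. The real helper reads current, not peak, usage.
    try:
        import resource
        return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 10
    except ImportError:
        return 0


class SpillingSorter:
    def __init__(self, memory_limit_mb=512):
        self.memory_limit = memory_limit_mb

    def _next_limit(self):
        # Same rule as the patch: never below the configured limit, and at
        # least 5% above whatever is in use right after a spill.
        return max(self.memory_limit, used_memory() * 1.05)

    def sorted(self, iterator, key=None, reverse=False):
        batch, limit = 100, self._next_limit()
        chunks, current = [], []
        iterator = iter(iterator)
        while True:
            # Pull elements in batches so we can check memory periodically.
            chunk = list(itertools.islice(iterator, batch))
            current.extend(chunk)
            if len(chunk) < batch:
                break
            if used_memory() > limit:
                # Sort in place and spill to a temp file. (ExternalSorter
                # streams items back via serializer.load_stream; pickling
                # the whole chunk keeps this sketch short.)
                current.sort(key=key, reverse=reverse)
                spill = tempfile.TemporaryFile()
                pickle.dump(current, spill)
                chunks.append(spill)
                current = []
                # The adaptive step: recompute the limit after spilling.
                limit = self._next_limit()
        current.sort(key=key, reverse=reverse)
        if not chunks:
            return iter(current)
        # Merge the sorted spill files with the in-memory remainder.
        streams = []
        for spill in chunks:
            spill.seek(0)
            streams.append(iter(pickle.load(spill)))
        streams.append(iter(current))
        return heapq.merge(*streams, key=key, reverse=reverse)


if __name__ == "__main__":
    sorter = SpillingSorter(memory_limit_mb=1)  # tiny limit to force spills
    out = list(sorter.sorted(iter(range(1000, 0, -1))))
    assert out == list(range(1, 1001))

The key design point, mirrored from the patch, is where _next_limit() is called: once before the loop and once after every spill, so the threshold ratchets up with actual usage instead of staying pinned at a limit the process can no longer get back under.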