Skip to content
Snippets Groups Projects
  • Davies Liu's avatar
    2fc8aca0
    [SPARK-1065] [PySpark] improve supporting for large broadcast · 2fc8aca0
    Davies Liu authored
    Passing large object by py4j is very slow (cost much memory), so pass broadcast objects via files (similar to parallelize()).
    
    Add an option to keep object in driver (it's False by default) to save memory in driver.
    
    Author: Davies Liu <davies.liu@gmail.com>
    
    Closes #1912 from davies/broadcast and squashes the following commits:
    
    e06df4a [Davies Liu] load broadcast from disk in driver automatically
    db3f232 [Davies Liu] fix serialization of accumulator
    631a827 [Davies Liu] Merge branch 'master' into broadcast
    c7baa8c [Davies Liu] compress serrialized broadcast and command
    9a7161f [Davies Liu] fix doc tests
    e93cf4b [Davies Liu] address comments: add test
    6226189 [Davies Liu] improve large broadcast
    2fc8aca0
    History
    [SPARK-1065] [PySpark] improve supporting for large broadcast
    Davies Liu authored
    Passing large object by py4j is very slow (cost much memory), so pass broadcast objects via files (similar to parallelize()).
    
    Add an option to keep object in driver (it's False by default) to save memory in driver.
    
    Author: Davies Liu <davies.liu@gmail.com>
    
    Closes #1912 from davies/broadcast and squashes the following commits:
    
    e06df4a [Davies Liu] load broadcast from disk in driver automatically
    db3f232 [Davies Liu] fix serialization of accumulator
    631a827 [Davies Liu] Merge branch 'master' into broadcast
    c7baa8c [Davies Liu] compress serrialized broadcast and command
    9a7161f [Davies Liu] fix doc tests
    e93cf4b [Davies Liu] address comments: add test
    6226189 [Davies Liu] improve large broadcast
broadcast.py 2.69 KiB
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()

>>> large_broadcast = sc.broadcast(list(range(10000)))
"""
import os

from pyspark.serializers import CompressedSerializer, PickleSerializer

# Holds broadcasted data received from Java, keyed by its id.
_broadcastRegistry = {}


def _from_id(bid):
    from pyspark.broadcast import _broadcastRegistry
    if bid not in _broadcastRegistry:
        raise Exception("Broadcast variable '%s' not loaded!" % bid)
    return _broadcastRegistry[bid]


class Broadcast(object):

    """
    A broadcast variable created with
    L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}.
    Access its value through C{.value}.
    """

    def __init__(self, bid, value, java_broadcast=None,
                 pickle_registry=None, path=None):
        """
        Should not be called directly by users -- use
        L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}
        instead.
        """
        self.bid = bid
        if path is None:
            self.value = value
        self._jbroadcast = java_broadcast
        self._pickle_registry = pickle_registry
        self.path = path

    def unpersist(self, blocking=False):
        self._jbroadcast.unpersist(blocking)
        os.unlink(self.path)

    def __reduce__(self):
        self._pickle_registry.add(self)
        return (_from_id, (self.bid, ))

    def __getattr__(self, item):
        if item == 'value' and self.path is not None:
            ser = CompressedSerializer(PickleSerializer())
            value = ser.load_stream(open(self.path)).next()
            self.value = value
            return value

        raise AttributeError(item)


if __name__ == "__main__":
    import doctest
    doctest.testmod()