    [SPARK-3478] [PySpark] Profile the Python tasks · c5414b68
    Davies Liu authored
    This patch adds profiling support for PySpark. It shows the profiling results
    before the driver exits; here is one example:
    
    ```
    ============================================================
    Profile of RDD<id=3>
    ============================================================
             5146507 function calls (5146487 primitive calls) in 71.094 seconds
    
       Ordered by: internal time, cumulative time
    
       ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      5144576   68.331    0.000   68.331    0.000 statcounter.py:44(merge)
           20    2.735    0.137   71.071    3.554 statcounter.py:33(__init__)
           20    0.017    0.001    0.017    0.001 {cPickle.dumps}
         1024    0.003    0.000    0.003    0.000 t.py:16(<lambda>)
           20    0.001    0.000    0.001    0.000 {reduce}
           21    0.001    0.000    0.001    0.000 {cPickle.loads}
           20    0.001    0.000    0.001    0.000 copy_reg.py:95(_slotnames)
           41    0.001    0.000    0.001    0.000 serializers.py:461(read_int)
           40    0.001    0.000    0.002    0.000 serializers.py:179(_batched)
           62    0.000    0.000    0.000    0.000 {method 'read' of 'file' objects}
           20    0.000    0.000   71.072    3.554 rdd.py:863(<lambda>)
           20    0.000    0.000    0.001    0.000 serializers.py:198(load_stream)
        40/20    0.000    0.000   71.072    3.554 rdd.py:2093(pipeline_func)
           41    0.000    0.000    0.002    0.000 serializers.py:130(load_stream)
           40    0.000    0.000   71.072    1.777 rdd.py:304(func)
           20    0.000    0.000   71.094    3.555 worker.py:82(process)
    ```
    
    Also, users can show the profiling results manually with `sc.show_profiles()`, or dump them to disk
    with `sc.dump_profiles(path)`, for example:
    
    ```python
    >>> sc._conf.set("spark.python.profile", "true")
    >>> rdd = sc.parallelize(range(100)).map(str)
    >>> rdd.count()
    100
    >>> sc.show_profiles()
    ============================================================
    Profile of RDD<id=1>
    ============================================================
             284 function calls (276 primitive calls) in 0.001 seconds
    
       Ordered by: internal time, cumulative time
    
       ncalls  tottime  percall  cumtime  percall filename:lineno(function)
            4    0.000    0.000    0.000    0.000 serializers.py:198(load_stream)
            4    0.000    0.000    0.000    0.000 {reduce}
         12/4    0.000    0.000    0.001    0.000 rdd.py:2092(pipeline_func)
            4    0.000    0.000    0.000    0.000 {cPickle.loads}
            4    0.000    0.000    0.000    0.000 {cPickle.dumps}
          104    0.000    0.000    0.000    0.000 rdd.py:852(<genexpr>)
            8    0.000    0.000    0.000    0.000 serializers.py:461(read_int)
           12    0.000    0.000    0.000    0.000 rdd.py:303(func)
    ```
    Profiling is disabled by default; it can be enabled by setting `spark.python.profile=true`.
    
    Users can also have the results dumped to disk automatically for later analysis by setting `spark.python.profile.dump=path_to_dump`.
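
    For example, both settings can also be supplied through a `SparkConf` when the context is created (a minimal sketch; the dump directory below is just a placeholder):

    ```python
    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .set("spark.python.profile", "true")                 # collect per-RDD profiles
            .set("spark.python.profile.dump", "/tmp/profiles"))  # placeholder dump directory
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize(range(100)).map(str)
    rdd.count()

    sc.show_profiles()   # print the collected stats now
    # with spark.python.profile.dump set, profiles are also dumped automatically for later analysis
    ```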
    
    This is a bugfix of #2351. cc JoshRosen
    
    Author: Davies Liu <davies.liu@gmail.com>
    
    Closes #2556 from davies/profiler and squashes the following commits:
    
    e68df5a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
    858e74c [Davies Liu] compatitable with python 2.6
    7ef2aa0 [Davies Liu] bugfix, add tests for show_profiles and dump_profiles()
    2b0daf2 [Davies Liu] fix docs
    7a56c24 [Davies Liu] bugfix
    cba9463 [Davies Liu] move show_profiles and dump_profiles to SparkContext
    fb9565b [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
    116d52a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
    09d02c3 [Davies Liu] Merge branch 'master' into profiler
    c23865c [Davies Liu] Merge branch 'master' into profiler
    15d6f18 [Davies Liu] add docs for two configs
    dadee1a [Davies Liu] add docs string and clear profiles after show or dump
    4f8309d [Davies Liu] address comment, add tests
    0a5b6eb [Davies Liu] fix Python UDF
    4b20494 [Davies Liu] add profile for python
accumulators.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> a = sc.accumulator(1)
>>> a.value
1
>>> a.value = 2
>>> a.value
2
>>> a += 5
>>> a.value
7

>>> sc.accumulator(1.0).value
1.0

>>> sc.accumulator(1j).value
1j

>>> rdd = sc.parallelize([1,2,3])
>>> def f(x):
...     global a
...     a += x
>>> rdd.foreach(f)
>>> a.value
13

>>> b = sc.accumulator(0)
>>> def g(x):
...     b.add(x)
>>> rdd.foreach(g)
>>> b.value
6

>>> from pyspark.accumulators import AccumulatorParam
>>> class VectorAccumulatorParam(AccumulatorParam):
...     def zero(self, value):
...         return [0.0] * len(value)
...     def addInPlace(self, val1, val2):
...         for i in xrange(len(val1)):
...              val1[i] += val2[i]
...         return val1
>>> va = sc.accumulator([1.0, 2.0, 3.0], VectorAccumulatorParam())
>>> va.value
[1.0, 2.0, 3.0]
>>> def g(x):
...     global va
...     va += [x] * 3
>>> rdd.foreach(g)
>>> va.value
[7.0, 8.0, 9.0]

>>> rdd.map(lambda x: a.value).collect() # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
Py4JJavaError:...

>>> def h(x):
...     global a
...     a.value = 7
>>> rdd.foreach(h) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
Py4JJavaError:...

>>> sc.accumulator([1.0, 2.0, 3.0]) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
Exception:...
"""

import select
import struct
import SocketServer
import threading
from pyspark.cloudpickle import CloudPickler
from pyspark.serializers import read_int, PickleSerializer


__all__ = ['Accumulator', 'AccumulatorParam']


pickleSer = PickleSerializer()

# Holds accumulators registered on the current machine, keyed by ID. This is then used to send
# the local accumulator updates back to the driver program at the end of a task.
_accumulatorRegistry = {}


def _deserialize_accumulator(aid, zero_value, accum_param):
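    # Runs on a worker when an Accumulator is unpickled for a task; the fresh copy
    # starts from the zero value and is registered locally so that its updates can
    # be collected and sent back to the driver at the end of the task.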
    from pyspark.accumulators import _accumulatorRegistry
    accum = Accumulator(aid, zero_value, accum_param)
    accum._deserialized = True
    _accumulatorRegistry[aid] = accum
    return accum


class Accumulator(object):

    """
    A shared variable that can be accumulated, i.e., has a commutative and associative "add"
    operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=}
    operator, but only the driver program is allowed to access its value, using C{value}.
    Updates from the workers get propagated automatically to the driver program.

    While C{SparkContext} supports accumulators for primitive data types like C{int} and
    C{float}, users can also define accumulators for custom types by providing a custom
    L{AccumulatorParam} object. Refer to the doctest of this module for an example.
    """

    def __init__(self, aid, value, accum_param):
        """Create a new Accumulator with a given initial value and AccumulatorParam object"""
        from pyspark.accumulators import _accumulatorRegistry
        self.aid = aid
        self.accum_param = accum_param
        self._value = value
        self._deserialized = False
        _accumulatorRegistry[aid] = self

    def __reduce__(self):
        """Custom serialization; saves the zero value from our AccumulatorParam"""
        param = self.accum_param
        return (_deserialize_accumulator, (self.aid, param.zero(self._value), param))

    @property
    def value(self):
        """Get the accumulator's value; only usable in driver program"""
        if self._deserialized:
            raise Exception("Accumulator.value cannot be accessed inside tasks")
        return self._value

    @value.setter
    def value(self, value):
        """Sets the accumulator's value; only usable in driver program"""
        if self._deserialized:
            raise Exception("Accumulator.value cannot be accessed inside tasks")
        self._value = value

    def add(self, term):
        """Adds a term to this accumulator's value"""
        self._value = self.accum_param.addInPlace(self._value, term)

    def __iadd__(self, term):
        """The += operator; adds a term to this accumulator's value"""
        self.add(term)
        return self

    def __str__(self):
        return str(self._value)

    def __repr__(self):
        return "Accumulator<id=%i, value=%s>" % (self.aid, self._value)


class AccumulatorParam(object):

    """
    Helper object that defines how to accumulate values of a given type.
    """

    def zero(self, value):
        """
        Provide a "zero value" for the type, compatible in dimensions with the
        provided C{value} (e.g., a zero vector)
        """
        raise NotImplementedError

    def addInPlace(self, value1, value2):
        """
        Add two values of the accumulator's data type, returning a new value;
        for efficiency, can also update C{value1} in place and return it.
        """
        raise NotImplementedError


class AddingAccumulatorParam(AccumulatorParam):

    """
    An AccumulatorParam that uses the + operators to add values. Designed for simple types
    such as integers, floats, and lists. Requires the zero value for the underlying type
    as a parameter.
    """

    def __init__(self, zero_value):
        self.zero_value = zero_value

    def zero(self, value):
        return self.zero_value

    def addInPlace(self, value1, value2):
        value1 += value2
        return value1


# Singleton accumulator params for some standard types
INT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0)
FLOAT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0)
COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)


class PStatsParam(AccumulatorParam):
    """PStatsParam is used to merge pstats.Stats"""

    @staticmethod
    def zero(value):
        return None

    @staticmethod
    def addInPlace(value1, value2):
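        # value1/value2 are pstats.Stats objects (or None, the zero value) collected
        # from tasks; Stats.add() folds one profile into the other.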
        if value1 is None:
            return value2
        value1.add(value2)
        return value1


class _UpdateRequestHandler(SocketServer.StreamRequestHandler):

    """
    This handler will keep polling updates from the same socket until the
    server is shutdown.
    """

    def handle(self):
        from pyspark.accumulators import _accumulatorRegistry
        while not self.server.server_shutdown:
            # Poll every 1 second for new data -- don't block in case of shutdown.
            r, _, _ = select.select([self.rfile], [], [], 1)
            if self.rfile in r:
                num_updates = read_int(self.rfile)
                for _ in range(num_updates):
                    (aid, update) = pickleSer._read_with_length(self.rfile)
                    _accumulatorRegistry[aid] += update
                # Write a byte in acknowledgement
                self.wfile.write(struct.pack("!b", 1))


class AccumulatorServer(SocketServer.TCPServer):

    """
    A simple TCP server that intercepts shutdown() in order to interrupt
    our continuous polling on the handler.
    """
    server_shutdown = False

    def shutdown(self):
        self.server_shutdown = True
        SocketServer.TCPServer.shutdown(self)


def _start_update_server():
    """Start a TCP server to receive accumulator updates in a daemon thread, and returns it"""
    server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler)
    thread = threading.Thread(target=server.serve_forever)
    thread.daemon = True
    thread.start()
    return server
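

# ---------------------------------------------------------------------------
# Illustrative only: a tiny, self-contained demo of the update protocol the
# handler above expects (an int count, then length-prefixed pickled
# (aid, update) pairs, answered by a single ack byte). In real Spark the JVM
# side plays the client role; nothing below is part of the public API.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import socket
    import cPickle

    server = _start_update_server()
    acc = Accumulator(0, 0, INT_ACCUMULATOR_PARAM)   # register an accumulator with aid=0

    sock = socket.socket()
    sock.connect(("localhost", server.server_address[1]))
    update = cPickle.dumps((0, 5), 2)                # (aid, update): add 5 to accumulator 0
    sock.sendall(struct.pack("!i", 1))               # one update follows
    sock.sendall(struct.pack("!i", len(update)) + update)
    assert sock.recv(1) == struct.pack("!b", 1)      # acknowledgement byte
    sock.close()

    print(acc.value)                                 # 5
    server.shutdown()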