    [SPARK-3478] [PySpark] Profile the Python tasks · 1aa549ba
    Davies Liu authored
    This patch adds profiling support for PySpark. It will show the profiling results
    before the driver exits; here is one example:
    
    ```
    ============================================================
    Profile of RDD<id=3>
    ============================================================
             5146507 function calls (5146487 primitive calls) in 71.094 seconds
    
       Ordered by: internal time, cumulative time
    
       ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      5144576   68.331    0.000   68.331    0.000 statcounter.py:44(merge)
           20    2.735    0.137   71.071    3.554 statcounter.py:33(__init__)
           20    0.017    0.001    0.017    0.001 {cPickle.dumps}
         1024    0.003    0.000    0.003    0.000 t.py:16(<lambda>)
           20    0.001    0.000    0.001    0.000 {reduce}
           21    0.001    0.000    0.001    0.000 {cPickle.loads}
           20    0.001    0.000    0.001    0.000 copy_reg.py:95(_slotnames)
           41    0.001    0.000    0.001    0.000 serializers.py:461(read_int)
           40    0.001    0.000    0.002    0.000 serializers.py:179(_batched)
           62    0.000    0.000    0.000    0.000 {method 'read' of 'file' objects}
           20    0.000    0.000   71.072    3.554 rdd.py:863(<lambda>)
           20    0.000    0.000    0.001    0.000 serializers.py:198(load_stream)
        40/20    0.000    0.000   71.072    3.554 rdd.py:2093(pipeline_func)
           41    0.000    0.000    0.002    0.000 serializers.py:130(load_stream)
           40    0.000    0.000   71.072    1.777 rdd.py:304(func)
           20    0.000    0.000   71.094    3.555 worker.py:82(process)
    ```
    
    Also, users can show the profile results manually with `sc.show_profiles()` or dump them
    to disk with `sc.dump_profiles(path)`, for example:
    
    ```python
    >>> sc._conf.set("spark.python.profile", "true")
    >>> rdd = sc.parallelize(range(100)).map(str)
    >>> rdd.count()
    100
    >>> sc.show_profiles()
    ============================================================
    Profile of RDD<id=1>
    ============================================================
             284 function calls (276 primitive calls) in 0.001 seconds
    
       Ordered by: internal time, cumulative time
    
       ncalls  tottime  percall  cumtime  percall filename:lineno(function)
            4    0.000    0.000    0.000    0.000 serializers.py:198(load_stream)
            4    0.000    0.000    0.000    0.000 {reduce}
         12/4    0.000    0.000    0.001    0.000 rdd.py:2092(pipeline_func)
            4    0.000    0.000    0.000    0.000 {cPickle.loads}
            4    0.000    0.000    0.000    0.000 {cPickle.dumps}
          104    0.000    0.000    0.000    0.000 rdd.py:852(<genexpr>)
            8    0.000    0.000    0.000    0.000 serializers.py:461(read_int)
           12    0.000    0.000    0.000    0.000 rdd.py:303(func)
    ```
    Profiling is disabled by default; it can be enabled by setting "spark.python.profile=true".
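    
    For example, a minimal sketch of enabling the profiler through `SparkConf` before the
    `SparkContext` is created (the config key and `show_profiles()` come from this patch;
    the application name and workload are illustrative):
    
    ```python
    from pyspark import SparkConf, SparkContext
    
    # Enable the Python task profiler (config key added by this patch).
    conf = SparkConf().set("spark.python.profile", "true")
    sc = SparkContext(appName="profile-demo", conf=conf)  # appName is illustrative
    
    sc.parallelize(range(1000)).map(str).count()
    
    # Print the collected cProfile output for each profiled RDD; profiles are
    # cleared after they have been shown or dumped.
    sc.show_profiles()
    ```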
    
    Also, users can have the results dumped to disk automatically for later analysis by setting "spark.python.profile.dump=path_to_dump".
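    
    The dumped files can then be inspected offline with the standard `pstats` module. A
    minimal sketch, assuming the dump directory holds one pstats file per profiled RDD
    (the directory path and the `*.pstats` filename pattern are assumptions here):
    
    ```python
    import glob
    import pstats
    
    # Load every dumped profile and print the ten hottest functions of each.
    for path in glob.glob("/tmp/pyspark_profiles/*.pstats"):  # path/pattern assumed
        print(path)
        pstats.Stats(path).strip_dirs().sort_stats("tottime").print_stats(10)
    ```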
    
    Author: Davies Liu <davies.liu@gmail.com>
    
    Closes #2351 from davies/profiler and squashes the following commits:
    
    7ef2aa0 [Davies Liu] bugfix, add tests for show_profiles and dump_profiles()
    2b0daf2 [Davies Liu] fix docs
    7a56c24 [Davies Liu] bugfix
    cba9463 [Davies Liu] move show_profiles and dump_profiles to SparkContext
    fb9565b [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
    116d52a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
    09d02c3 [Davies Liu] Merge branch 'master' into profiler
    c23865c [Davies Liu] Merge branch 'master' into profiler
    15d6f18 [Davies Liu] add docs for two configs
    dadee1a [Davies Liu] add docs string and clear profiles after show or dump
    4f8309d [Davies Liu] address comment, add tests
    0a5b6eb [Davies Liu] fix Python UDF
    4b20494 [Davies Liu] add profile for python
worker.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Worker that receives input from Piped RDD.
"""
import os
import sys
import time
import socket
import traceback
import cProfile
import pstats

from pyspark.accumulators import _accumulatorRegistry
from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.files import SparkFiles
from pyspark.serializers import write_with_length, write_int, read_long, \
    write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, \
    CompressedSerializer
from pyspark import shuffle

pickleSer = PickleSerializer()
utf8_deserializer = UTF8Deserializer()


def report_times(outfile, boot, init, finish):
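    # send the boot/init/finish timestamps back to the JVM, converted to milliseconds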
    write_int(SpecialLengths.TIMING_DATA, outfile)
    write_long(1000 * boot, outfile)
    write_long(1000 * init, outfile)
    write_long(1000 * finish, outfile)


def add_path(path):
    # the worker can be reused, so do not add the path multiple times
    if path not in sys.path:
        # overwrite system packages
        sys.path.insert(1, path)


def main(infile, outfile):
    try:
        boot_time = time.time()
        split_index = read_int(infile)
        if split_index == -1:  # for unit tests
            return

        # initialize global state
        shuffle.MemoryBytesSpilled = 0
        shuffle.DiskBytesSpilled = 0
        _accumulatorRegistry.clear()

        # fetch name of workdir
        spark_files_dir = utf8_deserializer.loads(infile)
        SparkFiles._root_directory = spark_files_dir
        SparkFiles._is_running_on_worker = True

        # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
        add_path(spark_files_dir)  # *.py files that were added will be copied here
        num_python_includes = read_int(infile)
        for _ in range(num_python_includes):
            filename = utf8_deserializer.loads(infile)
            add_path(os.path.join(spark_files_dir, filename))

        # fetch names and values of broadcast variables
        num_broadcast_variables = read_int(infile)
        ser = CompressedSerializer(pickleSer)
        for _ in range(num_broadcast_variables):
            bid = read_long(infile)
            if bid >= 0:
                value = ser._read_with_length(infile)
                _broadcastRegistry[bid] = Broadcast(bid, value)
            else:
                bid = - bid - 1
                _broadcastRegistry.pop(bid)

        _accumulatorRegistry.clear()
        command = pickleSer._read_with_length(infile)
        if isinstance(command, Broadcast):
            command = pickleSer.loads(command.value)
        (func, stats, deserializer, serializer) = command
        init_time = time.time()

        def process():
            iterator = deserializer.load_stream(infile)
            serializer.dump_stream(func(split_index, iterator), outfile)

        if stats:
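            # `stats` is the profiling accumulator shipped with the serialized command;
            # run the task under cProfile and merge the picklable pstats into it so the
            # driver can later show or dump the aggregated results.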
            p = cProfile.Profile()
            p.runcall(process)
            st = pstats.Stats(p)
            st.stream = None  # make it picklable
            stats.add(st.strip_dirs())
        else:
            process()
    except Exception:
        try:
            write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
            write_with_length(traceback.format_exc(), outfile)
            outfile.flush()
        except IOError:
            # the JVM closed the socket
            pass
        except Exception:
            # Write the error to stderr if it happened while serializing
            print >> sys.stderr, "PySpark worker failed with exception:"
            print >> sys.stderr, traceback.format_exc()
        exit(-1)
    finish_time = time.time()
    report_times(outfile, boot_time, init_time, finish_time)
    write_long(shuffle.MemoryBytesSpilled, outfile)
    write_long(shuffle.DiskBytesSpilled, outfile)

    # Mark the beginning of the accumulators section of the output
    write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
    write_int(len(_accumulatorRegistry), outfile)
    for (aid, accum) in _accumulatorRegistry.items():
        pickleSer._write_with_length((aid, accum._value), outfile)


if __name__ == '__main__':
    # Read a local port to connect to from stdin
    java_port = int(sys.stdin.readline())
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(("127.0.0.1", java_port))
    sock_file = sock.makefile("a+", 65536)
    main(sock_file, sock_file)