diff --git a/docs/configuration.md b/docs/configuration.md
index 791b6f2aa3261278041cdb95e0e2e291fd35f03e..a6dd7245e1552320d7a136244215e97cb4238a4a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -206,25 +206,6 @@ Apart from these, the following properties are also available, and may be useful
     used during aggregation goes above this amount, it will spill the data into disks.
   </td>
 </tr>
-<tr>
-  <td><code>spark.python.profile</code></td>
-  <td>false</td>
-  <td>
-    Enable profiling in Python worker, the profile result will show up by `sc.show_profiles()`,
-    or it will be displayed before the driver exiting. It also can be dumped into disk by
-    `sc.dump_profiles(path)`. If some of the profile results had been displayed maually,
-    they will not be displayed automatically before driver exiting.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.python.profile.dump</code></td>
-  <td>(none)</td>
-  <td>
-    The directory which is used to dump the profile result before driver exiting.
-    The results will be dumped as separated file for each RDD. They can be loaded
-    by ptats.Stats(). If this is specified, the profile result will not be displayed
-    automatically.
-</tr>
 <tr>
   <td><code>spark.python.worker.reuse</code></td>
   <td>true</td>
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index b8cdbbe3cf2b6421e39bfaba95d55fee4252b7ba..ccbca67656c8db95503ebbe2072a722b182feeaa 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -215,21 +215,6 @@ FLOAT_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0)
 COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)
 
 
-class PStatsParam(AccumulatorParam):
-    """PStatsParam is used to merge pstats.Stats"""
-
-    @staticmethod
-    def zero(value):
-        return None
-
-    @staticmethod
-    def addInPlace(value1, value2):
-        if value1 is None:
-            return value2
-        value1.add(value2)
-        return value1
-
-
 class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
 
     """
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index abeda19b77d8b78453d10f0e44d2b8105d117831..8e7b00469e246764a7b33025d650403ba64a75f1 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -20,7 +20,6 @@ import shutil
 import sys
 from threading import Lock
 from tempfile import NamedTemporaryFile
-import atexit
 
 from pyspark import accumulators
 from pyspark.accumulators import Accumulator
@@ -31,6 +30,7 @@ from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
     PairDeserializer, CompressedSerializer
 from pyspark.storagelevel import StorageLevel
+from pyspark import rdd
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call
 
@@ -192,9 +192,6 @@ class SparkContext(object):
         self._temp_dir = \
             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
 
-        # profiling stats collected for each PythonRDD
-        self._profile_stats = []
-
     def _initialize_context(self, jconf):
         """
         Initialize SparkContext in function to allow subclass specific initialization
@@ -795,40 +792,6 @@ class SparkContext(object):
         it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
         return list(mappedRDD._collect_iterator_through_file(it))
 
-    def _add_profile(self, id, profileAcc):
-        if not self._profile_stats:
-            dump_path = self._conf.get("spark.python.profile.dump")
-            if dump_path:
-                atexit.register(self.dump_profiles, dump_path)
-            else:
-                atexit.register(self.show_profiles)
-
-        self._profile_stats.append([id, profileAcc, False])
-
-    def show_profiles(self):
-        """ Print the profile stats to stdout """
-        for i, (id, acc, showed) in enumerate(self._profile_stats):
-            stats = acc.value
-            if not showed and stats:
-                print "=" * 60
-                print "Profile of RDD<id=%d>" % id
-                print "=" * 60
-                stats.sort_stats("tottime", "cumtime").print_stats()
-                # mark it as showed
-                self._profile_stats[i][2] = True
-
-    def dump_profiles(self, path):
-        """ Dump the profile stats into directory `path`
-        """
-        if not os.path.exists(path):
-            os.makedirs(path)
-        for id, acc, _ in self._profile_stats:
-            stats = acc.value
-            if stats:
-                p = os.path.join(path, "rdd_%d.pstats" % id)
-                stats.dump_stats(p)
-        self._profile_stats = []
-
 
 def _test():
     import atexit
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8ed89e2f9769f8695051aeb2651f3bc0aeabec4f..680140d72d03c6474266622cb658174b8ab56a14 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+from base64 import standard_b64encode as b64enc
 import copy
 from collections import defaultdict
 from itertools import chain, ifilter, imap
@@ -31,7 +32,6 @@ import bisect
 from random import Random
 from math import sqrt, log, isinf, isnan
 
-from pyspark.accumulators import PStatsParam
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
     PickleSerializer, pack_long, AutoBatchedSerializer
@@ -2080,9 +2080,7 @@ class PipelinedRDD(RDD):
             return self._jrdd_val
         if self._bypass_serializer:
             self._jrdd_deserializer = NoOpSerializer()
-        enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true"
-        profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None
-        command = (self.func, profileStats, self._prev_jrdd_deserializer,
+        command = (self.func, self._prev_jrdd_deserializer,
                    self._jrdd_deserializer)
         # the serialized command will be compressed by broadcast
         ser = CloudPickleSerializer()
@@ -2104,10 +2102,6 @@ class PipelinedRDD(RDD):
                                              self.ctx.pythonExec,
                                              broadcast_vars, self.ctx._javaAccumulator)
         self._jrdd_val = python_rdd.asJavaRDD()
-
-        if enable_profile:
-            self._id = self._jrdd_val.id()
-            self.ctx._add_profile(self._id, profileStats)
         return self._jrdd_val
 
     def id(self):
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index ee5bda8bb43d558ae41d52742b324a05ea7d2a1a..653195ea438cf1f09e8f67c3ae48418ed1d826ad 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -974,7 +974,7 @@ class SQLContext(object):
         [Row(c0=4)]
         """
         func = lambda _, it: imap(lambda x: f(*x), it)
-        command = (func, None,
+        command = (func,
                    BatchedSerializer(PickleSerializer(), 1024),
                    BatchedSerializer(PickleSerializer(), 1024))
         ser = CloudPickleSerializer()
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index e6002afa9c70d37254f682b03500025f972a1f6d..d1bb2033b7a16a9a2a8eae8a7202f12249a93161 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -632,36 +632,6 @@ class TestRDDFunctions(PySparkTestCase):
         self.assertEquals(result.count(), 3)
 
 
-class TestProfiler(PySparkTestCase):
-
-    def setUp(self):
-        self._old_sys_path = list(sys.path)
-        class_name = self.__class__.__name__
-        conf = SparkConf().set("spark.python.profile", "true")
-        self.sc = SparkContext('local[4]', class_name, batchSize=2, conf=conf)
-
-    def test_profiler(self):
-
-        def heavy_foo(x):
-            for i in range(1 << 20):
-                x = 1
-        rdd = self.sc.parallelize(range(100))
-        rdd.foreach(heavy_foo)
-        profiles = self.sc._profile_stats
-        self.assertEqual(1, len(profiles))
-        id, acc, _ = profiles[0]
-        stats = acc.value
-        self.assertTrue(stats is not None)
-        width, stat_list = stats.get_print_list([])
-        func_names = [func_name for fname, n, func_name in stat_list]
-        self.assertTrue("heavy_foo" in func_names)
-
-        self.sc.show_profiles()
-        d = tempfile.gettempdir()
-        self.sc.dump_profiles(d)
-        self.assertTrue("rdd_%d.pstats" % id in os.listdir(d))
-
-
 class TestSQL(PySparkTestCase):
 
     def setUp(self):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 8257dddfee1c39188b1afd01e8cb3da20ed310fc..c1f6e3e4a1f4010655fa8a3b31e461646c41f264 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -23,8 +23,6 @@ import sys
 import time
 import socket
 import traceback
-import cProfile
-import pstats
 
 from pyspark.accumulators import _accumulatorRegistry
 from pyspark.broadcast import Broadcast, _broadcastRegistry
@@ -92,21 +90,10 @@ def main(infile, outfile):
         command = pickleSer._read_with_length(infile)
         if isinstance(command, Broadcast):
             command = pickleSer.loads(command.value)
-        (func, stats, deserializer, serializer) = command
+        (func, deserializer, serializer) = command
         init_time = time.time()
-
-        def process():
-            iterator = deserializer.load_stream(infile)
-            serializer.dump_stream(func(split_index, iterator), outfile)
-
-        if stats:
-            p = cProfile.Profile()
-            p.runcall(process)
-            st = pstats.Stats(p)
-            st.stream = None  # make it picklable
-            stats.add(st.strip_dirs())
-        else:
-            process()
+        iterator = deserializer.load_stream(infile)
+        serializer.dump_stream(func(split_index, iterator), outfile)
     except Exception:
         try:
             write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
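
For reference, the machinery removed by this patch is a small cProfile/pstats pattern: run the task function under `cProfile.Profile`, clear the unpicklable output stream on the resulting `pstats.Stats`, strip directory prefixes, and merge the per-partition results with `Stats.add()` before printing or dumping them (which is what the deleted `PStatsParam.addInPlace`, `show_profiles()` and `dump_profiles()` did). The sketch below reproduces that pattern with plain standard-library calls outside Spark; `profile_call`, `heavy_task`, and the output file name are made up for illustration and are not part of this patch.

```python
import cProfile
import os
import pstats
import tempfile


def profile_call(func, *args):
    """Run func under cProfile and return a mergeable, picklable pstats.Stats."""
    profiler = cProfile.Profile()
    profiler.runcall(func, *args)
    stats = pstats.Stats(profiler)
    stats.stream = None        # drop the output stream so the object can be pickled
    return stats.strip_dirs()  # strip paths so stats from different workers merge cleanly


def heavy_task(n):
    # stand-in for the work a Python task would do
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    # Merge several profiles the way the removed PStatsParam.addInPlace did:
    # keep the first Stats object and add() each later one into it.
    merged = None
    for n in (200000, 400000):
        st = profile_call(heavy_task, n)
        if merged is None:
            merged = st
        else:
            merged.add(st)

    # Rough equivalent of the removed show_profiles(): sort and print the aggregate.
    merged.sort_stats("tottime", "cumtime").print_stats(10)

    # Rough equivalent of the removed dump_profiles(path): dump to disk, reload later.
    path = os.path.join(tempfile.gettempdir(), "example.pstats")
    merged.dump_stats(path)
    reloaded = pstats.Stats(path)
```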