diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index a0b819220e6d3fb8d66f36a6624f576d86cd7e3a..74dee1420754a61903fee28237e5df86a9c55210 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -20,6 +20,8 @@ import sys import gc from tempfile import NamedTemporaryFile +from pyspark.cloudpickle import print_exec + if sys.version < '3': import cPickle as pickle else: @@ -75,7 +77,14 @@ class Broadcast(object): self._path = path def dump(self, value, f): - pickle.dump(value, f, 2) + try: + pickle.dump(value, f, 2) + except pickle.PickleError: + raise + except Exception as e: + msg = "Could not serialize broadcast: " + e.__class__.__name__ + ": " + e.message + print_exec(sys.stderr) + raise pickle.PicklingError(msg) f.close() return f.name diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index 822ae46e4511178372ef740c23f2c4b273cccb55..da2b2f3757967e05f44308d42e128fe37100aca8 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -109,6 +109,16 @@ class CloudPickler(Pickler): if 'recursion' in e.args[0]: msg = """Could not pickle object as excessively deep recursion required.""" raise pickle.PicklingError(msg) + except pickle.PickleError: + raise + except Exception as e: + if "'i' format requires" in e.message: + msg = "Object too large to serialize: " + e.message + else: + msg = "Could not serialize object: " + e.__class__.__name__ + ": " + e.message + print_exec(sys.stderr) + raise pickle.PicklingError(msg) + def save_memoryview(self, obj): """Fallback to save_string"""