From dbfc7aa4d0d5457bc92e1e66d065c6088d476843 Mon Sep 17 00:00:00 2001
From: Eric Liang <ekl@databricks.com>
Date: Wed, 14 Sep 2016 13:37:35 -0700
Subject: [PATCH] [SPARK-17472] [PYSPARK] Better error message for
 serialization failures of large objects in Python

## What changes were proposed in this pull request?

For large objects, pickle does not raise useful error messages. However, we can wrap them to be slightly more user friendly:

Example 1:
```
def run():
  import numpy.random as nr
  b = nr.bytes(8 * 1000000000)
  sc.parallelize(range(1000), 1000).map(lambda x: len(b)).count()

run()
```

Before:
```
error: 'i' format requires -2147483648 <= number <= 2147483647
```

After:
```
pickle.PicklingError: Object too large to serialize: 'i' format requires -2147483648 <= number <= 2147483647
```

Example 2:
```
def run():
  import numpy.random as nr
  b = sc.broadcast(nr.bytes(8 * 1000000000))
  sc.parallelize(range(1000), 1000).map(lambda x: len(b.value)).count()

run()
```

Before:
```
SystemError: error return without exception set
```

After:
```
cPickle.PicklingError: Could not serialize broadcast: SystemError: error return without exception set
```

## How was this patch tested?

Manually tried out these cases

cc davies

Author: Eric Liang <ekl@databricks.com>

Closes #15026 from ericl/spark-17472.
---
 python/pyspark/broadcast.py   | 11 ++++++++++-
 python/pyspark/cloudpickle.py | 10 ++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index a0b819220e..74dee14207 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -20,6 +20,8 @@ import sys
 import gc
 from tempfile import NamedTemporaryFile
 
+from pyspark.cloudpickle import print_exec
+
 if sys.version < '3':
     import cPickle as pickle
 else:
@@ -75,7 +77,14 @@ class Broadcast(object):
             self._path = path
 
     def dump(self, value, f):
-        pickle.dump(value, f, 2)
+        try:
+            pickle.dump(value, f, 2)
+        except pickle.PickleError:
+            raise
+        except Exception as e:
+            msg = "Could not serialize broadcast: " + e.__class__.__name__ + ": " + e.message
+            print_exec(sys.stderr)
+            raise pickle.PicklingError(msg)
         f.close()
         return f.name
 
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 822ae46e45..da2b2f3757 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -109,6 +109,16 @@ class CloudPickler(Pickler):
             if 'recursion' in e.args[0]:
                 msg = """Could not pickle object as excessively deep recursion required."""
                 raise pickle.PicklingError(msg)
+        except pickle.PickleError:
+            raise
+        except Exception as e:
+            if "'i' format requires" in e.message:
+                msg = "Object too large to serialize: " + e.message
+            else:
+                msg = "Could not serialize object: " + e.__class__.__name__ + ": " + e.message
+            print_exec(sys.stderr)
+            raise pickle.PicklingError(msg)
+            
 
     def save_memoryview(self, obj):
         """Fallback to save_string"""
-- 
GitLab