From ecf437a64874a31328f4e28c6b24f37557fbe07d Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh <viirya@gmail.com>
Date: Thu, 31 Aug 2017 12:55:38 +0900
Subject: [PATCH] [SPARK-21534][SQL][PYSPARK] PickleException when creating
 dataframe from python row with empty bytearray

## What changes were proposed in this pull request?

`PickleException` is thrown when creating dataframe from python row with empty bytearray

    spark.createDataFrame(spark.sql("select unhex('') as xx").rdd.map(lambda x: {"abc": x.xx})).show()

    net.razorvine.pickle.PickleException: invalid pickle data for bytearray; expected 1 or 2 args, got 0
    	at net.razorvine.pickle.objects.ByteArrayConstructor.construct(ByteArrayConstructor.java
        ...

`ByteArrayConstructor` doesn't deal with an empty byte array pickled by Python 3.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19085 from viirya/SPARK-21534.
---
 .../org/apache/spark/api/python/SerDeUtil.scala    | 14 ++++++++++++++
 python/pyspark/sql/tests.py                        |  4 +++-
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index aaf8e7a1d7..01e64b6972 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -35,6 +35,16 @@ import org.apache.spark.rdd.RDD
 
 /** Utilities for serialization / deserialization between Python and Java, using Pickle. */
 private[spark] object SerDeUtil extends Logging {
+  class ByteArrayConstructor extends net.razorvine.pickle.objects.ByteArrayConstructor {
+    override def construct(args: Array[Object]): Object = {
+      // Deal with an empty byte array pickled by Python 3.
+      if (args.length == 0) {
+        Array.emptyByteArray
+      } else {
+        super.construct(args)
+      }
+    }
+  }
   // Unpickle array.array generated by Python 2.6
   class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
     //  /* Description of types */
@@ -108,6 +118,10 @@ private[spark] object SerDeUtil extends Logging {
     synchronized{
       if (!initialized) {
         Unpickler.registerConstructor("array", "array", new ArrayConstructor())
+        Unpickler.registerConstructor("__builtin__", "bytearray", new ByteArrayConstructor())
+        Unpickler.registerConstructor("builtins", "bytearray", new ByteArrayConstructor())
+        Unpickler.registerConstructor("__builtin__", "bytes", new ByteArrayConstructor())
+        Unpickler.registerConstructor("_codecs", "encode", new ByteArrayConstructor())
         initialized = true
       }
     }
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 1ecde68fb0..b3102853ce 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -2383,9 +2383,11 @@ class SQLTests(ReusedPySparkTestCase):
 
     def test_BinaryType_serialization(self):
         # Pyrolite version <= 4.9 could not serialize BinaryType with Python3 SPARK-17808
+        # The empty bytearray is a test for SPARK-21534.
         schema = StructType([StructField('mybytes', BinaryType())])
         data = [[bytearray(b'here is my data')],
-                [bytearray(b'and here is some more')]]
+                [bytearray(b'and here is some more')],
+                [bytearray(b'')]]
         df = self.spark.createDataFrame(data, schema=schema)
         df.collect()
 
-- 
GitLab