From b7a40f64e6d83bb6704ac3a63c46cc7c0e9f9e23 Mon Sep 17 00:00:00 2001
From: Xiang Gao <qasdfgtyuiop@gmail.com>
Date: Thu, 20 Jul 2017 12:46:06 +0900
Subject: [PATCH] [SPARK-16542][SQL][PYSPARK] Fix bugs about types that result
 in an array of null when creating DataFrame using Python

## What changes were proposed in this pull request?

This is a reopen of https://github.com/apache/spark/pull/14198, with merge conflicts resolved.

ueshin, could you please take a look at my code?

Fix bugs about types that result in an array of null when creating DataFrame using Python.

Python's `array.array` has richer types than Python itself, e.g. we can have `array('f',[1,2,3])` and `array('d',[1,2,3])`. The code in spark-sql and pyspark didn't take this into consideration, which could cause you to get an array of null values when you have an `array('f')` in your rows.

A simple snippet to reproduce this bug:

```
from pyspark import SparkContext
from pyspark.sql import SQLContext,Row,DataFrame
from array import array

sc = SparkContext()
sqlContext = SQLContext(sc)

row1 = Row(floatarray=array('f',[1,2,3]), doublearray=array('d',[1,2,3]))
rows = sc.parallelize([ row1 ])
df = sqlContext.createDataFrame(rows)

df.show()
```

which produces the output

```
+---------------+------------------+
|    doublearray|        floatarray|
+---------------+------------------+
|[1.0, 2.0, 3.0]|[null, null, null]|
+---------------+------------------+
```

## How was this patch tested?

New test case added.

Author: Xiang Gao <qasdfgtyuiop@gmail.com>
Author: Gao, Xiang <qasdfgtyuiop@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18444 from zasdfgbnm/fix_array_infer.
---
 .../apache/spark/api/python/SerDeUtil.scala   | 20 +++-
 python/pyspark/sql/tests.py                   | 97 ++++++++++++++++++-
 python/pyspark/sql/types.py                   | 95 +++++++++++++++++-
 .../sql/execution/python/EvaluatePython.scala | 10 ++
 4 files changed, 216 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 42f67e8dbe..aaf8e7a1d7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -55,13 +55,12 @@ private[spark] object SerDeUtil extends Logging {
     //    {'d', sizeof(double), d_getitem, d_setitem},
     //    {'\0', 0, 0, 0} /* Sentinel */
     //  };
-    // TODO: support Py_UNICODE with 2 bytes
     val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
-      Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
+      Map('B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
         'L' -> 11, 'l' -> 13, 'f' -> 15, 'd' -> 17, 'u' -> 21
       )
     } else {
-      Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
+      Map('B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
         'L' -> 10, 'l' -> 12, 'f' -> 14, 'd' -> 16, 'u' -> 20
       )
     }
@@ -72,7 +71,20 @@ private[spark] object SerDeUtil extends Logging {
         val typecode = args(0).asInstanceOf[String].charAt(0)
         // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly
         val data = args(1).asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1)
-        construct(typecode, machineCodes(typecode), data)
+        if (typecode == 'c') {
+          // It seems like the pickle of PyPy uses a protocol similar to Python 2.6's, which uses
+          // a string for array data instead of a list as Python 2.7 does, and handles an array of
+          // typecode 'c' as 1-byte characters.
+          val result = new Array[Char](data.length)
+          var i = 0
+          while (i < data.length) {
+            result(i) = data(i).toChar
+            i += 1
+          }
+          result
+        } else {
+          construct(typecode, machineCodes(typecode), data)
+        }
       } else if (args.length == 2 && args(0) == "l") {
         // On Python 2, an array of typecode 'l' should be handled as long rather than int.
         val values = args(1).asInstanceOf[JArrayList[_]]
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index be5495ca01..1c1a0cad49 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -30,8 +30,10 @@ import pickle
 import functools
 import time
 import datetime
-
+import array
+import ctypes
 import py4j
+
 try:
     import xmlrunner
 except ImportError:
@@ -58,6 +60,8 @@ from pyspark import SparkContext
 from pyspark.sql import SparkSession, SQLContext, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type, _make_type_verifier
+from pyspark.sql.types import _array_signed_int_typecode_ctype_mappings, _array_type_mappings
+from pyspark.sql.types import _array_unsigned_int_typecode_ctype_mappings
 from pyspark.tests import QuietTest, ReusedPySparkTestCase, SparkSubmitTests
 from pyspark.sql.functions import UserDefinedFunction, sha2, lit
 from pyspark.sql.window import Window
@@ -2333,6 +2337,97 @@ class SQLTests(ReusedPySparkTestCase):
         df = self.spark.createDataFrame(data, schema=schema)
         df.collect()
 
+    # test for SPARK-16542
+    def test_array_types(self):
+        # This test needs to make sure that the Scala type selected is at least
+        # as large as the Python type. This is necessary because Python's
+        # array types depend on the C implementation on the machine. Therefore there
+        # is no machine-independent correspondence between Python's array types
+        # and Scala types.
+        # See: https://docs.python.org/2/library/array.html
+
+        def assertCollectSuccess(typecode, value):
+            row = Row(myarray=array.array(typecode, [value]))
+            df = self.spark.createDataFrame([row])
+            self.assertEqual(df.first()["myarray"][0], value)
+
+        # supported string types
+        #
+        # String types in Python's array are "u" for Py_UNICODE and "c" for char.
+        # "u" will be removed in Python 4, and "c" is not supported in Python 3.
+        supported_string_types = []
+        if sys.version_info[0] < 4:
+            supported_string_types += ['u']
+            # test unicode
+            assertCollectSuccess('u', u'a')
+        if sys.version_info[0] < 3:
+            supported_string_types += ['c']
+            # test string
+            assertCollectSuccess('c', 'a')
+
+        # supported float and double
+        #
+        # Test max, min, and precision for float and double, assuming IEEE 754
+        # floating-point format.
+        supported_fractional_types = ['f', 'd']
+        assertCollectSuccess('f', ctypes.c_float(1e+38).value)
+        assertCollectSuccess('f', ctypes.c_float(1e-38).value)
+        assertCollectSuccess('f', ctypes.c_float(1.123456).value)
+        assertCollectSuccess('d', sys.float_info.max)
+        assertCollectSuccess('d', sys.float_info.min)
+        assertCollectSuccess('d', sys.float_info.epsilon)
+
+        # supported signed int types
+        #
+        # The sizes of C types change with the implementation, so we need to make
+        # sure that there is no overflow error on the platform running this test.
+        supported_signed_int_types = list(
+            set(_array_signed_int_typecode_ctype_mappings.keys())
+            .intersection(set(_array_type_mappings.keys())))
+        for t in supported_signed_int_types:
+            ctype = _array_signed_int_typecode_ctype_mappings[t]
+            max_val = 2 ** (ctypes.sizeof(ctype) * 8 - 1)
+            assertCollectSuccess(t, max_val - 1)
+            assertCollectSuccess(t, -max_val)
+
+        # supported unsigned int types
+        #
+        # JVM does not have unsigned types. We need to be very careful to make
+        # sure that there is no overflow error.
+        supported_unsigned_int_types = list(
+            set(_array_unsigned_int_typecode_ctype_mappings.keys())
+            .intersection(set(_array_type_mappings.keys())))
+        for t in supported_unsigned_int_types:
+            ctype = _array_unsigned_int_typecode_ctype_mappings[t]
+            assertCollectSuccess(t, 2 ** (ctypes.sizeof(ctype) * 8) - 1)
+
+        # all supported types
+        #
+        # Make sure the types tested above:
+        # 1. are all supported types
+        # 2. cover all supported types
+        supported_types = (supported_string_types +
+                           supported_fractional_types +
+                           supported_signed_int_types +
+                           supported_unsigned_int_types)
+        self.assertEqual(set(supported_types), set(_array_type_mappings.keys()))
+
+        # all unsupported types
+        #
+        # The keys of _array_type_mappings form a complete list of all supported types,
+        # and types not in _array_type_mappings are considered unsupported.
+        # `array.typecodes` is not available in Python 2.
+        if sys.version_info[0] < 3:
+            all_types = set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd'])
+        else:
+            all_types = set(array.typecodes)
+        unsupported_types = all_types - set(supported_types)
+        # test unsupported types
+        for t in unsupported_types:
+            with self.assertRaises(TypeError):
+                a = array.array(t)
+                self.spark.createDataFrame([Row(myarray=a)]).collect()
+
     def test_bucketed_write(self):
         data = [
             (1, "foo", 3.0), (2, "foo", 5.0),
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 22fa273fc1..c376805c32 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -24,6 +24,7 @@ import json
 import re
 import base64
 from array import array
+import ctypes
 
 if sys.version >= "3":
     long = int
@@ -915,6 +916,93 @@ if sys.version < "3":
         long: LongType,
     })
 
+# Mapping Python array types to Spark SQL DataType
+# We should be careful here. The size of these types in Python depends on the C
+# implementation. We need to make sure that this conversion does not lose any
+# precision. Also, the JVM only supports signed types; when converting unsigned types,
+# keep in mind that they require 1 more bit when stored as signed types.
+#
+# Reference for C integer size, see:
+# ISO/IEC 9899:201x specification, chapter 5.2.4.2.1 Sizes of integer types <limits.h>.
+# Reference for Python array typecode, see:
+# https://docs.python.org/2/library/array.html
+# https://docs.python.org/3.6/library/array.html
+# Reference for JVM's supported integral types:
+# http://docs.oracle.com/javase/specs/jvms/se8/html/jvms-2.html#jvms-2.3.1
+
+_array_signed_int_typecode_ctype_mappings = {
+    'b': ctypes.c_byte,
+    'h': ctypes.c_short,
+    'i': ctypes.c_int,
+    'l': ctypes.c_long,
+}
+
+_array_unsigned_int_typecode_ctype_mappings = {
+    'B': ctypes.c_ubyte,
+    'H': ctypes.c_ushort,
+    'I': ctypes.c_uint,
+    'L': ctypes.c_ulong
+}
+
+
+def _int_size_to_type(size):
+    """
+    Return the Catalyst datatype from the size of integers.
+ """ + if size <= 8: + return ByteType + if size <= 16: + return ShortType + if size <= 32: + return IntegerType + if size <= 64: + return LongType + +# The list of all supported array typecodes is stored here +_array_type_mappings = { + # Warning: Actual properties for float and double in C is not specified in C. + # On almost every system supported by both python and JVM, they are IEEE 754 + # single-precision binary floating-point format and IEEE 754 double-precision + # binary floating-point format. And we do assume the same thing here for now. + 'f': FloatType, + 'd': DoubleType +} + +# compute array typecode mappings for signed integer types +for _typecode in _array_signed_int_typecode_ctype_mappings.keys(): + size = ctypes.sizeof(_array_signed_int_typecode_ctype_mappings[_typecode]) * 8 + dt = _int_size_to_type(size) + if dt is not None: + _array_type_mappings[_typecode] = dt + +# compute array typecode mappings for unsigned integer types +for _typecode in _array_unsigned_int_typecode_ctype_mappings.keys(): + # JVM does not have unsigned types, so use signed types that is at least 1 + # bit larger to store + size = ctypes.sizeof(_array_unsigned_int_typecode_ctype_mappings[_typecode]) * 8 + 1 + dt = _int_size_to_type(size) + if dt is not None: + _array_type_mappings[_typecode] = dt + +# Type code 'u' in Python's array is deprecated since version 3.3, and will be +# removed in version 4.0. See: https://docs.python.org/3/library/array.html +if sys.version_info[0] < 4: + _array_type_mappings['u'] = StringType + +# Type code 'c' are only available at python 2 +if sys.version_info[0] < 3: + _array_type_mappings['c'] = StringType + +# SPARK-21465: +# In python2, array of 'L' happened to be mistakenly partially supported. To +# avoid breaking user's code, we should keep this partial support. 
Below is a +# dirty hacking to keep this partial support and make the unit test passes +import platform +if sys.version_info[0] < 3 and platform.python_implementation() != 'PyPy': + if 'L' not in _array_type_mappings.keys(): + _array_type_mappings['L'] = LongType + _array_unsigned_int_typecode_ctype_mappings['L'] = ctypes.c_uint + def _infer_type(obj): """Infer the DataType from obj @@ -938,12 +1026,17 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) - elif isinstance(obj, (list, array)): + elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) + elif isinstance(obj, array): + if obj.typecode in _array_type_mappings: + return ArrayType(_array_type_mappings[obj.typecode](), False) + else: + raise TypeError("not supported type: array(%s)" % obj.typecode) else: try: return _infer_schema(obj) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala index fcd84705f7..38b3aa7891 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala @@ -91,20 +91,30 @@ object EvaluatePython { case (c: Boolean, BooleanType) => c + case (c: Byte, ByteType) => c + case (c: Short, ByteType) => c.toByte case (c: Int, ByteType) => c.toByte case (c: Long, ByteType) => c.toByte + case (c: Byte, ShortType) => c.toShort + case (c: Short, ShortType) => c case (c: Int, ShortType) => c.toShort case (c: Long, ShortType) => c.toShort + case (c: Byte, IntegerType) => c.toInt + case (c: Short, IntegerType) => c.toInt case (c: Int, IntegerType) => c case (c: Long, IntegerType) => c.toInt + case (c: Byte, LongType) => c.toLong + case (c: Short, LongType) => c.toLong case (c: Int, LongType) => c.toLong case (c: Long, LongType) => c + case (c: Float, FloatType) => c case (c: Double, FloatType) => c.toFloat + case (c: Float, DoubleType) => c.toDouble case (c: Double, DoubleType) => c case (c: java.math.BigDecimal, dt: DecimalType) => Decimal(c, dt.precision, dt.scale) -- GitLab
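For reference, a minimal sketch (not part of the patch) of the behaviour the change is intended to produce, based on the reproduction script above and the new `test_array_types` test; it assumes a Spark build that includes this fix and an existing `SparkSession` bound to `spark`:

```
from array import array
from pyspark.sql import Row

# 'f' (float) and 'd' (double) arrays should now both round-trip correctly
row = Row(floatarray=array('f', [1, 2, 3]), doublearray=array('d', [1, 2, 3]))
df = spark.createDataFrame([row])
df.show()
# expected: both columns show [1.0, 2.0, 3.0] instead of [null, null, null]

# typecodes with no safe Catalyst mapping are rejected with a TypeError instead
# of silently producing nulls, e.g. 'Q' (unsigned long long) on Python 3
try:
    spark.createDataFrame([Row(myarray=array('Q', [1]))])
except TypeError as e:
    print(e)  # not supported type: array(Q)
```

This mirrors the expected output described in the PR description and the `assertRaises(TypeError)` branch of the added test.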