diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 42f67e8dbe8654721eb4f17e3245ba1183b1a63e..aaf8e7a1d7461146592a14960555110c90917918 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -55,13 +55,12 @@ private[spark] object SerDeUtil extends Logging {
     //    {'d', sizeof(double), d_getitem, d_setitem},
     //    {'\0', 0, 0, 0} /* Sentinel */
     //  };
-    // TODO: support Py_UNICODE with 2 bytes
     val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
-      Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
+      Map('B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
         'L' -> 11, 'l' -> 13, 'f' -> 15, 'd' -> 17, 'u' -> 21
       )
     } else {
-      Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
+      Map('B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
         'L' -> 10, 'l' -> 12, 'f' -> 14, 'd' -> 16, 'u' -> 20
       )
     }
@@ -72,7 +71,20 @@ private[spark] object SerDeUtil extends Logging {
         val typecode = args(0).asInstanceOf[String].charAt(0)
         // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly
         val data = args(1).asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1)
-        construct(typecode, machineCodes(typecode), data)
+        if (typecode == 'c') {
+          // It seems like the pickle of pypy uses the similar protocol to Python 2.6, which uses
+          // a string for array data instead of list as Python 2.7, and handles an array of
+          // typecode 'c' as 1-byte character.
+          val result = new Array[Char](data.length)
+          var i = 0
+          while (i < data.length) {
+            result(i) = data(i).toChar
+            i += 1
+          }
+          result
+        } else {
+          construct(typecode, machineCodes(typecode), data)
+        }
       } else if (args.length == 2 && args(0) == "l") {
         // On Python 2, an array of typecode 'l' should be handled as long rather than int.
         val values = args(1).asInstanceOf[JArrayList[_]]
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index be5495ca019a227614d8fcac98f6d844310ff85d..1c1a0cad496254675e4761579664d4cc31e2bb48 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -30,8 +30,10 @@ import pickle
 import functools
 import time
 import datetime
-
+import array
+import ctypes
 import py4j
+
 try:
     import xmlrunner
 except ImportError:
@@ -58,6 +60,8 @@ from pyspark import SparkContext
 from pyspark.sql import SparkSession, SQLContext, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type, _make_type_verifier
+from pyspark.sql.types import _array_signed_int_typecode_ctype_mappings, _array_type_mappings
+from pyspark.sql.types import _array_unsigned_int_typecode_ctype_mappings
 from pyspark.tests import QuietTest, ReusedPySparkTestCase, SparkSubmitTests
 from pyspark.sql.functions import UserDefinedFunction, sha2, lit
 from pyspark.sql.window import Window
@@ -2333,6 +2337,97 @@ class SQLTests(ReusedPySparkTestCase):
         df = self.spark.createDataFrame(data, schema=schema)
         df.collect()
 
+    # test for SPARK-16542
+    def test_array_types(self):
+        # This test need to make sure that the Scala type selected is at least
+        # as large as the python's types. This is necessary because python's
+        # array types depend on C implementation on the machine. Therefore there
+        # is no machine independent correspondence between python's array types
+        # and Scala types.
+        # See: https://docs.python.org/2/library/array.html
+
+        def assertCollectSuccess(typecode, value):
+            row = Row(myarray=array.array(typecode, [value]))
+            df = self.spark.createDataFrame([row])
+            self.assertEqual(df.first()["myarray"][0], value)
+
+        # supported string types
+        #
+        # String types in python's array are "u" for Py_UNICODE and "c" for char.
+        # "u" will be removed in python 4, and "c" is not supported in python 3.
+        supported_string_types = []
+        if sys.version_info[0] < 4:
+            supported_string_types += ['u']
+            # test unicode
+            assertCollectSuccess('u', u'a')
+        if sys.version_info[0] < 3:
+            supported_string_types += ['c']
+            # test string
+            assertCollectSuccess('c', 'a')
+
+        # supported float and double
+        #
+        # Test max, min, and precision for float and double, assuming IEEE 754
+        # floating-point format.
+        supported_fractional_types = ['f', 'd']
+        assertCollectSuccess('f', ctypes.c_float(1e+38).value)
+        assertCollectSuccess('f', ctypes.c_float(1e-38).value)
+        assertCollectSuccess('f', ctypes.c_float(1.123456).value)
+        assertCollectSuccess('d', sys.float_info.max)
+        assertCollectSuccess('d', sys.float_info.min)
+        assertCollectSuccess('d', sys.float_info.epsilon)
+
+        # supported signed int types
+        #
+        # The size of C types changes with implementation, we need to make sure
+        # that there is no overflow error on the platform running this test.
+        supported_signed_int_types = list(
+            set(_array_signed_int_typecode_ctype_mappings.keys())
+            .intersection(set(_array_type_mappings.keys())))
+        for t in supported_signed_int_types:
+            ctype = _array_signed_int_typecode_ctype_mappings[t]
+            max_val = 2 ** (ctypes.sizeof(ctype) * 8 - 1)
+            assertCollectSuccess(t, max_val - 1)
+            assertCollectSuccess(t, -max_val)
+
+        # supported unsigned int types
+        #
+        # JVM does not have unsigned types. We need to be very careful to make
+        # sure that there is no overflow error.
+        supported_unsigned_int_types = list(
+            set(_array_unsigned_int_typecode_ctype_mappings.keys())
+            .intersection(set(_array_type_mappings.keys())))
+        for t in supported_unsigned_int_types:
+            ctype = _array_unsigned_int_typecode_ctype_mappings[t]
+            assertCollectSuccess(t, 2 ** (ctypes.sizeof(ctype) * 8) - 1)
+
+        # all supported types
+        #
+        # Make sure the types tested above:
+        # 1. are all supported types
+        # 2. cover all supported types
+        supported_types = (supported_string_types +
+                           supported_fractional_types +
+                           supported_signed_int_types +
+                           supported_unsigned_int_types)
+        self.assertEqual(set(supported_types), set(_array_type_mappings.keys()))
+
+        # all unsupported types
+        #
+        # Keys in _array_type_mappings is a complete list of all supported types,
+        # and types not in _array_type_mappings are considered unsupported.
+        # `array.typecodes` are not supported in python 2.
+        if sys.version_info[0] < 3:
+            all_types = set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd'])
+        else:
+            all_types = set(array.typecodes)
+        unsupported_types = all_types - set(supported_types)
+        # test unsupported types
+        for t in unsupported_types:
+            with self.assertRaises(TypeError):
+                a = array.array(t)
+                self.spark.createDataFrame([Row(myarray=a)]).collect()
+
     def test_bucketed_write(self):
         data = [
             (1, "foo", 3.0), (2, "foo", 5.0),
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 22fa273fc1aacba3d3c0a1b8b6cbd78c4bca3b56..c376805c32738aeab738f6f92cf61caa9c9665d8 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -24,6 +24,7 @@ import json
 import re
 import base64
 from array import array
+import ctypes
 
 if sys.version >= "3":
     long = int
@@ -915,6 +916,93 @@ if sys.version < "3":
         long: LongType,
     })
 
+# Mapping Python array types to Spark SQL DataType
+# We should be careful here. The size of these types in python depends on C
+# implementation. We need to make sure that this conversion does not lose any
+# precision. Also, JVM only support signed types, when converting unsigned types,
+# keep in mind that it required 1 more bit when stored as singed types.
+#
+# Reference for C integer size, see:
+# ISO/IEC 9899:201x specification, chapter 5.2.4.2.1 Sizes of integer types <limits.h>.
+# Reference for python array typecode, see:
+# https://docs.python.org/2/library/array.html
+# https://docs.python.org/3.6/library/array.html
+# Reference for JVM's supported integral types:
+# http://docs.oracle.com/javase/specs/jvms/se8/html/jvms-2.html#jvms-2.3.1
+
+_array_signed_int_typecode_ctype_mappings = {
+    'b': ctypes.c_byte,
+    'h': ctypes.c_short,
+    'i': ctypes.c_int,
+    'l': ctypes.c_long,
+}
+
+_array_unsigned_int_typecode_ctype_mappings = {
+    'B': ctypes.c_ubyte,
+    'H': ctypes.c_ushort,
+    'I': ctypes.c_uint,
+    'L': ctypes.c_ulong
+}
+
+
+def _int_size_to_type(size):
+    """
+    Return the Catalyst datatype from the size of integers.
+    """
+    if size <= 8:
+        return ByteType
+    if size <= 16:
+        return ShortType
+    if size <= 32:
+        return IntegerType
+    if size <= 64:
+        return LongType
+
+# The list of all supported array typecodes is stored here
+_array_type_mappings = {
+    # Warning: Actual properties for float and double in C is not specified in C.
+    # On almost every system supported by both python and JVM, they are IEEE 754
+    # single-precision binary floating-point format and IEEE 754 double-precision
+    # binary floating-point format. And we do assume the same thing here for now.
+    'f': FloatType,
+    'd': DoubleType
+}
+
+# compute array typecode mappings for signed integer types
+for _typecode in _array_signed_int_typecode_ctype_mappings.keys():
+    size = ctypes.sizeof(_array_signed_int_typecode_ctype_mappings[_typecode]) * 8
+    dt = _int_size_to_type(size)
+    if dt is not None:
+        _array_type_mappings[_typecode] = dt
+
+# compute array typecode mappings for unsigned integer types
+for _typecode in _array_unsigned_int_typecode_ctype_mappings.keys():
+    # JVM does not have unsigned types, so use signed types that is at least 1
+    # bit larger to store
+    size = ctypes.sizeof(_array_unsigned_int_typecode_ctype_mappings[_typecode]) * 8 + 1
+    dt = _int_size_to_type(size)
+    if dt is not None:
+        _array_type_mappings[_typecode] = dt
+
+# Type code 'u' in Python's array is deprecated since version 3.3, and will be
+# removed in version 4.0. See: https://docs.python.org/3/library/array.html
+if sys.version_info[0] < 4:
+    _array_type_mappings['u'] = StringType
+
+# Type code 'c' are only available at python 2
+if sys.version_info[0] < 3:
+    _array_type_mappings['c'] = StringType
+
+# SPARK-21465:
+# In python2, array of 'L' happened to be mistakenly partially supported. To
+# avoid breaking user's code, we should keep this partial support. Below is a
+# dirty hacking to keep this partial support and make the unit test passes
+import platform
+if sys.version_info[0] < 3 and platform.python_implementation() != 'PyPy':
+    if 'L' not in _array_type_mappings.keys():
+        _array_type_mappings['L'] = LongType
+        _array_unsigned_int_typecode_ctype_mappings['L'] = ctypes.c_uint
+
 
 def _infer_type(obj):
     """Infer the DataType from obj
@@ -938,12 +1026,17 @@ def _infer_type(obj):
                 return MapType(_infer_type(key), _infer_type(value), True)
         else:
             return MapType(NullType(), NullType(), True)
-    elif isinstance(obj, (list, array)):
+    elif isinstance(obj, list):
         for v in obj:
             if v is not None:
                 return ArrayType(_infer_type(obj[0]), True)
         else:
             return ArrayType(NullType(), True)
+    elif isinstance(obj, array):
+        if obj.typecode in _array_type_mappings:
+            return ArrayType(_array_type_mappings[obj.typecode](), False)
+        else:
+            raise TypeError("not supported type: array(%s)" % obj.typecode)
     else:
         try:
             return _infer_schema(obj)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index fcd84705f7e8b6edc39e080ae01b9d8bf205964c..38b3aa7891b40a0a62e8e9dfc5ded45211ceed37 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -91,20 +91,30 @@ object EvaluatePython {
 
     case (c: Boolean, BooleanType) => c
 
+    case (c: Byte, ByteType) => c
+    case (c: Short, ByteType) => c.toByte
     case (c: Int, ByteType) => c.toByte
     case (c: Long, ByteType) => c.toByte
 
+    case (c: Byte, ShortType) => c.toShort
+    case (c: Short, ShortType) => c
     case (c: Int, ShortType) => c.toShort
     case (c: Long, ShortType) => c.toShort
 
+    case (c: Byte, IntegerType) => c.toInt
+    case (c: Short, IntegerType) => c.toInt
     case (c: Int, IntegerType) => c
     case (c: Long, IntegerType) => c.toInt
 
+    case (c: Byte, LongType) => c.toLong
+    case (c: Short, LongType) => c.toLong
     case (c: Int, LongType) => c.toLong
     case (c: Long, LongType) => c
 
+    case (c: Float, FloatType) => c
     case (c: Double, FloatType) => c.toFloat
 
+    case (c: Float, DoubleType) => c.toDouble
     case (c: Double, DoubleType) => c
 
     case (c: java.math.BigDecimal, dt: DecimalType) => Decimal(c, dt.precision, dt.scale)