diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6fb30d65c5edd7f05b5a57e2fdeebbcfaca595bc..85c04624da4a6dc0261f761fc187cba1641d13de 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -29,7 +29,7 @@ from pyspark.conf import SparkConf
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
-    PairDeserializer, CompressedSerializer
+    PairDeserializer, CompressedSerializer, AutoBatchedSerializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.rdd import RDD
 from pyspark.traceback_utils import CallSite, first_spark_call
@@ -67,7 +67,7 @@ class SparkContext(object):
     _default_batch_size_for_serialized_input = 10
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
-                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
+                 environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
                  gateway=None):
         """
         Create a new SparkContext. At least the master and app name should be set,
@@ -83,8 +83,9 @@ class SparkContext(object):
         :param environment: A dictionary of environment variables to set on
                worker nodes.
         :param batchSize: The number of Python objects represented as a single
-               Java object.  Set 1 to disable batching or -1 to use an
-               unlimited batch size.
+               Java object. Set 1 to disable batching, 0 to automatically choose
+               the batch size based on object sizes, or -1 to use an unlimited
+               batch size.
         :param serializer: The serializer for RDDs.
         :param conf: A L{SparkConf} object setting Spark properties.
         :param gateway: Use an existing gateway and JVM, otherwise a new JVM
@@ -117,6 +118,8 @@ class SparkContext(object):
         self._unbatched_serializer = serializer
         if batchSize == 1:
             self.serializer = self._unbatched_serializer
+        elif batchSize == 0:
+            self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
         else:
             self.serializer = BatchedSerializer(self._unbatched_serializer,
                                                 batchSize)
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 099fa54cf2bd7a0e7a6adcfa3a35dfd68b852c32..3d1a34b281acc3ed7abd27fa7ac8331cc0c7d4be 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -220,7 +220,7 @@ class AutoBatchedSerializer(BatchedSerializer):
     Choose the size of batch automatically based on the size of object
     """
 
-    def __init__(self, serializer, bestSize=1 << 20):
+    def __init__(self, serializer, bestSize=1 << 16):
         BatchedSerializer.__init__(self, serializer, -1)
         self.bestSize = bestSize
 
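The default bestSize, the target size in bytes for each serialized batch, drops from 1 MB (1 << 20) to 64 KB (1 << 16). The heuristic starts with a batch of one object and doubles the batch while serialized chunks stay under bestSize. The sketch below re-creates that growth rule as a self-contained generator; chunked_dumps is illustrative only and substitutes pickle.dumps for the wrapped serializer, so it is not the actual dump_stream implementation:

```python
import itertools
import pickle


def chunked_dumps(items, best_size=1 << 16, dumps=pickle.dumps):
    """Illustrative re-creation of the auto-batching growth rule."""
    it = iter(items)
    batch = 1
    while True:
        chunk = list(itertools.islice(it, batch))
        if not chunk:
            return
        data = dumps(chunk)
        yield data
        if len(data) < best_size:
            # Chunks are still small: double the batch to amortize overhead.
            batch *= 2


# Small objects settle into large batches near best_size bytes each;
# large objects keep batches short, bounding memory per batch.
for i, data in enumerate(chunked_dumps(range(50000))):
    print(i, len(data))
```
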
@@ -247,7 +247,7 @@ class AutoBatchedSerializer(BatchedSerializer):
                 other.serializer == self.serializer)
 
     def __str__(self):
-        return "BatchedSerializer<%s>" % str(self.serializer)
+        return "AutoBatchedSerializer<%s>" % str(self.serializer)
 
 
 class CartesianDeserializer(FramedSerializer):