diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index a5e6e2b054963b56db8bde3e4a872ece6e685a87..291c1caaaed57cb62efbf27d23d698fe21f4f13c 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2072,10 +2072,12 @@ class RDD(object): batchSize = min(10, self.ctx._batchSize or 1024) ser = BatchedSerializer(PickleSerializer(), batchSize) selfCopy = self._reserialize(ser) + jrdd_deserializer = selfCopy._jrdd_deserializer jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle) else: + jrdd_deserializer = self._jrdd_deserializer jrdd = self._jrdd.coalesce(numPartitions, shuffle) - return RDD(jrdd, self.ctx, self._jrdd_deserializer) + return RDD(jrdd, self.ctx, jrdd_deserializer) def zip(self, other): """ diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index c6c87a9ea5555897fdde908d531c8ea2449c31bb..bb13de563cdd4d167a367d62716b2bc11afe5d9d 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1037,6 +1037,12 @@ class RDDTests(ReusedPySparkTestCase): zeros = len([x for x in l if x == 0]) self.assertTrue(zeros == 0) + def test_repartition_on_textfile(self): + path = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt") + rdd = self.sc.textFile(path) + result = rdd.repartition(1).collect() + self.assertEqual(u"Hello World!", result[0]) + def test_distinct(self): rdd = self.sc.parallelize((1, 2, 3)*10, 10) self.assertEqual(rdd.getNumPartitions(), 10)