diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 9e05da89af0821c0233998a63b653ac78f7db2d7..b384b2b5073329ccc1e2f0a3caa9e6b7f26b2535 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -135,12 +135,11 @@ def _load_from_socket(port, serializer):
         break
     if not sock:
         raise Exception("could not open socket")
-    try:
-        rf = sock.makefile("rb", 65536)
-        for item in serializer.load_stream(rf):
-            yield item
-    finally:
-        sock.close()
+    # The RDD materialization time is unpredictable; if we set a timeout for the socket
+    # read operation, it will very likely fail. See SPARK-18281.
+    sock.settimeout(None)
+    # The socket will be automatically closed when garbage-collected.
+    return serializer.load_stream(sock.makefile("rb", 65536))
 
 
 def ignore_unicode_prefix(f):
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 89fce8ab25bafa5605efd0805d2e5ddd50de6454..fe314c54a1b188d3aabbf35a39ea4c0ae4159e7b 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -502,6 +502,18 @@ class RDDTests(ReusedPySparkTestCase):
         self.assertEqual(0, self.sc.emptyRDD().sum())
         self.assertEqual(6, self.sc.parallelize([1, 2, 3]).sum())
 
+    def test_to_localiterator(self):
+        from time import sleep
+        rdd = self.sc.parallelize([1, 2, 3])
+        it = rdd.toLocalIterator()
+        sleep(5)
+        self.assertEqual([1, 2, 3], sorted(it))
+
+        rdd2 = rdd.repartition(1000)
+        it2 = rdd2.toLocalIterator()
+        sleep(5)
+        self.assertEqual([1, 2, 3], sorted(it2))
+
     def test_save_as_textfile_with_unicode(self):
         # Regression test for SPARK-970
         x = u"\u00A1Hola, mundo!"
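
For context, a minimal usage sketch (illustrative only, not part of the patch; it assumes an existing SparkContext bound to `sc`) of why the lazy return value matters: `_load_from_socket` now hands back the unconsumed stream, so the socket must stay open, with no read timeout, until the caller finishes iterating, and it is closed only when the iterator is garbage-collected.

    # Illustrative sketch, not part of the patch: `sc` is assumed to be an existing SparkContext.
    rdd = sc.parallelize(range(10), 4)

    # toLocalIterator() reads results through _load_from_socket(); with the timeout
    # removed, the first next() call may simply block until a partition is materialized
    # instead of raising socket.timeout.
    it = rdd.toLocalIterator()

    for value in it:
        print(value)

    # No explicit cleanup is needed: once `it` (and the file object wrapping the socket)
    # becomes unreachable, garbage collection closes the underlying socket.
    del it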