From c06f3f5ac500b02d38ca7ec5fcb33085e07f2f75 Mon Sep 17 00:00:00 2001 From: peay <peay@protonmail.com> Date: Wed, 9 Aug 2017 14:03:18 -0700 Subject: [PATCH] [SPARK-21551][PYTHON] Increase timeout for PythonRDD.serveIterator ## What changes were proposed in this pull request? This modification increases the timeout for `serveIterator` (which is not dynamically configurable). This fixes timeout issues in pyspark when using `collect` and similar functions, in cases where Python may take more than a couple seconds to connect. See https://issues.apache.org/jira/browse/SPARK-21551 ## How was this patch tested? Ran the tests. cc rxin Author: peay <peay@protonmail.com> Closes #18752 from peay/spark-21551. --- .../main/scala/org/apache/spark/api/python/PythonRDD.scala | 6 +++--- python/pyspark/rdd.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 6a81752400..33771011fe 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -683,7 +683,7 @@ private[spark] object PythonRDD extends Logging { * Create a socket server and a background thread to serve the data in `items`, * * The socket server can only accept one connection, or close if no connection - * in 3 seconds. + * in 15 seconds. * * Once a connection comes in, it tries to serialize all the data in `items` * and send them into this connection. @@ -692,8 +692,8 @@ private[spark] object PythonRDD extends Logging { */ def serveIterator[T](items: Iterator[T], threadName: String): Int = { val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost")) - // Close the socket if no connection in 3 seconds - serverSocket.setSoTimeout(3000) + // Close the socket if no connection in 15 seconds + serverSocket.setSoTimeout(15000) new Thread(threadName) { setDaemon(true) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 3325b65f8b..ea993c572f 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -127,7 +127,7 @@ def _load_from_socket(port, serializer): af, socktype, proto, canonname, sa = res sock = socket.socket(af, socktype, proto) try: - sock.settimeout(3) + sock.settimeout(15) sock.connect(sa) except socket.error: sock.close() -- GitLab