    [SPARK-18281] [SQL] [PYSPARK] Remove timeout for reading data through socket for local iterator · 95c95b71
    Liang-Chi Hsieh authored
    ## What changes were proposed in this pull request?
    
    There is a timeout failure when using `rdd.toLocalIterator()` or `df.toLocalIterator()` for a PySpark RDD and DataFrame:
    
        df = spark.createDataFrame([[1],[2],[3]])
        it = df.toLocalIterator()
        row = next(it)
    
        df2 = df.repartition(1000)  # create many empty partitions, which increases materialization time and causes the timeout
        it2 = df2.toLocalIterator()
        row = next(it2)
    
    The cause of this issue is that we open a socket on the JVM side to serve the data, and on the Python side we set a timeout for both connecting to and reading from that socket. In Python we read the data through a generator, so we only connect to the socket once we start asking it for data. If the iterator is not consumed immediately, the connection times out.
    
    On the other side, the materialization time for RDD partitions is unpredictable, so we can't set a timeout for reading data through the socket; otherwise the read is very likely to fail.
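
    A minimal sketch of the resulting pattern on the Python side, assuming a hypothetical `load_from_socket(port, serializer)` helper (names simplified for illustration, not the exact PySpark internals): keep a timeout while establishing the connection, but drop the read timeout before streaming, since partition materialization can take arbitrarily long.

        import socket

        def load_from_socket(port, serializer):
            # A timeout here only bounds connection setup, which should be fast.
            sock = socket.create_connection(("localhost", port), timeout=3)
            # Reading can stall for as long as the JVM takes to materialize a
            # partition, so do not bound reads with a timeout.
            sock.settimeout(None)
            try:
                rf = sock.makefile("rb", 65536)
                for item in serializer.load_stream(rf):
                    yield item
            finally:
                sock.close()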
    
    ## How was this patch tested?
    
    Added tests into PySpark.
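
    For illustration, a test along these lines (a sketch only, assuming a `spark` session fixture; not necessarily the exact test added) exercises the slow-consumption case from the description:

        import time

        def test_to_local_iterator_delayed_consumption(spark):
            # Many mostly-empty partitions increase materialization time on the JVM side.
            df = spark.createDataFrame([[1], [2], [3]]).repartition(1000)
            it = df.toLocalIterator()
            # Delay consumption; before this change the socket read could time out here.
            time.sleep(5)
            assert sorted(row[0] for row in it) == [1, 2, 3]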
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.
    
    Author: Liang-Chi Hsieh <viirya@gmail.com>
    
    Closes #16263 from viirya/fix-pyspark-localiterator.