    [SPARK-15981][SQL][STREAMING] Fixed bug and added tests in DataStreamReader Python API · 084dca77
    Tathagata Das authored
    ## What changes were proposed in this pull request?
    
    - Fixed a bug in the Python API of DataStreamReader. A single path was being converted to an array before the call to the Java DataStreamReader method, which accepts only a string, so the call failed with the following error.
    ```
    File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/readwriter.py", line 947, in pyspark.sql.readwriter.DataStreamReader.json
    Failed example:
        json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'),                 schema = sdf_schema)
    Exception raised:
        Traceback (most recent call last):
          File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/doctest.py", line 1253, in __run
            compileflags, 1) in test.globs
          File "<doctest pyspark.sql.readwriter.DataStreamReader.json[0]>", line 1, in <module>
            json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'),                 schema = sdf_schema)
          File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/readwriter.py", line 963, in json
            return self._df(self._jreader.json(path))
          File "/Users/tdas/Projects/Spark/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
            answer, self.gateway_client, self.target_id, self.name)
          File "/Users/tdas/Projects/Spark/spark/python/pyspark/sql/utils.py", line 63, in deco
            return f(*a, **kw)
          File "/Users/tdas/Projects/Spark/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 316, in get_return_value
            format(target_id, ".", name, value))
        Py4JError: An error occurred while calling o121.json. Trace:
        py4j.Py4JException: Method json([class java.util.ArrayList]) does not exist
        	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        	at py4j.Gateway.invoke(Gateway.java:272)
        	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
        	at py4j.commands.CallCommand.execute(CallCommand.java:79)
        	at py4j.GatewayConnection.run(GatewayConnection.java:211)
        	at java.lang.Thread.run(Thread.java:744)
    ```
    
    - Reduced code duplication between DataStreamReader and DataFrameWriter
    - Added missing Python doctests
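
The failure mode described in the first bullet can be sketched without a Spark installation. This is a minimal illustration, not the actual patch: `FakeJavaStreamReader`, `json_buggy`, and `json_fixed` are hypothetical names, and the stand-in class only mimics a JVM method that has a single-string overload and nothing else.

```python
class FakeJavaStreamReader:
    """Hypothetical stand-in for the JVM-side reader: json() takes one string."""

    def json(self, path):
        # Mimic the overload lookup failing when the argument is not a string.
        if not isinstance(path, str):
            raise TypeError(
                "Method json([%s]) does not exist" % type(path).__name__)
        return "stream:" + path


def json_buggy(jreader, path):
    # Behaviour described in the commit message: a single path is wrapped
    # in a list before the call, which the string-only method rejects.
    if isinstance(path, str):
        path = [path]
    return jreader.json(path)


def json_fixed(jreader, path):
    # Sketch of the corrected behaviour (an assumption about its shape):
    # pass a single string through unchanged and reject anything else early.
    if not isinstance(path, str):
        raise TypeError("path can be only a single string")
    return jreader.json(path)
```

Calling `json_buggy(FakeJavaStreamReader(), "/tmp/data")` raises an error analogous to the `Py4JException` in the traceback, while `json_fixed` forwards the string as-is. Rejecting non-string input on the Python side is one way to surface the problem before it reaches the gateway.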
    
    ## How was this patch tested?
    New tests
    
    Author: Tathagata Das <tathagata.das1565@gmail.com>
    
    Closes #13703 from tdas/SPARK-15981.