  1. May 07, 2017
    • zero323's avatar
      [SPARK-16931][PYTHON][SQL] Add Python wrapper for bucketBy · f53a8207
      zero323 authored
      ## What changes were proposed in this pull request?
      
      Adds Python wrappers for `DataFrameWriter.bucketBy` and `DataFrameWriter.sortBy` ([SPARK-16931](https://issues.apache.org/jira/browse/SPARK-16931))
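
      A minimal usage sketch of the new wrappers (the table name and data below are illustrative, not taken from the patch):
      
      ```python
      from pyspark.sql import SparkSession
      
      spark = SparkSession.builder.master("local[2]").appName("bucketBy-demo").getOrCreate()
      df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "value"])
      
      (df.write
         .bucketBy(4, "id")    # hash the rows into 4 buckets by "id"
         .sortBy("value")      # sort rows within each bucket
         .mode("overwrite")
         .saveAsTable("bucketed_demo"))  # bucketing requires saveAsTable
      ```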
      
      ## How was this patch tested?
      
      Unit tests covering new feature.
      
      __Note__: Based on work of GregBowyer (f49b9a23468f7af32cb53d2b654272757c151725)
      
      CC HyukjinKwon
      
      Author: zero323 <zero323@users.noreply.github.com>
      Author: Greg Bowyer <gbowyer@fastmail.co.uk>
      
      Closes #17077 from zero323/SPARK-16931.
      f53a8207
    • zero323's avatar
      [SPARK-18777][PYTHON][SQL] Return UDF from udf.register · 63d90e7d
      zero323 authored
      ## What changes were proposed in this pull request?
      
      - Move udf wrapping code from `functions.udf` to `functions.UserDefinedFunction`.
      - Return the wrapped udf from `catalog.registerFunction` and dependent methods (see the sketch after this list).
      - Update docstrings in `catalog.registerFunction` and `SQLContext.registerFunction`.
      - Unit tests.
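
      A hedged sketch of how the returned value can be used (the function and names below are illustrative):
      
      ```python
      from pyspark.sql import SparkSession
      from pyspark.sql.functions import lit
      from pyspark.sql.types import IntegerType
      
      spark = SparkSession.builder.master("local[2]").appName("register-demo").getOrCreate()
      
      # The registered UDF is now returned, so the same function can be used
      # both by name in SQL and directly in DataFrame expressions.
      strlen = spark.catalog.registerFunction("strlen", lambda s: len(s), IntegerType())
      
      spark.sql("SELECT strlen('spark')").show()          # SQL usage by name
      spark.range(1).select(strlen(lit("spark"))).show()  # DataFrame usage via the returned UDF
      ```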
      
      ## How was this patch tested?
      
      - Existing unit tests and docstests.
      - Additional tests covering new feature.
      
      Author: zero323 <zero323@users.noreply.github.com>
      
      Closes #17831 from zero323/SPARK-18777.
      63d90e7d
  2. May 03, 2017
    • zero323's avatar
      [SPARK-20584][PYSPARK][SQL] Python generic hint support · 02bbe731
      zero323 authored
      ## What changes were proposed in this pull request?
      
      Adds `hint` method to PySpark `DataFrame`.
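
      A minimal sketch of the new method (dataset sizes and the hint name are illustrative):
      
      ```python
      from pyspark.sql import SparkSession
      
      spark = SparkSession.builder.master("local[2]").appName("hint-demo").getOrCreate()
      small = spark.range(100).withColumnRenamed("id", "k")
      large = spark.range(10000).withColumnRenamed("id", "k")
      
      # Ask the planner to broadcast the smaller side of the join.
      large.join(small.hint("broadcast"), "k").explain()
      ```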
      
      ## How was this patch tested?
      
      Unit tests, doctests.
      
      Author: zero323 <zero323@users.noreply.github.com>
      
      Closes #17850 from zero323/SPARK-20584.
      02bbe731
  3. May 01, 2017
    • zero323's avatar
      [SPARK-20290][MINOR][PYTHON][SQL] Add PySpark wrapper for eqNullSafe · f0169a1c
      zero323 authored
      ## What changes were proposed in this pull request?
      
      Adds Python bindings for `Column.eqNullSafe`
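
      A small sketch contrasting `==` with the new null-safe binding (the data below is illustrative):
      
      ```python
      from pyspark.sql import SparkSession
      
      spark = SparkSession.builder.master("local[2]").appName("eqNullSafe-demo").getOrCreate()
      df = spark.createDataFrame([(1, 1), (None, 1), (None, None)], ["a", "b"])
      
      df.select(
          (df.a == df.b).alias("eq"),             # NULL whenever either side is NULL
          df.a.eqNullSafe(df.b).alias("eq_ns"),   # always returns true/false
      ).show()
      ```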
      
      ## How was this patch tested?
      
      Manual tests, existing unit tests, doc build.
      
      Author: zero323 <zero323@users.noreply.github.com>
      
      Closes #17605 from zero323/SPARK-20290.
      f0169a1c
  4. Apr 30, 2017
  5. Apr 05, 2017
    • zero323's avatar
      [SPARK-19454][PYTHON][SQL] DataFrame.replace improvements · e2773996
      zero323 authored
      ## What changes were proposed in this pull request?
      
      - Allows skipping `value` argument if `to_replace` is a `dict`:
      	```python
      	df = sc.parallelize([("Alice", 1, 3.0)]).toDF()
      	df.replace({"Alice": "Bob"}).show()
      	```
      - Adds validation step to ensure homogeneous values / replacements.
      - Simplifies internal control flow.
      - Improves unit tests coverage.
      
      ## How was this patch tested?
      
      Existing unit tests, additional unit tests, manual testing.
      
      Author: zero323 <zero323@users.noreply.github.com>
      
      Closes #16793 from zero323/SPARK-19454.
      e2773996
  6. Mar 26, 2017
  7. Mar 23, 2017
    • Tyson Condie's avatar
      [SPARK-19876][SS][WIP] OneTime Trigger Executor · 746a558d
      Tyson Condie authored
      ## What changes were proposed in this pull request?
      
      This adds a trigger and trigger executor that will execute a single trigger only. One can use this OneTime trigger to gain more control over the scheduling of triggers.
      
      In addition, this patch requires an optimization to StreamExecution that logs a commit record at the end of successfully processing a batch. This new commit log is used to determine the next batch (offsets) to process after a restart, instead of using the offset log itself. Using the offset log for this would always re-process the previously logged batch, which would not permit a OneTime trigger feature.
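
      For reference, a hedged PySpark sketch of the resulting API (the source, sink and paths below are illustrative):
      
      ```python
      from pyspark.sql import SparkSession
      
      spark = SparkSession.builder.master("local[2]").appName("once-demo").getOrCreate()
      stream = spark.readStream.format("text").load("/tmp/once-demo/input")  # hypothetical input dir
      
      query = (stream.writeStream
               .format("parquet")
               .option("checkpointLocation", "/tmp/once-demo/checkpoint")    # hypothetical paths
               .option("path", "/tmp/once-demo/output")
               .trigger(once=True)   # process a single batch of available data, then stop
               .start())
      query.awaitTermination()
      ```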
      
      ## How was this patch tested?
      
      A number of existing tests have been revised. These tests all assumed that when restarting a stream, the last batch in the offset log is to be re-processed. Given that we now have a commit log that will tell us if that last batch was processed successfully, the results/assumptions of those tests needed to be revised accordingly.
      
      In addition, a OneTime trigger test was added to StreamingQuerySuite, which tests:
      - The semantics of OneTime trigger (i.e., on start, execute a single batch, then stop).
      - The case when the commit log was not able to successfully log the completion of a batch before restart, which would mean that we should fall back to what's in the offset log.
      - A OneTime trigger execution that results in an exception being thrown.
      
      marmbrus tdas zsxwing
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: Tyson Condie <tcondie@gmail.com>
      Author: Tathagata Das <tathagata.das1565@gmail.com>
      
      Closes #17219 from tcondie/stream-commit.
      746a558d
    • hyukjinkwon's avatar
      [SPARK-18579][SQL] Use ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace options in CSV writing · 07c12c09
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      This PR proposes to support _not_ trimming the white spaces when writing out. These options are `false` by default in the CSV reading path, but `true` by default in the CSV writing path of the univocity parser.
      
      Both the `ignoreLeadingWhiteSpace` and `ignoreTrailingWhiteSpace` options are not being used for writing, and therefore we always trim the white spaces.
      
      It seems we should provide an easy way to keep these white spaces.
      
      With the data below:
      
      ```scala
      val df = spark.read.csv(Seq("a , b  , c").toDS)
      df.show()
      ```
      
      ```
      +---+----+---+
      |_c0| _c1|_c2|
      +---+----+---+
      | a | b  |  c|
      +---+----+---+
      ```
      
      **Before**
      
      ```scala
      df.write.csv("/tmp/text.csv")
      spark.read.text("/tmp/text.csv").show()
      ```
      
      ```
      +-----+
      |value|
      +-----+
      |a,b,c|
      +-----+
      ```
      
      It seems this can't be worked around via `quoteAll` either.
      
      ```scala
      df.write.option("quoteAll", true).csv("/tmp/text.csv")
      spark.read.text("/tmp/text.csv").show()
      ```
      ```
      +-----------+
      |      value|
      +-----------+
      |"a","b","c"|
      +-----------+
      ```
      
      **After**
      
      ```scala
      df.write.option("ignoreLeadingWhiteSpace", false).option("ignoreTrailingWhiteSpace", false).csv("/tmp/text.csv")
      spark.read.text("/tmp/text.csv").show()
      ```
      
      ```
      +----------+
      |     value|
      +----------+
      |a , b  , c|
      +----------+
      ```
      
      Note that this case is possible in R
      
      ```r
      > system("cat text.csv")
      f1,f2,f3
      a , b  , c
      > df <- read.csv(file="text.csv")
      > df
        f1   f2 f3
      1 a   b    c
      > write.csv(df, file="text1.csv", quote=F, row.names=F)
      > system("cat text1.csv")
      f1,f2,f3
      a , b  , c
      ```
      
      ## How was this patch tested?
      
      Unit tests in `CSVSuite` and manual tests for Python.
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #17310 from HyukjinKwon/SPARK-18579.
      07c12c09
  8. Mar 09, 2017
    • Jeff Zhang's avatar
      [SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc... · cabe1df8
      Jeff Zhang authored
      [SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc file in DataFrameReader.orc
      
      Besides the issue in the Spark API, this also fixes two minor issues in PySpark (see the sketch after this list):
      - support read from multiple input paths for orc
      - support read from multiple input paths for text
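
      A hedged sketch of the resulting reader calls (the paths are hypothetical):
      
      ```python
      from pyspark.sql import SparkSession
      
      spark = SparkSession.builder.master("local[2]").appName("multi-path-demo").getOrCreate()
      
      # Both readers now accept several input paths in a single call.
      orc_df = spark.read.orc("/data/events/part1.orc", "/data/events/part2.orc")
      text_df = spark.read.text("/data/logs/a.txt", "/data/logs/b.txt")
      ```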
      
      Author: Jeff Zhang <zjffdu@apache.org>
      
      Closes #10307 from zjffdu/SPARK-12334.
      cabe1df8
    • Jason White's avatar
      [SPARK-19561][SQL] add int case handling for TimestampType · 206030bd
      Jason White authored
      ## What changes were proposed in this pull request?
      
      Add handling of input of type `Int` for dataType `TimestampType` to `EvaluatePython.scala`. Py4J serializes ints smaller than MIN_INT or larger than MAX_INT to Long, which are handled correctly already, but values between MIN_INT and MAX_INT are serialized to Int.
      
      These range limits correspond to roughly half an hour on either side of the epoch. As a result, PySpark doesn't allow TimestampType values to be created in this range.
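
      A quick back-of-the-envelope check of that range, assuming the internal TimestampType representation is microseconds since the epoch:
      
      ```python
      MAX_INT = 2 ** 31 - 1               # largest value Py4J serializes as an Int
      seconds = MAX_INT / 1000000.0       # internal values are epoch microseconds
      print(seconds / 60)                 # ~35.8 minutes on either side of the epoch
      ```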
      
      Alternatives attempted: patching the `TimestampType.toInternal` function to cast return values to `long`, so Py4J would always serialize them to Scala Long. Python3 does not have a `long` type, so this approach failed on Python3.
      
      ## How was this patch tested?
      
      Added a new PySpark-side test that fails without the change.
      
      The contribution is my original work and I license the work to the project under the project’s open source license.
      
      Resubmission of https://github.com/apache/spark/pull/16896. The original PR didn't go through Jenkins and broke the build. davies dongjoon-hyun
      
      cloud-fan Could you kick off a Jenkins run for me? It passed everything for me locally, but it's possible something has changed in the last few weeks.
      
      Author: Jason White <jason.white@shopify.com>
      
      Closes #17200 from JasonMWhite/SPARK-19561.
      206030bd
  9. Mar 07, 2017
  10. Mar 05, 2017
    • hyukjinkwon's avatar
      [SPARK-19701][SQL][PYTHON] Throws a correct exception for 'in' operator against column · 224e0e78
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      This PR proposes to remove an incorrect implementation that has not been executed so far (at least since Spark 1.5.2) for the `in` operator, and to throw a correct exception rather than saying it is a bool. I tested the code below in 1.5.2, 1.6.3, 2.1.0 and on the master branch:
      
      **1.5.2**
      
      ```python
      >>> df = sqlContext.createDataFrame([[1]])
      >>> 1 in df._1
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File ".../spark-1.5.2-bin-hadoop2.6/python/pyspark/sql/column.py", line 418, in __nonzero__
          raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
      ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
      ```
      
      **1.6.3**
      
      ```python
      >>> 1 in sqlContext.range(1).id
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File ".../spark-1.6.3-bin-hadoop2.6/python/pyspark/sql/column.py", line 447, in __nonzero__
          raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
      ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
      ```
      
      **2.1.0**
      
      ```python
      >>> 1 in spark.range(1).id
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File ".../spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.py", line 426, in __nonzero__
          raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
      ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
      ```
      
      **Current Master**
      
      ```python
      >>> 1 in spark.range(1).id
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File ".../spark/python/pyspark/sql/column.py", line 452, in __nonzero__
          raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
      ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
      ```
      
      **After**
      
      ```python
      >>> 1 in spark.range(1).id
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File ".../spark/python/pyspark/sql/column.py", line 184, in __contains__
          raise ValueError("Cannot apply 'in' operator against a column: please use 'contains' "
      ValueError: Cannot apply 'in' operator against a column: please use 'contains' in a string column or 'array_contains' function for an array column.
      ```
      
      In more details,
      
      It seems the implementation intended to support this
      
      ```python
      1 in df.column
      ```
      
      However, currently, it throws an exception as below:
      
      ```python
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File ".../spark/python/pyspark/sql/column.py", line 426, in __nonzero__
          raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
      ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
      ```
      
      What happens here is as below:
      
      ```python
      class Column(object):
          def __contains__(self, item):
              print "I am contains"
              return Column()
          def __nonzero__(self):
              raise Exception("I am nonzero.")
      
      >>> 1 in Column()
      I am contains
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File "<stdin>", line 6, in __nonzero__
      Exception: I am nonzero.
      ```
      
      It seems `__contains__` is called first, and then `__nonzero__` or `__bool__` is called against the returned `Column()` to coerce the result into a bool (or int, to be specific).
      
      It seems `__nonzero__` (for Python 2), `__bool__` (for Python 3) and `__contains__` force the return value into a bool, unlike other operators. There are a few references about this below:
      
      https://bugs.python.org/issue16011
      http://stackoverflow.com/questions/12244074/python-source-code-for-built-in-in-operator/12244378#12244378
      http://stackoverflow.com/questions/38542543/functionality-of-python-in-vs-contains/38542777
      
      It seems we can't override `__nonzero__` or `__bool__` as a workaround to make this work, because these force the return type to be a bool, as shown below:
      
      ```python
      class Column(object):
          def __contains__(self, item):
              print "I am contains"
              return Column()
          def __nonzero__(self):
              return "a"
      
      >>> 1 in Column()
      I am contains
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
      TypeError: __nonzero__ should return bool or int, returned str
      ```
      
      ## How was this patch tested?
      
      Added unit tests in `tests.py`.
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #17160 from HyukjinKwon/SPARK-19701.
      224e0e78
  11. Feb 28, 2017
    • hyukjinkwon's avatar
      [SPARK-19610][SQL] Support parsing multiline CSV files · 7e5359be
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      This PR proposes support for multi-line CSV records, mirroring the multiline support in the JSON datasource (in the JSON case, per file).
      
      So, this PR introduces a `wholeFile` option which makes the format not splittable and reads each whole file. Since the Univocity parser can produce each row from a stream, it should be capable of parsing very large documents as long as the individual rows fit in memory.
      
      ## How was this patch tested?
      
      Unit tests in `CSVSuite` and `tests.py`
      
      Manual tests with a single 9GB CSV file in local file system, for example,
      
      ```scala
      spark.read.option("wholeFile", true).option("inferSchema", true).csv("tmp.csv").count()
      ```
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #16976 from HyukjinKwon/SPARK-19610.
      7e5359be
  12. Feb 24, 2017
    • zero323's avatar
      [SPARK-19161][PYTHON][SQL] Improving UDF Docstrings · 4a5e38f5
      zero323 authored
      ## What changes were proposed in this pull request?
      
      Replaces `UserDefinedFunction` object returned from `udf` with a function wrapper providing docstring and arguments information as proposed in [SPARK-19161](https://issues.apache.org/jira/browse/SPARK-19161).
      
      ### Backward incompatible changes:
      
      - `pyspark.sql.functions.udf` will return a `function` instead of a `UserDefinedFunction`. To ensure a backward compatible public API, we use function attributes to mimic the `UserDefinedFunction` API (`func` and `returnType` attributes). This should have a minimal impact on user code.
      
        An alternative implementation could use dynamic sub-classing. This would ensure full backward compatibility but is more fragile in practice.
      
      ### Limitations:
      
      Full functionality (retained docstring and argument list) is achieved only on recent Python versions. Legacy Python versions will preserve only docstrings, not the argument list. This should be an acceptable trade-off between the achieved improvements and the overall complexity.
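
      A hedged sketch of the resulting introspection behaviour (the function below is illustrative; the exact output may vary by Python version, as noted above):
      
      ```python
      from pyspark.sql.functions import udf
      from pyspark.sql.types import IntegerType
      
      @udf(IntegerType())
      def add_one(x):
          """Adds one to x."""
          return x + 1 if x is not None else None
      
      print(add_one.__doc__)      # the original docstring is preserved
      print(add_one.returnType)   # IntegerType -- UserDefinedFunction-style attribute
      print(add_one.func(41))     # 42 -- the wrapped Python function is still reachable
      ```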
      
      ### Possible impact on other tickets:
      
      This can affect [SPARK-18777](https://issues.apache.org/jira/browse/SPARK-18777).
      
      ## How was this patch tested?
      
      Existing unit tests to ensure backward compatibility, additional tests targeting proposed changes.
      
      Author: zero323 <zero323@users.noreply.github.com>
      
      Closes #16534 from zero323/SPARK-19161.
      4a5e38f5
  13. Feb 23, 2017
    • Wenchen Fan's avatar
      [SPARK-19706][PYSPARK] add Column.contains in pyspark · 4fa4cf1d
      Wenchen Fan authored
      ## What changes were proposed in this pull request?
      
      To be consistent with the Scala API, we should also add `contains` to `Column` in PySpark.
      
      ## How was this patch tested?
      
      updated unit test
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #17036 from cloud-fan/pyspark.
      4fa4cf1d
  14. Feb 16, 2017
    • Nathan Howell's avatar
      [SPARK-18352][SQL] Support parsing multiline json files · 21fde57f
      Nathan Howell authored
      ## What changes were proposed in this pull request?
      
      If a new option `wholeFile` is set to `true` the JSON reader will parse each file (instead of a single line) as a value. This is done with Jackson streaming and it should be capable of parsing very large documents, assuming the row will fit in memory.
      
      Because the file is not buffered in memory the corrupt record handling is also slightly different when `wholeFile` is enabled: the corrupt column will contain the filename instead of the literal JSON if there is a parsing failure. It would be easy to extend this to add the parser location (line, column and byte offsets) to the output if desired.
      
      These changes have allowed types other than `String` to be parsed. Support for `UTF8String` and `Text` has been added (alongside `String` and `InputFormat`), so they no longer require a conversion to `String` just for parsing.
      
      I've also included a few other changes that generate slightly better bytecode and (imo) make it more obvious when and where boxing is occurring in the parser. These are included as separate commits; let me know if they should be flattened into this PR or moved to a new one.
      
      ## How was this patch tested?
      
      New and existing unit tests. No performance or load tests have been run.
      
      Author: Nathan Howell <nhowell@godaddy.com>
      
      Closes #16386 from NathanHowell/SPARK-18352.
      21fde57f
  15. Feb 15, 2017
    • zero323's avatar
      [SPARK-19160][PYTHON][SQL] Add udf decorator · c97f4e17
      zero323 authored
      ## What changes were proposed in this pull request?
      
      This PR adds `udf` decorator syntax as proposed in [SPARK-19160](https://issues.apache.org/jira/browse/SPARK-19160).
      
      This allows users to define UDF using simplified syntax:
      
      ```python
      from pyspark.sql.functions import udf
      from pyspark.sql.types import IntegerType
      
      @udf(IntegerType())
      def add_one(x):
          """Adds one"""
          if x is not None:
              return x + 1
      ```
      
      without the need to define a separate function and udf.
      
      ## How was this patch tested?
      
      Existing unit tests to ensure backward compatibility and additional unit tests covering new functionality.
      
      Author: zero323 <zero323@users.noreply.github.com>
      
      Closes #16533 from zero323/SPARK-19160.
      c97f4e17
  16. Feb 14, 2017
    • Sheamus K. Parkes's avatar
      [SPARK-18541][PYTHON] Add metadata parameter to pyspark.sql.Column.alias() · 7b64f7aa
      Sheamus K. Parkes authored
      ## What changes were proposed in this pull request?
      
      Add a `metadata` keyword parameter to `pyspark.sql.Column.alias()` to allow users to mix in metadata while manipulating `DataFrame`s in `pyspark`. Without this, I believe it was necessary to pass back through `SparkSession.createDataFrame` each time a user wanted to manipulate `StructField.metadata` in `pyspark`.
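
      A hedged sketch of the new keyword argument (the column name and metadata key are illustrative):
      
      ```python
      from pyspark.sql import SparkSession
      
      spark = SparkSession.builder.master("local[2]").appName("alias-metadata-demo").getOrCreate()
      df = spark.createDataFrame([("abc",)], ["value"])
      
      df2 = df.select(df.value.alias("value", metadata={"maxlength": 10}))
      print(df2.schema.fields[0].metadata)   # {'maxlength': 10}
      ```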
      
      This pull request also improves consistency between the Scala and Python APIs (i.e. I did not add any functionality that was not already in the Scala API).
      
      Discussed ahead of time on JIRA with marmbrus
      
      ## How was this patch tested?
      
      Added unit tests (and doc tests).  Ran the pertinent tests manually.
      
      Author: Sheamus K. Parkes <shea.parkes@milliman.com>
      
      Closes #16094 from shea-parkes/pyspark-column-alias-metadata.
      7b64f7aa
    • zero323's avatar
      [SPARK-19162][PYTHON][SQL] UserDefinedFunction should validate that func is callable · e0eeb0f8
      zero323 authored
      ## What changes were proposed in this pull request?
      
      UDF constructor checks if `func` argument is callable and if it is not, fails fast instead of waiting for an action.
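
      A hedged sketch of the fail-fast behaviour (the exact exception type and message are not quoted from the patch):
      
      ```python
      from pyspark.sql.functions import udf
      from pyspark.sql.types import IntegerType
      
      try:
          bad = udf(42, IntegerType())   # 42 is not callable
      except Exception as e:             # raised immediately, not at action time
          print(type(e).__name__, e)
      ```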
      
      ## How was this patch tested?
      
      Unit tests.
      
      Author: zero323 <zero323@users.noreply.github.com>
      
      Closes #16535 from zero323/SPARK-19162.
      e0eeb0f8
  17. Feb 13, 2017
    • zero323's avatar
      [SPARK-19429][PYTHON][SQL] Support slice arguments in Column.__getitem__ · e02ac303
      zero323 authored
      ## What changes were proposed in this pull request?
      
      - Add support for `slice` arguments in `Column.__getitem__` (see the sketch after this list).
      - Remove obsolete `__getslice__` bindings.
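
      A hedged sketch, assuming a slice maps onto `Column.substr` (1-based start, with the stop value treated as the length) as in the current API:
      
      ```python
      from pyspark.sql import SparkSession
      
      spark = SparkSession.builder.master("local[2]").appName("slice-demo").getOrCreate()
      df = spark.createDataFrame([("abcdef",)], ["s"])
      
      # Equivalent to df.s.substr(1, 3): three characters starting at position 1 -> "abc".
      df.select(df.s[1:3].alias("prefix")).show()
      ```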
      
      ## How was this patch tested?
      
      Existing unit tests, additional tests covering `[]` with `slice`.
      
      Author: zero323 <zero323@users.noreply.github.com>
      
      Closes #16771 from zero323/SPARK-19429.
      e02ac303
    • zero323's avatar
      [SPARK-19427][PYTHON][SQL] Support data type string as a returnType argument of UDF · ab88b241
      zero323 authored
      ## What changes were proposed in this pull request?
      
      Add support for data type string as a return type argument of `UserDefinedFunction`:
      
      ```python
      from pyspark.sql.functions import udf
      
      f = udf(lambda x: x, "integer")
      f.returnType
      
      ## IntegerType
      ```
      
      ## How was this patch tested?
      
      Existing unit tests, additional unit tests covering new feature.
      
      Author: zero323 <zero323@users.noreply.github.com>
      
      Closes #16769 from zero323/SPARK-19427.
      ab88b241
  18. Feb 07, 2017
    • anabranch's avatar
      [SPARK-16609] Add to_date/to_timestamp with format functions · 7a7ce272
      anabranch authored
      ## What changes were proposed in this pull request?
      
      This pull request adds two new user facing functions:
      - `to_date` which accepts an expression and a format and returns a date.
      - `to_timestamp` which accepts an expression and a format and returns a timestamp.
      
      For example, given a date in the format `2016-21-05` (yyyy-dd-MM):
      
      ### Date Function
      *Previously*
      ```
      to_date(unix_timestamp(lit("2016-21-05"), "yyyy-dd-MM").cast("timestamp"))
      ```
      *Current*
      ```
      to_date(lit("2016-21-05"), "yyyy-dd-MM")
      ```
      
      ### Timestamp Function
      *Previously*
      ```
      unix_timestamp(lit("2016-21-05"), "yyyy-dd-MM").cast("timestamp")
      ```
      *Current*
      ```
      to_timestamp(lit("2016-21-05"), "yyyy-dd-MM")
      ```
      ### Tasks
      
      - [X] Add `to_date` to Scala Functions
      - [x] Add `to_date` to Python Functions
      - [x] Add `to_date` to SQL Functions
      - [X] Add `to_timestamp` to Scala Functions
      - [x] Add `to_timestamp` to Python Functions
      - [x] Add `to_timestamp` to SQL Functions
      - [x] Add function to R
      
      ## How was this patch tested?
      
      - [x] Add Functions to `DateFunctionsSuite`
      - Test new `ParseToTimestamp` Expression (*not necessary*)
      - Test new `ParseToDate` Expression (*not necessary*)
      - [x] Add test for R
      - [x] Add test for Python in test.py
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: anabranch <wac.chambers@gmail.com>
      Author: Bill Chambers <bill@databricks.com>
      Author: anabranch <bill@databricks.com>
      
      Closes #16138 from anabranch/SPARK-16609.
      7a7ce272
  19. Feb 01, 2017
    • Zheng RuiFeng's avatar
      [SPARK-14352][SQL] approxQuantile should support multi columns · b0985764
      Zheng RuiFeng authored
      ## What changes were proposed in this pull request?
      
      1. Add the multi-column support based on the current private API.
      2. Add the multi-column support to PySpark (see the sketch below).
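
      A hedged sketch of the multi-column form (the data and columns are illustrative):
      
      ```python
      from pyspark.sql import SparkSession
      
      spark = SparkSession.builder.master("local[2]").appName("quantile-demo").getOrCreate()
      df = spark.createDataFrame([(1.0, 10.0), (2.0, 20.0), (3.0, 30.0)], ["a", "b"])
      
      # One list of quantiles is returned per requested column, e.g. [[2.0], [20.0]].
      print(df.approxQuantile(["a", "b"], [0.5], 0.01))
      ```
      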
      ## How was this patch tested?
      
      unit tests
      
      Author: Zheng RuiFeng <ruifengz@foxmail.com>
      Author: Ruifeng Zheng <ruifengz@foxmail.com>
      
      Closes #12135 from zhengruifeng/quantile4multicols.
      b0985764
  20. Jan 31, 2017
  21. Jan 22, 2017
  22. Jan 20, 2017
    • Davies Liu's avatar
      [SPARK-18589][SQL] Fix Python UDF accessing attributes from both side of join · 9b7a03f1
      Davies Liu authored
      ## What changes were proposed in this pull request?
      
      PythonUDF is unevaluable, so it cannot be used inside a join condition. Currently the optimizer will push a PythonUDF that accesses both sides of a join into the join condition, and the query will then fail to plan.
      
      This PR fixes the issue by checking whether the expression is evaluable before pushing it into the Join.
      
      ## How was this patch tested?
      
      Add a regression test.
      
      Author: Davies Liu <davies@databricks.com>
      
      Closes #16581 from davies/pyudf_join.
      9b7a03f1
  23. Jan 18, 2017
    • Liang-Chi Hsieh's avatar
      [SPARK-19223][SQL][PYSPARK] Fix InputFileBlockHolder for datasources which are... · d06172b8
      Liang-Chi Hsieh authored
      [SPARK-19223][SQL][PYSPARK] Fix InputFileBlockHolder for datasources which are based on HadoopRDD or NewHadoopRDD
      
      ## What changes were proposed in this pull request?
      
      For some datasources which are based on HadoopRDD or NewHadoopRDD, such as spark-xml, InputFileBlockHolder doesn't work with Python UDF.
      
      To reproduce it, run the following code with `bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`:
      
          from pyspark.sql.functions import udf,input_file_name
          from pyspark.sql.types import StringType
          from pyspark.sql import SparkSession
      
          def filename(path):
              return path
      
          session = SparkSession.builder.appName('APP').getOrCreate()
      
          session.udf.register('sameText', filename)
          sameText = udf(filename, StringType())
      
          df = session.read.format('xml').load('a.xml', rowTag='root').select('*', input_file_name().alias('file'))
          df.select('file').show() # works
          df.select(sameText(df['file'])).show()   # returns empty content
      
      The issue arises because in `HadoopRDD` and `NewHadoopRDD` we set the file block's info in `InputFileBlockHolder` before the returned iterator begins consuming. `InputFileBlockHolder` records this info in a thread-local variable. When running a Python UDF in batch, we set up another thread to consume the iterator from the child plan's output RDD, so we can't read the info back from that other thread.
      
      To fix this, we have to set the info in `InputFileBlockHolder` after the iterator begins consuming, so the info can be read in the correct thread.
      
      ## How was this patch tested?
      
      Manual test with above example codes for spark-xml package on pyspark: `bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`.
      
      Added pyspark test.
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: Liang-Chi Hsieh <viirya@gmail.com>
      
      Closes #16585 from viirya/fix-inputfileblock-hadooprdd.
      d06172b8
  24. Jan 13, 2017
    • Vinayak's avatar
      [SPARK-18687][PYSPARK][SQL] Backward compatibility - creating a Dataframe on a... · 285a7798
      Vinayak authored
      [SPARK-18687][PYSPARK][SQL] Backward compatibility - creating a Dataframe on a new SQLContext object fails with a Derby error
      
      The change is for SQLContext to reuse the active SparkSession during construction if the SparkContext supplied is the same as the currently active SparkContext. Without this change, a new SparkSession is instantiated, which results in a Derby error when attempting to create a DataFrame using a new SQLContext object, even though the SparkContext supplied to the new SQLContext is the same as the currently active one. Refer to https://issues.apache.org/jira/browse/SPARK-18687 for details on the error and a repro.
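
      A simplified, hedged sketch of the pattern this change keeps working (see the JIRA above for the full repro):
      
      ```python
      from pyspark import SparkContext
      from pyspark.sql import SQLContext
      
      sc = SparkContext("local[2]", "sqlcontext-compat-demo")
      sqlContext = SQLContext(sc)      # now reuses the active SparkSession internally
      df = sqlContext.createDataFrame([(1, "a")], ["id", "value"])
      df.show()
      ```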
      
      Existing unit tests and a new unit test added to pyspark-sql:
      
      /python/run-tests --python-executables=python --modules=pyspark-sql
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: Vinayak <vijoshi5@in.ibm.com>
      Author: Vinayak Joshi <vijoshi@users.noreply.github.com>
      
      Closes #16119 from vijoshi/SPARK-18687_master.
      285a7798
  25. Jan 12, 2017
    • Liang-Chi Hsieh's avatar
      [SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext is stopped · c6c37b8a
      Liang-Chi Hsieh authored
      ## What changes were proposed in this pull request?
      
      In SparkSession initialization, we store the created SparkSession instance in a class variable `_instantiatedContext`. Next time we can use `SparkSession.builder.getOrCreate()` to retrieve the existing SparkSession instance.
      
      However, when the active SparkContext is stopped and we create another new SparkContext to use, the existing SparkSession is still associated with the stopped SparkContext, so operations on this existing SparkSession will fail.
      
      We need to detect such a case in SparkSession and renew the class variable `_instantiatedContext` if needed.
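
      A hedged sketch of the renewal scenario (simplified relative to the added test):
      
      ```python
      from pyspark import SparkContext
      from pyspark.sql import SparkSession
      
      spark1 = SparkSession.builder.master("local[2]").appName("first").getOrCreate()
      spark1.sparkContext.stop()
      
      sc = SparkContext("local[2]", "second")        # a brand new SparkContext
      spark2 = SparkSession.builder.getOrCreate()    # should now bind to the new context
      spark2.range(3).show()                         # previously failed against the stale session
      ```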
      
      ## How was this patch tested?
      
      New test added in PySpark.
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: Liang-Chi Hsieh <viirya@gmail.com>
      
      Closes #16454 from viirya/fix-pyspark-sparksession.
      c6c37b8a
  26. Dec 15, 2016
  27. Dec 14, 2016
    • Shixiong Zhu's avatar
      [SPARK-18852][SS] StreamingQuery.lastProgress should be null when recentProgress is empty · 1ac6567b
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      
      Right now `StreamingQuery.lastProgress` throws a NoSuchElementException, and it is hard to use in Python since Python users will just see a Py4JError.
      
      This PR just makes it return null instead.
      
      ## How was this patch tested?
      
      `test("lastProgress should be null when recentProgress is empty")`
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #16273 from zsxwing/SPARK-18852.
      1ac6567b
  28. Dec 10, 2016
    • gatorsmile's avatar
      [SPARK-18766][SQL] Push Down Filter Through BatchEvalPython (Python UDF) · 422a45cf
      gatorsmile authored
      ### What changes were proposed in this pull request?
      Currently, when users use a Python UDF in a Filter, BatchEvalPython is always generated below FilterExec. However, not all the predicates need to be evaluated after Python UDF execution. Thus, this PR pushes down the deterministic predicates through `BatchEvalPython`.
      ```Python
      >>> df = spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
      >>> from pyspark.sql.functions import udf, col
      >>> from pyspark.sql.types import BooleanType
      >>> my_filter = udf(lambda a: a < 2, BooleanType())
      >>> sel = df.select(col("key"), col("value")).filter((my_filter(col("key"))) & (df.value < "2"))
      >>> sel.explain(True)
      ```
      Before the fix, the plan looks like
      ```
      == Optimized Logical Plan ==
      Filter ((isnotnull(value#1) && <lambda>(key#0L)) && (value#1 < 2))
      +- LogicalRDD [key#0L, value#1]
      
      == Physical Plan ==
      *Project [key#0L, value#1]
      +- *Filter ((isnotnull(value#1) && pythonUDF0#9) && (value#1 < 2))
         +- BatchEvalPython [<lambda>(key#0L)], [key#0L, value#1, pythonUDF0#9]
            +- Scan ExistingRDD[key#0L,value#1]
      ```
      
      After the fix, the plan looks like
      ```
      == Optimized Logical Plan ==
      Filter ((isnotnull(value#1) && <lambda>(key#0L)) && (value#1 < 2))
      +- LogicalRDD [key#0L, value#1]
      
      == Physical Plan ==
      *Project [key#0L, value#1]
      +- *Filter pythonUDF0#9: boolean
         +- BatchEvalPython [<lambda>(key#0L)], [key#0L, value#1, pythonUDF0#9]
            +- *Filter (isnotnull(value#1) && (value#1 < 2))
               +- Scan ExistingRDD[key#0L,value#1]
      ```
      
      ### How was this patch tested?
      Added both unit test cases for `BatchEvalPythonExec` and also add an end-to-end test case in Python test suite.
      
      Author: gatorsmile <gatorsmile@gmail.com>
      
      Closes #16193 from gatorsmile/pythonUDFPredicatePushDown.
      422a45cf
  29. Dec 08, 2016
    • Liang-Chi Hsieh's avatar
      [SPARK-18667][PYSPARK][SQL] Change the way to group row in BatchEvalPythonExec... · 6a5a7254
      Liang-Chi Hsieh authored
      [SPARK-18667][PYSPARK][SQL] Change the way to group row in BatchEvalPythonExec so input_file_name function can work with UDF in pyspark
      
      ## What changes were proposed in this pull request?
      
      `input_file_name` doesn't return the filename when working with a UDF in PySpark. An example shows the problem:
      
          from pyspark.sql.functions import *
          from pyspark.sql.types import *
      
          def filename(path):
              return path
      
          sourceFile = udf(filename, StringType())
          spark.read.json("tmp.json").select(sourceFile(input_file_name())).show()
      
          +---------------------------+
          |filename(input_file_name())|
          +---------------------------+
          |                           |
          +---------------------------+
      
      The cause of this issue is that we group rows in `BatchEvalPythonExec` for batch processing of PythonUDFs. Currently we group the rows first and then evaluate expressions on them. If the data is less than the required number of rows for a group, the iterator will be consumed to the end before the evaluation. However, once the iterator reaches the end, we unset the input filename, so the input_file_name expression can't return the correct filename.
      
      This patch changes the way the batch of rows is grouped: we evaluate the expressions first and then group the evaluated results into a batch.
      
      ## How was this patch tested?
      
      Added unit test to PySpark.
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: Liang-Chi Hsieh <viirya@gmail.com>
      
      Closes #16115 from viirya/fix-py-udf-input-filename.
      6a5a7254
  30. Dec 07, 2016
  31. Dec 05, 2016
    • Liang-Chi Hsieh's avatar
      [SPARK-18634][PYSPARK][SQL] Corruption and Correctness issues with exploding Python UDFs · 3ba69b64
      Liang-Chi Hsieh authored
      ## What changes were proposed in this pull request?
      
      As reported in the Jira, there are some weird issues with exploding Python UDFs in SparkSQL.
      
      The following test code can reproduce it. Notice: the test code is reported to return wrong results in the Jira. However, as I tested on the master branch, it causes an exception and so can't return any result.
      
          >>> from pyspark.sql.functions import *
          >>> from pyspark.sql.types import *
          >>>
          >>> df = spark.range(10)
          >>>
          >>> def return_range(value):
          ...   return [(i, str(i)) for i in range(value - 1, value + 1)]
          ...
          >>> range_udf = udf(return_range, ArrayType(StructType([StructField("integer_val", IntegerType()),
          ...                                                     StructField("string_val", StringType())])))
          >>>
          >>> df.select("id", explode(range_udf(df.id))).show()
          Traceback (most recent call last):
            File "<stdin>", line 1, in <module>
            File "/spark/python/pyspark/sql/dataframe.py", line 318, in show
              print(self._jdf.showString(n, 20))
            File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
            File "/spark/python/pyspark/sql/utils.py", line 63, in deco
              return f(*a, **kw)
            File "/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o126.showString.: java.lang.AssertionError: assertion failed
              at scala.Predef$.assert(Predef.scala:156)
              at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:120)
              at org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:57)
      
      The cause of this issue is that in `ExtractPythonUDFs` we insert `BatchEvalPythonExec` to run PythonUDFs in batch. `BatchEvalPythonExec` will add extra outputs (e.g., `pythonUDF0`) to the original plan. In the above case, the original `Range` only has one output, `id`. After `ExtractPythonUDFs`, the added `BatchEvalPythonExec` has two outputs, `id` and `pythonUDF0`.
      
      Because the output of `GenerateExec` is fixed after the analysis phase, in the above case it is the combination of `id`, i.e., the output of `Range`, and `col`. But in the planning phase, we change `GenerateExec`'s child plan to `BatchEvalPythonExec`, which has additional output attributes.
      
      This causes no problem without whole-stage codegen, because when evaluating, the additional attributes are simply projected out of the final output of `GenerateExec`.
      
      However, as `GenerateExec` now supports whole-stage codegen, the framework will input all the outputs of the child plan to `GenerateExec`. Then, when consuming `GenerateExec`'s output data (i.e., calling `consume`), the number of output attributes differs from the number of output variables in whole-stage codegen.
      
      To solve this issue, this patch only gives the generator's output to `GenerateExec` after the analysis phase. `GenerateExec`'s output is the combination of its child plan's output and the generator's output, so when we change `GenerateExec`'s child, its output is still correct.
      
      ## How was this patch tested?
      
      Added test cases to PySpark.
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: Liang-Chi Hsieh <viirya@gmail.com>
      
      Closes #16120 from viirya/fix-py-udf-with-generator.
      3ba69b64
    • Shixiong Zhu's avatar
      [SPARK-18694][SS] Add StreamingQuery.explain and exception to Python and fix... · 24601285
      Shixiong Zhu authored
      [SPARK-18694][SS] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
      
      ## What changes were proposed in this pull request?
      
      - Add StreamingQuery.explain and exception to Python.
      - Fix StreamingQueryException to not expose `OffsetSeq`.
      
      ## How was this patch tested?
      
      Jenkins
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #16125 from zsxwing/py-streaming-explain.
      24601285
  32. Dec 02, 2016
  33. Nov 30, 2016
    • Tathagata Das's avatar
      [SPARK-18516][STRUCTURED STREAMING] Follow up PR to add StreamingQuery.status to Python · bc09a2b8
      Tathagata Das authored
      ## What changes were proposed in this pull request?
      - Add StreamingQueryStatus.json
      - Make it not a case class (to avoid unnecessarily exposing the implicit object StreamingQueryStatus; consistent with StreamingQueryProgress)
      - Add StreamingQuery.status to Python
      - Fix post-termination status
      
      ## How was this patch tested?
      New unit tests
      
      Author: Tathagata Das <tathagata.das1565@gmail.com>
      
      Closes #16075 from tdas/SPARK-18516-1.
      bc09a2b8