  2. Aug 16, 2016
    • Dongjoon Hyun's avatar
      [SPARK-17035] [SQL] [PYSPARK] Improve Timestamp not to lose precision for all cases · 12a89e55
      Dongjoon Hyun authored
      ## What changes were proposed in this pull request?
      
`PySpark` loses `microsecond` precision in some corner cases when converting a `Timestamp` into a `Long`. For example, the following `datetime.max` value should be converted to a value whose last 6 digits are '999999'. This PR improves the logic so that precision is not lost in any case.
      
      **Corner case**
      ```python
      >>> datetime.datetime.max
      datetime.datetime(9999, 12, 31, 23, 59, 59, 999999)
      ```
      
      **Before**
      ```python
      >>> from datetime import datetime
      >>> from pyspark.sql import Row
      >>> from pyspark.sql.types import StructType, StructField, TimestampType
      >>> schema = StructType([StructField("dt", TimestampType(), False)])
      >>> [schema.toInternal(row) for row in [{"dt": datetime.max}]]
      [(253402329600000000,)]
      ```
      
      **After**
      ```python
      >>> [schema.toInternal(row) for row in [{"dt": datetime.max}]]
      [(253402329599999999,)]
      ```
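A minimal sketch of the idea behind the precision-preserving conversion (a simplified, timezone-naive illustration; the real change is in `TimestampType.toInternal`): do the whole conversion in integer arithmetic so the last six (microsecond) digits are never rounded away.
```python
import datetime

def micros_since_epoch(dt):
    # Keep whole seconds as integers and add the microseconds separately,
    # instead of multiplying a float by 1e6 (which rounds the last digits).
    delta = dt - datetime.datetime(1970, 1, 1)
    return (delta.days * 86400 + delta.seconds) * 1000000 + delta.microseconds

print(micros_since_epoch(datetime.datetime.max))  # ends in 999999
```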
      
      ## How was this patch tested?
      
      Pass the Jenkins test with a new test case.
      
      Author: Dongjoon Hyun <dongjoon@apache.org>
      
      Closes #14631 from dongjoon-hyun/SPARK-17035.
      12a89e55
  3. Aug 15, 2016
    • Davies Liu's avatar
      [SPARK-16700][PYSPARK][SQL] create DataFrame from dict/Row with schema · fffb0c0d
      Davies Liu authored
      ## What changes were proposed in this pull request?
      
In 2.0, we verify the data type against the schema for every row for safety, but at a performance cost. This PR makes that verification optional.
      
When we verify the data type for StructType, it does not support all the types we support in schema inference (for example, dict). This PR fixes that to make them consistent.
      
For a Row object created using named arguments, the fields are sorted by name, so their order may differ from the order in the provided schema. This PR fixes that by ignoring the field order in this case.
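A hedged sketch of the behaviours described above, assuming a running `SparkSession` named `spark` (the `verifySchema` switch is assumed to be this PR's new optional flag):
```python
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType([StructField("name", StringType()), StructField("age", LongType())])

# dicts are matched against the schema by field name and verified like other types
df1 = spark.createDataFrame([{"name": "Alice", "age": 1}], schema)

# Rows built from named arguments keep their fields sorted by name;
# with an explicit schema, the schema's field order is used instead
df2 = spark.createDataFrame([Row(name="Bob", age=2)], schema)

# verification can be switched off when speed matters more than safety
df3 = spark.createDataFrame([("Carol", 3)], schema, verifySchema=False)
```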
      
      ## How was this patch tested?
      
      Created regression tests for them.
      
      Author: Davies Liu <davies@databricks.com>
      
      Closes #14469 from davies/py_dict.
      fffb0c0d
  4. Aug 02, 2016
    • Liang-Chi Hsieh's avatar
      [SPARK-16062] [SPARK-15989] [SQL] Fix two bugs of Python-only UDTs · 146001a9
      Liang-Chi Hsieh authored
      ## What changes were proposed in this pull request?
      
There are two related bugs in Python-only UDTs. Because the test case for the second one also needs the first fix, I put them into one PR. If that is not appropriate, please let me know.
      
      ### First bug: When MapObjects works on Python-only UDTs
      
`RowEncoder` will use `PythonUserDefinedType.sqlType` for its deserializer expression. If the sql type is `ArrayType`, we will have `MapObjects` working on it. But `MapObjects` doesn't consider `PythonUserDefinedType` as its input data type. This causes an error like:
      
          import pyspark.sql.group
          from pyspark.sql.tests import PythonOnlyPoint, PythonOnlyUDT
          from pyspark.sql.types import *
      
          schema = StructType().add("key", LongType()).add("val", PythonOnlyUDT())
          df = spark.createDataFrame([(i % 3, PythonOnlyPoint(float(i), float(i))) for i in range(10)], schema=schema)
          df.show()
      
          File "/home/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o36.showString.
          : java.lang.RuntimeException: Error while decoding: scala.MatchError: org.apache.spark.sql.types.PythonUserDefinedTypef4ceede8 (of class org.apache.spark.sql.types.PythonUserDefinedType)
          ...
      
### Second bug: When a Python-only UDT is the element type of ArrayType
      
          import pyspark.sql.group
          from pyspark.sql.tests import PythonOnlyPoint, PythonOnlyUDT
          from pyspark.sql.types import *
      
          schema = StructType().add("key", LongType()).add("val", ArrayType(PythonOnlyUDT()))
          df = spark.createDataFrame([(i % 3, [PythonOnlyPoint(float(i), float(i))]) for i in range(10)], schema=schema)
          df.show()
      
      ## How was this patch tested?
      PySpark's sql tests.
      
      Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
      
      Closes #13778 from viirya/fix-pyudt.
      146001a9
  5. Jun 28, 2016
    • Davies Liu's avatar
      [SPARK-16175] [PYSPARK] handle None for UDT · 35438fb0
      Davies Liu authored
      ## What changes were proposed in this pull request?
      
A Scala UDT bypasses all nulls and does not pass them into the UDT's serialize() and deserialize(). This PR updates the Python UDT to do the same.
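A hedged illustration of the intended behaviour, reusing the Python-only UDT test helpers that appear elsewhere in this history:
```python
from pyspark.sql.tests import PythonOnlyPoint, PythonOnlyUDT
from pyspark.sql.types import StructType, StructField

schema = StructType([StructField("point", PythonOnlyUDT(), True)])

# The None value is passed through as a SQL null; the UDT's
# serialize()/deserialize() are never called for it.
df = spark.createDataFrame([(PythonOnlyPoint(1.0, 2.0),), (None,)], schema)
df.collect()  # [Row(point=PythonOnlyPoint(1.0, 2.0)), Row(point=None)]
```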
      
      ## How was this patch tested?
      
      Added tests.
      
      Author: Davies Liu <davies@databricks.com>
      
      Closes #13878 from davies/udt_null.
      35438fb0
    • Yin Huai's avatar
      [SPARK-16224] [SQL] [PYSPARK] SparkSession builder's configs need to be set to... · 0923c4f5
      Yin Huai authored
      [SPARK-16224] [SQL] [PYSPARK] SparkSession builder's configs need to be set to the existing Scala SparkContext's SparkConf
      
      ## What changes were proposed in this pull request?
When we create a SparkSession on the Python side, a SparkContext may already have been created. In that case, we need to set the SparkSession builder's configs on the Scala SparkContext's SparkConf (we need to do so because conf changes on an active Python SparkContext are not propagated to the JVM side). Otherwise, we may create a wrong SparkSession (e.g. Hive support is not enabled even if enableHiveSupport is called).
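A hedged illustration of the scenario: even when a SparkContext already exists on the Python side, configs passed to the builder must end up in the JVM SparkConf.
```python
from pyspark.sql import SparkSession

# A SparkContext may already be running here (e.g. created by the pyspark shell).
spark = (SparkSession.builder
         .config("spark.sql.shuffle.partitions", "8")
         .enableHiveSupport()
         .getOrCreate())

# After this fix, the builder configs are visible on the JVM side as well.
spark.conf.get("spark.sql.shuffle.partitions")  # '8'
```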
      
      ## How was this patch tested?
      New tests and manual tests.
      
      Author: Yin Huai <yhuai@databricks.com>
      
      Closes #13931 from yhuai/SPARK-16224.
      0923c4f5
  7. Jun 24, 2016
    • Davies Liu's avatar
      [SPARK-16179][PYSPARK] fix bugs for Python udf in generate · 4435de1b
      Davies Liu authored
      ## What changes were proposed in this pull request?
      
This PR fixes the bug where a Python UDF is used in explode (a generator). GenerateExec requires that all the attributes in its expressions be resolvable from its children at creation time, so we should replace the children first, then replace its expressions.
      
      ```
      >>> df.select(explode(f(*df))).show()
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File "/home/vlad/dev/spark/python/pyspark/sql/dataframe.py", line 286, in show
          print(self._jdf.showString(n, truncate))
        File "/home/vlad/dev/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
        File "/home/vlad/dev/spark/python/pyspark/sql/utils.py", line 63, in deco
          return f(*a, **kw)
        File "/home/vlad/dev/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
      py4j.protocol.Py4JJavaError: An error occurred while calling o52.showString.
      : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
      Generate explode(<lambda>(_1#0L)), false, false, [col#15L]
      +- Scan ExistingRDD[_1#0L]
      
      	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:387)
      	at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:69)
      	at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:45)
      	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:177)
      	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:144)
      	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:153)
      	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
      	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:113)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
      	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:300)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:298)
      	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:113)
      	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:93)
      	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:95)
      	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:95)
      	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
      	at scala.collection.immutable.List.foldLeft(List.scala:84)
      	at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:95)
      	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:85)
      	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:85)
      	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2557)
      	at org.apache.spark.sql.Dataset.head(Dataset.scala:1923)
      	at org.apache.spark.sql.Dataset.take(Dataset.scala:2138)
      	at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
      	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      	at py4j.Gateway.invoke(Gateway.java:280)
      	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
      	at py4j.commands.CallCommand.execute(CallCommand.java:79)
      	at py4j.GatewayConnection.run(GatewayConnection.java:211)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.reflect.InvocationTargetException
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
      	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1$$anonfun$apply$13.apply(TreeNode.scala:413)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1$$anonfun$apply$13.apply(TreeNode.scala:413)
      	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:412)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:387)
      	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
      	... 42 more
      Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#20
      	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
      	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
      	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:268)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
      	at org.apache.spark.sql.execution.GenerateExec.<init>(GenerateExec.scala:63)
      	... 52 more
      Caused by: java.lang.RuntimeException: Couldn't find pythonUDF0#20 in [_1#0L]
      	at scala.sys.package$.error(package.scala:27)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
      	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
      	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
      	... 67 more
      ```
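For reference, a hedged sketch of the shape of query that triggers this path (names and data are illustrative, not the PR's test):
```python
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, IntegerType

# A Python UDF whose result feeds a generator (explode)
f = udf(lambda n: list(range(n)), ArrayType(IntegerType()))
df = spark.createDataFrame([(3,), (2,)], ["n"])
df.select(explode(f(df.n)).alias("i")).show()
```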
      
      ## How was this patch tested?
      
      Added regression tests.
      
      Author: Davies Liu <davies@databricks.com>
      
      Closes #13883 from davies/udf_in_generate.
      4435de1b
  10. Jun 15, 2016
    • Davies Liu's avatar
      [SPARK-15888] [SQL] fix Python UDF with aggregate · 5389013a
      Davies Liu authored
      ## What changes were proposed in this pull request?
      
After moving the ExtractPythonUDF rule into the physical plan, Python UDFs no longer work on top of an aggregate, because they cannot be evaluated before the aggregate; they should be evaluated after it. This PR adds another rule to extract this kind of Python UDF from the logical aggregate and create a Project on top of the Aggregate.
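A hedged reconstruction of the kind of query this enables: Python UDFs both inside the aggregate and on top of its result (the latter is what the new rule pulls out into a Project).
```python
from pyspark.sql.functions import col, sum as sum_, udf
from pyspark.sql.types import IntegerType

f = udf(lambda x: x, IntegerType())          # used inside grouping/aggregation
g = udf(lambda k, s: k + s, IntegerType())   # used on top of the aggregate output

df = spark.createDataFrame([(1, 10), (1, 20), (2, 30)], ["key", "value"])
result = (df.groupBy(f(col("key")).alias("k"))
            .agg(sum_(f(col("value"))).alias("s"))
            .select(g(col("k"), col("s")).alias("t")))
result.show()
```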
      
      ## How was this patch tested?
      
Added regression tests. The plan of the added test query looks like this:
      ```
      == Parsed Logical Plan ==
      'Project [<lambda>('k, 's) AS t#26]
      +- Aggregate [<lambda>(key#5L)], [<lambda>(key#5L) AS k#17, sum(cast(<lambda>(value#6) as bigint)) AS s#22L]
         +- LogicalRDD [key#5L, value#6]
      
      == Analyzed Logical Plan ==
      t: int
      Project [<lambda>(k#17, s#22L) AS t#26]
      +- Aggregate [<lambda>(key#5L)], [<lambda>(key#5L) AS k#17, sum(cast(<lambda>(value#6) as bigint)) AS s#22L]
         +- LogicalRDD [key#5L, value#6]
      
      == Optimized Logical Plan ==
      Project [<lambda>(agg#29, agg#30L) AS t#26]
      +- Aggregate [<lambda>(key#5L)], [<lambda>(key#5L) AS agg#29, sum(cast(<lambda>(value#6) as bigint)) AS agg#30L]
         +- LogicalRDD [key#5L, value#6]
      
      == Physical Plan ==
      *Project [pythonUDF0#37 AS t#26]
      +- BatchEvalPython [<lambda>(agg#29, agg#30L)], [agg#29, agg#30L, pythonUDF0#37]
         +- *HashAggregate(key=[<lambda>(key#5L)#31], functions=[sum(cast(<lambda>(value#6) as bigint))], output=[agg#29,agg#30L])
            +- Exchange hashpartitioning(<lambda>(key#5L)#31, 200)
               +- *HashAggregate(key=[pythonUDF0#34 AS <lambda>(key#5L)#31], functions=[partial_sum(cast(pythonUDF1#35 as bigint))], output=[<lambda>(key#5L)#31,sum#33L])
                  +- BatchEvalPython [<lambda>(key#5L), <lambda>(value#6)], [key#5L, value#6, pythonUDF0#34, pythonUDF1#35]
                     +- Scan ExistingRDD[key#5L,value#6]
      ```
      
      Author: Davies Liu <davies@databricks.com>
      
      Closes #13682 from davies/fix_py_udf.
      5389013a
    • Tathagata Das's avatar
      [SPARK-15953][WIP][STREAMING] Renamed ContinuousQuery to StreamingQuery · 9a507199
      Tathagata Das authored
Renamed for simplicity, so that it's obvious that it's related to streaming.
      
      Existing unit tests.
      
      Author: Tathagata Das <tathagata.das1565@gmail.com>
      
      Closes #13673 from tdas/SPARK-15953.
      9a507199
  11. Jun 14, 2016
    • Tathagata Das's avatar
      [SPARK-15933][SQL][STREAMING] Refactored DF reader-writer to use readStream... · 214adb14
      Tathagata Das authored
      [SPARK-15933][SQL][STREAMING] Refactored DF reader-writer to use readStream and writeStream for streaming DFs
      
      ## What changes were proposed in this pull request?
Currently, the DataFrameReader/Writer has methods that are needed for both streaming and non-streaming DFs. This is quite awkward because each such method throws a runtime exception for one case or the other. So rather than having half the methods throw runtime exceptions, it's better to have a separate reader/writer API for streams.
      
      - [x] Python API!!
      
      ## How was this patch tested?
      Existing unit tests + two sets of unit tests for DataFrameReader/Writer and DataStreamReader/Writer.
      
      Author: Tathagata Das <tathagata.das1565@gmail.com>
      
      Closes #13653 from tdas/SPARK-15933.
      214adb14
  12. Jun 13, 2016
    • Sandeep Singh's avatar
      [SPARK-15663][SQL] SparkSession.catalog.listFunctions shouldn't include the... · 1842cdd4
      Sandeep Singh authored
      [SPARK-15663][SQL] SparkSession.catalog.listFunctions shouldn't include the list of built-in functions
      
      ## What changes were proposed in this pull request?
SparkSession.catalog.listFunctions currently returns all functions, including the built-in ones. This makes the method less useful, because every time it is run the result set contains over 100 built-in functions.
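A minimal illustration of the intended behaviour (hedged; the exact result depends on what the user has registered):
```python
# With this change, listFunctions no longer floods the result with built-ins;
# only user-registered functions are returned.
[f.name for f in spark.catalog.listFunctions()]
```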
      
      ## How was this patch tested?
      CatalogSuite
      
      Author: Sandeep Singh <sandeep@techaddict.me>
      
      Closes #13413 from techaddict/SPARK-15663.
      1842cdd4
  13. May 31, 2016
    • Tathagata Das's avatar
[SPARK-15517][SQL][STREAMING] Add support for complete output mode in Structured Streaming · 90b11439
      Tathagata Das authored
      ## What changes were proposed in this pull request?
      Currently structured streaming only supports append output mode.  This PR adds the following.
      
      - Added support for Complete output mode in the internal state store, analyzer and planner.
      - Added public API in Scala and Python for users to specify output mode
      - Added checks for unsupported combinations of output mode and DF operations
        - Plans with no aggregation should support only Append mode
        - Plans with aggregation should support only Update and Complete modes
        - Default output mode is Append mode (**Question: should we change this to automatically set to Complete mode when there is aggregation?**)
- Added support for Complete output mode in the Memory Sink. The Memory Sink internally supports Append, Complete, and Update, but from the public API only Complete and Append output modes are supported.
      
      ## How was this patch tested?
      Unit tests in various test suites
      - StreamingAggregationSuite: tests for complete mode
      - MemorySinkSuite: tests for checking behavior in Append and Complete modes.
      - UnsupportedOperationSuite: tests for checking unsupported combinations of DF ops and output modes
      - DataFrameReaderWriterSuite: tests for checking that output mode cannot be called on static DFs
      - Python doc test and existing unit tests modified to call write.outputMode.
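For reference, a hedged sketch of using the Python output-mode API named above (`write.outputMode`); the surrounding streaming-writer calls reflect the pre-2.0 API and are assumptions.
```python
counts = streaming_df.groupBy("word").count()  # streaming_df: an assumed streaming DataFrame

query = (counts.write
               .outputMode("complete")   # the new public API added here
               .format("memory")
               .queryName("word_counts")
               .startStream())
```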
      
      Author: Tathagata Das <tathagata.das1565@gmail.com>
      
      Closes #13286 from tdas/complete-mode.
      90b11439
  14. May 18, 2016
    • Liang-Chi Hsieh's avatar
      [SPARK-15342] [SQL] [PYSPARK] PySpark test for non ascii column name does not... · 3d1e67f9
      Liang-Chi Hsieh authored
      [SPARK-15342] [SQL] [PYSPARK] PySpark test for non ascii column name does not actually test with unicode column name
      
      ## What changes were proposed in this pull request?
      
The PySpark SQL test `test_column_name_with_non_ascii` is meant to test a non-ASCII column name, but it doesn't actually do so. We need to construct a unicode string explicitly using `unicode` under Python 2.
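A hedged sketch of what the corrected test needs to do under Python 2:
```python
# -*- coding: utf-8 -*-
# The column name must be a real unicode object, not a byte string.
columnName = u"\u6570\u91cf"  # a genuinely non-ASCII name
df = spark.createDataFrame([("a",)], [columnName])
df.select(columnName).collect()
```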
      
      ## How was this patch tested?
      
      Existing tests.
      
      Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
      
      Closes #13134 from viirya/correct-non-ascii-colname-pytest.
      3d1e67f9
  15. May 17, 2016
    • Sean Zhong's avatar
      [SPARK-15171][SQL] Remove the references to deprecated method dataset.registerTempTable · 25b315e6
      Sean Zhong authored
      ## What changes were proposed in this pull request?
      
      Update the unit test code, examples, and documents to remove calls to deprecated method `dataset.registerTempTable`.
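For reference, the replacement pattern (`createOrReplaceTempView` is the documented successor in Spark 2.0):
```python
# Before (deprecated)
df.registerTempTable("people")

# After
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people").show()
```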
      
      ## How was this patch tested?
      
      This PR only changes the unit test code, examples, and comments. It should be safe.
      This is a follow up of PR https://github.com/apache/spark/pull/12945 which was merged.
      
      Author: Sean Zhong <seanzhong@databricks.com>
      
      Closes #13098 from clockfly/spark-15171-remove-deprecation.
      25b315e6
    • Dongjoon Hyun's avatar
      [SPARK-15244] [PYTHON] Type of column name created with createDataFrame is not consistent. · 0f576a57
      Dongjoon Hyun authored
      ## What changes were proposed in this pull request?
      
      **createDataFrame** returns inconsistent types for column names.
      ```python
      >>> from pyspark.sql.types import StructType, StructField, StringType
      >>> schema = StructType([StructField(u"col", StringType())])
      >>> df1 = spark.createDataFrame([("a",)], schema)
      >>> df1.columns # "col" is str
      ['col']
      >>> df2 = spark.createDataFrame([("a",)], [u"col"])
      >>> df2.columns # "col" is unicode
      [u'col']
      ```
      
The reason is that only **StructField** has the following code.
      ```
      if not isinstance(name, str):
          name = name.encode('utf-8')
      ```
      This PR adds the same logic into **createDataFrame** for consistency.
      ```
      if isinstance(schema, list):
          schema = [x.encode('utf-8') if not isinstance(x, str) else x for x in schema]
      ```
      
      ## How was this patch tested?
      
      Pass the Jenkins test (with new python doctest)
      
      Author: Dongjoon Hyun <dongjoon@apache.org>
      
      Closes #13097 from dongjoon-hyun/SPARK-15244.
      0f576a57
  17. May 03, 2016
    • Tathagata Das's avatar
      [SPARK-14716][SQL] Added support for partitioning in FileStreamSink · 4ad492c4
      Tathagata Das authored
      # What changes were proposed in this pull request?
      
      Support partitioning in the file stream sink. This is implemented using a new, but simpler code path for writing parquet files - both unpartitioned and partitioned. This new code path does not use Output Committers, as we will eventually write the file names to the metadata log for "committing" them.
      
This patch duplicates < 100 LOC from the WriterContainer, but it is far simpler than WriterContainer as it does not involve output committing. In addition, it introduces new APIs in FileFormat and OutputWriterFactory in an attempt to simplify them (no Job in the `FileFormat` API, no bucket and other details in `OutputWriterFactory.newInstance()`).
      
      # Tests
      - New unit tests to test the FileStreamSinkWriter for partitioned and unpartitioned files
      - New unit test to partially test the FileStreamSink for partitioned files (does not test recovery of partition column data, as that requires change in the StreamFileCatalog, future PR).
      - Updated FileStressSuite to test number of records read from partitioned output files.
      
      Author: Tathagata Das <tathagata.das1565@gmail.com>
      
      Closes #12409 from tdas/streaming-partitioned-parquet.
      4ad492c4
  18. Apr 29, 2016
    • Andrew Or's avatar
      [SPARK-15012][SQL] Simplify configuration API further · 66773eb8
      Andrew Or authored
      ## What changes were proposed in this pull request?
      
      1. Remove all the `spark.setConf` etc. Just expose `spark.conf`
      2. Make `spark.conf` take in things set in the core `SparkConf` as well, otherwise users may get confused
      
      This was done for both the Python and Scala APIs.
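A hedged illustration of the simplified surface:
```python
# All configuration goes through spark.conf; values set on the core SparkConf show up too.
spark.conf.set("spark.sql.shuffle.partitions", "8")
spark.conf.get("spark.sql.shuffle.partitions")   # '8'
spark.conf.get("spark.app.name")                 # set on the core SparkConf at startup
```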
      
      ## How was this patch tested?
      `SQLConfSuite`, python tests.
      
      This one fixes the failed tests in #12787
      
      Closes #12787
      
      Author: Andrew Or <andrew@databricks.com>
      Author: Yin Huai <yhuai@databricks.com>
      
      Closes #12798 from yhuai/conf-api.
      66773eb8
    • Andrew Or's avatar
      [SPARK-14988][PYTHON] SparkSession API follow-ups · d33e3d57
      Andrew Or authored
      ## What changes were proposed in this pull request?
      
      Addresses comments in #12765.
      
      ## How was this patch tested?
      
      Python tests.
      
      Author: Andrew Or <andrew@databricks.com>
      
      Closes #12784 from andrewor14/python-followup.
      d33e3d57
  19. Apr 28, 2016
    • Burak Yavuz's avatar
      [SPARK-14555] Second cut of Python API for Structured Streaming · 78c8aaf8
      Burak Yavuz authored
      ## What changes were proposed in this pull request?
      
      This PR adds Python APIs for:
       - `ContinuousQueryManager`
       - `ContinuousQueryException`
      
`ContinuousQueryException` is a very basic wrapper; it doesn't provide all the functionality that the Scala side provides, but it follows the same pattern as `AnalysisException`.
      
      For `ContinuousQueryManager`, all APIs are provided except for registering listeners.
      
      This PR also attempts to fix test flakiness by stopping all active streams just before tests.
      
      ## How was this patch tested?
      
      Python Doc tests and unit tests
      
      Author: Burak Yavuz <brkyvz@gmail.com>
      
      Closes #12673 from brkyvz/pyspark-cqm.
      78c8aaf8
    • Andrew Or's avatar
      [SPARK-14945][PYTHON] SparkSession Python API · 89addd40
      Andrew Or authored
      ## What changes were proposed in this pull request?
      
      ```
      Welcome to
            ____              __
           / __/__  ___ _____/ /__
          _\ \/ _ \/ _ `/ __/  '_/
         /__ / .__/\_,_/_/ /_/\_\   version 2.0.0-SNAPSHOT
            /_/
      
      Using Python version 2.7.5 (default, Mar  9 2014 22:15:05)
      SparkSession available as 'spark'.
      >>> spark
      <pyspark.sql.session.SparkSession object at 0x101f3bfd0>
      >>> spark.sql("SHOW TABLES").show()
      ...
      +---------+-----------+
      |tableName|isTemporary|
      +---------+-----------+
      |      src|      false|
      +---------+-----------+
      
      >>> spark.range(1, 10, 2).show()
      +---+
      | id|
      +---+
      |  1|
      |  3|
      |  5|
      |  7|
      |  9|
      +---+
      ```
      **Note**: This API is NOT complete in its current state. In particular, for now I left out the `conf` and `catalog` APIs, which were added later in Scala. These will be added later before 2.0.
      
      ## How was this patch tested?
      
      Python tests.
      
      Author: Andrew Or <andrew@databricks.com>
      
      Closes #12746 from andrewor14/python-spark-session.
      89addd40
  20. Apr 22, 2016
    • Liang-Chi Hsieh's avatar
[SPARK-13266] [SQL] None read/writer options were not translated to "null" · 056883e0
      Liang-Chi Hsieh authored
      ## What changes were proposed in this pull request?
      
In Python, the `option` and `options` methods of `DataFrameReader` and `DataFrameWriter` sent the string "None" instead of `null` when passed `None`, making it impossible to send an actual `null`. This fixes that problem.
      
      This is based on #11305 from mathieulongtin.
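A hedged illustration (the option name and path are made up):
```python
# Previously this sent the literal string "None" to the JVM; now it sends a proper null.
reader = spark.read.format("csv").option("nullValue", None)
df = reader.load("/tmp/example.csv")
```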
      
      ## How was this patch tested?
      
      Added test to readwriter.py.
      
      Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
      Author: mathieu longtin <mathieu.longtin@nuance.com>
      
      Closes #12494 from viirya/py-df-none-option.
      056883e0
  21. Apr 20, 2016
    • Sheamus K. Parkes's avatar
      [SPARK-13842] [PYSPARK] pyspark.sql.types.StructType accessor enhancements · e7791c4f
      Sheamus K. Parkes authored
      ## What changes were proposed in this pull request?
      
      Expand the possible ways to interact with the contents of a `pyspark.sql.types.StructType` instance.
        - Iterating a `StructType` will iterate its fields
          - `[field.name for field in my_structtype]`
        - Indexing with a string will return a field by name
          - `my_structtype['my_field_name']`
        - Indexing with an integer will return a field by position
          - `my_structtype[0]`
        - Indexing with a slice will return a new `StructType` with just the chosen fields:
          - `my_structtype[1:3]`
        - The length is the number of fields (should also provide "truthiness" for free)
          - `len(my_structtype) == 2`
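Putting the bullets above together, a small runnable sketch:
```python
from pyspark.sql.types import StructType, StructField, LongType, StringType

st = StructType([StructField("id", LongType()), StructField("name", StringType())])

[field.name for field in st]   # ['id', 'name']  -- iteration over fields
st["name"]                     # field looked up by name
st[0]                          # field looked up by position
st[0:1]                        # new StructType containing just the first field
len(st)                        # 2, so non-empty StructTypes are truthy
```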
      
      ## How was this patch tested?
      
      Extended the unit test coverage in the accompanying `tests.py`.
      
      Author: Sheamus K. Parkes <shea.parkes@milliman.com>
      
      Closes #12251 from skparkes/pyspark-structtype-enhance.
      e7791c4f
    • Burak Yavuz's avatar
      [SPARK-14555] First cut of Python API for Structured Streaming · 80bf48f4
      Burak Yavuz authored
      ## What changes were proposed in this pull request?
      
This patch provides a first cut of Python APIs for Structured Streaming. This PR provides the new classes:
       - ContinuousQuery
       - Trigger
       - ProcessingTime
      in pyspark under `pyspark.sql.streaming`.
      
      In addition, it contains the new methods added under:
       -  `DataFrameWriter`
           a) `startStream`
           b) `trigger`
           c) `queryName`
      
       -  `DataFrameReader`
           a) `stream`
      
       - `DataFrame`
          a) `isStreaming`
      
      This PR doesn't contain all methods exposed for `ContinuousQuery`, for example:
       - `exception`
       - `sourceStatuses`
       - `sinkStatus`
      
      They may be added in a follow up.
      
This PR also contains some very minor doc fixes on the Scala side.
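A hedged sketch using the method names listed above (this is the pre-2.0 API, later renamed; exact signatures and paths are assumptions):
```python
# Read a streaming DataFrame
stream_df = spark.read.format("text").stream("/tmp/streaming-input")
stream_df.isStreaming  # True

# Start a continuous query
query = (stream_df.write
                  .format("parquet")
                  .queryName("ingest")
                  .trigger(processingTime="5 seconds")  # trigger signature assumed
                  .startStream("/tmp/streaming-output"))
```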
      
      ## How was this patch tested?
      
      Python doc tests
      
      TODO:
       - [ ] verify Python docs look good
      
      Author: Burak Yavuz <brkyvz@gmail.com>
      Author: Burak Yavuz <burak@databricks.com>
      
      Closes #12320 from brkyvz/stream-python.
      80bf48f4
  23. Apr 04, 2016
    • Davies Liu's avatar
[SPARK-12981] [SQL] extract Python UDF in physical plan · 5743c647
      Davies Liu authored
      ## What changes were proposed in this pull request?
      
Currently we extract Python UDFs into a special logical plan, EvaluatePython, in the analyzer. But EvaluatePython is not part of Catalyst, and many rules have no knowledge of it, which breaks many things (for example, filter push-down or column pruning).
      
We should treat Python UDFs as normal expressions until we want to evaluate them; then we can extract them at the end of the optimizer, or in the physical plan.
      
This PR extracts Python UDFs in the physical plan.
      
      Closes #10935
      
      ## How was this patch tested?
      
      Added regression tests.
      
      Author: Davies Liu <davies@databricks.com>
      
      Closes #12127 from davies/py_udf.
      5743c647
  24. Mar 31, 2016
    • Davies Liu's avatar
      [SPARK-14267] [SQL] [PYSPARK] execute multiple Python UDFs within single batch · f0afafdc
      Davies Liu authored
      ## What changes were proposed in this pull request?
      
This PR supports multiple Python UDFs within a single batch and also improves performance.
      
      ```python
      >>> from pyspark.sql.types import IntegerType
      >>> sqlContext.registerFunction("double", lambda x: x * 2, IntegerType())
      >>> sqlContext.registerFunction("add", lambda x, y: x + y, IntegerType())
      >>> sqlContext.sql("SELECT double(add(1, 2)), add(double(2), 1)").explain(True)
      == Parsed Logical Plan ==
      'Project [unresolvedalias('double('add(1, 2)), None),unresolvedalias('add('double(2), 1), None)]
      +- OneRowRelation$
      
      == Analyzed Logical Plan ==
      double(add(1, 2)): int, add(double(2), 1): int
      Project [double(add(1, 2))#14,add(double(2), 1)#15]
      +- Project [double(add(1, 2))#14,add(double(2), 1)#15]
         +- Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15]
            +- EvaluatePython [add(pythonUDF1#17, 1)], [pythonUDF0#18]
               +- EvaluatePython [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17]
                  +- OneRowRelation$
      
      == Optimized Logical Plan ==
      Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15]
      +- EvaluatePython [add(pythonUDF1#17, 1)], [pythonUDF0#18]
         +- EvaluatePython [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17]
            +- OneRowRelation$
      
      == Physical Plan ==
      WholeStageCodegen
      :  +- Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15]
      :     +- INPUT
      +- !BatchPythonEvaluation [add(pythonUDF1#17, 1)], [pythonUDF0#16,pythonUDF1#17,pythonUDF0#18]
         +- !BatchPythonEvaluation [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17]
            +- Scan OneRowRelation[]
      ```
      
      ## How was this patch tested?
      
      Added new tests.
      
The following script was used to benchmark 1, 2, and 3 UDFs:
      ```
      df = sqlContext.range(1, 1 << 23, 1, 4)
      double = F.udf(lambda x: x * 2, LongType())
      print df.select(double(df.id)).count()
      print df.select(double(df.id), double(df.id + 1)).count()
      print df.select(double(df.id), double(df.id + 1), double(df.id + 2)).count()
      ```
Here are the results:
      
      N | Before | After  | speed up
      ---- |------------ | -------------|------
      1 | 22 s | 7 s |  3.1X
      2 | 38 s | 13 s | 2.9X
      3 | 58 s | 16 s | 3.6X
      
This benchmark ran locally with 4 CPUs. For 3 UDFs, it launched 12 Python processes before this patch and 4 processes after it. After this patch, it also uses less memory for multiple UDFs than before (less buffering).
      
      Author: Davies Liu <davies@databricks.com>
      
      Closes #12057 from davies/multi_udfs.
      f0afafdc
  25. Mar 29, 2016
    • Davies Liu's avatar
      [SPARK-14215] [SQL] [PYSPARK] Support chained Python UDFs · a7a93a11
      Davies Liu authored
      ## What changes were proposed in this pull request?
      
This PR brings support for chained Python UDFs, for example:
      
      ```sql
      select udf1(udf2(a))
      select udf1(udf2(a) + 3)
      select udf1(udf2(a) + udf3(b))
      ```
      
Directly chained unary Python UDFs are put into a single batch of Python UDFs; others may require multiple batches.
      
      For example,
      ```python
      >>> sqlContext.sql("select double(double(1))").explain()
      == Physical Plan ==
      WholeStageCodegen
      :  +- Project [pythonUDF#10 AS double(double(1))#9]
      :     +- INPUT
      +- !BatchPythonEvaluation double(double(1)), [pythonUDF#10]
         +- Scan OneRowRelation[]
      >>> sqlContext.sql("select double(double(1) + double(2))").explain()
      == Physical Plan ==
      WholeStageCodegen
      :  +- Project [pythonUDF#19 AS double((double(1) + double(2)))#16]
      :     +- INPUT
      +- !BatchPythonEvaluation double((pythonUDF#17 + pythonUDF#18)), [pythonUDF#17,pythonUDF#18,pythonUDF#19]
         +- !BatchPythonEvaluation double(2), [pythonUDF#17,pythonUDF#18]
            +- !BatchPythonEvaluation double(1), [pythonUDF#17]
               +- Scan OneRowRelation[]
      ```
      
      TODO: will support multiple unrelated Python UDFs in one batch (another PR).
      
      ## How was this patch tested?
      
      Added new unit tests for chained UDFs.
      
      Author: Davies Liu <davies@databricks.com>
      
      Closes #12014 from davies/py_udfs.
      a7a93a11
  26. Mar 28, 2016
    • Herman van Hovell's avatar
      [SPARK-13713][SQL] Migrate parser from ANTLR3 to ANTLR4 · 600c0b69
      Herman van Hovell authored
      ### What changes were proposed in this pull request?
      The current ANTLR3 parser is quite complex to maintain and suffers from code blow-ups. This PR introduces a new parser that is based on ANTLR4.
      
This parser is based on [Presto's SQL parser](https://github.com/facebook/presto/blob/master/presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4). The current implementation can parse and create Catalyst and SQL plans. Large parts of the HiveQL DDL and some of the DML functionality are currently missing; the plan is to add these in follow-up PRs.
      
This PR is a work in progress, and work needs to be done in the following areas:
      
      - [x] Error handling should be improved.
      - [x] Documentation should be improved.
      - [x] Multi-Insert needs to be tested.
      - [ ] Naming and package locations.
      
      ### How was this patch tested?
      
      Catalyst and SQL unit tests.
      
      Author: Herman van Hovell <hvanhovell@questtec.nl>
      
      Closes #11557 from hvanhovell/ngParser.
      600c0b69
  28. Mar 08, 2016
    • Wenchen Fan's avatar
      [SPARK-13593] [SQL] improve the `createDataFrame` to accept data type string and verify the data · d57daf1f
      Wenchen Fan authored
      ## What changes were proposed in this pull request?
      
This PR improves the `createDataFrame` method so that it also accepts a data type string, letting users convert a Python RDD to a DataFrame easily, for example `df = rdd.toDF("a: int, b: string")`.
It also supports a flat schema, so users can convert an RDD of ints to a DataFrame directly; we automatically wrap each int in a row for them.
If a schema is given, we now check that the real data matches it and throw an error if it doesn't.
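Taken directly from the description, a hedged end-to-end sketch (assumes a live `SparkContext` as `sc`):
```python
rdd = sc.parallelize([(1, "a"), (2, "b")])

# Schema given as a data type string instead of a StructType
df = rdd.toDF("a: int, b: string")
df.printSchema()
# root
#  |-- a: integer (nullable = true)
#  |-- b: string (nullable = true)
```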
      
      ## How was this patch tested?
      
      new tests in `test.py` and doc test in `types.py`
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #11444 from cloud-fan/pyrdd.
      d57daf1f
  30. Feb 25, 2016
    • Joseph K. Bradley's avatar
      [SPARK-13479][SQL][PYTHON] Added Python API for approxQuantile · 13ce10e9
      Joseph K. Bradley authored
      ## What changes were proposed in this pull request?
      
* Scala DataFrameStatFunctions: Added a version of approxQuantile taking a List instead of an Array, for Python compatibility
      * Python DataFrame and DataFrameStatFunctions: Added approxQuantile
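A hedged usage sketch of the new Python method:
```python
df = spark.createDataFrame([(float(i),) for i in range(100)], ["x"])

# (column, list of probabilities, relative error) -> list of approximate quantiles
df.approxQuantile("x", [0.25, 0.5, 0.75], 0.05)
```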
      
      ## How was this patch tested?
      
      * unit test in sql/tests.py
      
      Documentation was copied from the existing approxQuantile exactly.
      
      Author: Joseph K. Bradley <joseph@databricks.com>
      
      Closes #11356 from jkbradley/approx-quantile-python.
      13ce10e9
  31. Feb 21, 2016
    • Franklyn D'souza's avatar
      [SPARK-13410][SQL] Support unionAll for DataFrames with UDT columns. · 0f90f4e6
      Franklyn D'souza authored
      ## What changes were proposed in this pull request?
      
      This PR adds equality operators to UDT classes so that they can be correctly tested for dataType equality during union operations.
      
This previously caused `AnalysisException: u"unresolved operator 'Union;"` when trying to unionAll two DataFrames with UDT columns, as below.
      
      ```
      from pyspark.sql.tests import PythonOnlyPoint, PythonOnlyUDT
      from pyspark.sql import types
      
      schema = types.StructType([types.StructField("point", PythonOnlyUDT(), True)])
      
      a = sqlCtx.createDataFrame([[PythonOnlyPoint(1.0, 2.0)]], schema)
      b = sqlCtx.createDataFrame([[PythonOnlyPoint(3.0, 4.0)]], schema)
      
      c = a.unionAll(b)
      ```
      
## How was this patch tested?
      
      Tested using two unit tests in sql/test.py and the DataFrameSuite.
      
      Additional information here : https://issues.apache.org/jira/browse/SPARK-13410
      
      Author: Franklyn D'souza <franklynd@gmail.com>
      
      Closes #11279 from damnMeddlingKid/udt-union-all.
      0f90f4e6
  32. Jan 31, 2016
    • Herman van Hovell's avatar
      [SPARK-13049] Add First/last with ignore nulls to functions.scala · 5a8b978f
      Herman van Hovell authored
This PR adds the ability to specify the ```ignoreNulls``` option in the functions DSL, e.g.:
```df.select($"id", last($"value", ignoreNulls = true).over(Window.partitionBy($"id").orderBy($"other")))```
      
This PR is somewhere between a bug fix (see the JIRA) and a new feature. I am not sure if we should backport it to 1.6.
      
      cc yhuai
      
      Author: Herman van Hovell <hvanhovell@questtec.nl>
      
      Closes #10957 from hvanhovell/SPARK-13049.
      5a8b978f
  33. Jan 27, 2016
    • Jason Lee's avatar
      [SPARK-10847][SQL][PYSPARK] Pyspark - DataFrame - Optional Metadata with... · edd47375
      Jason Lee authored
      [SPARK-10847][SQL][PYSPARK] Pyspark - DataFrame - Optional Metadata with `None` triggers cryptic failure
      
The error message is now changed from "Do not support type class scala.Tuple2." to "Do not support type class org.json4s.JsonAST$JNull$" to be more informative about what is not supported. Also, StructType metadata now handles JNull correctly, i.e., `{'a': None}`. `test_metadata_null` is added to tests.py to show that the fix works.
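A hedged illustration of the fixed behaviour:
```python
from pyspark.sql.types import StructType, StructField, LongType

# Metadata containing None used to trigger the cryptic Tuple2 error;
# now it round-trips as JSON null.
field = StructField("id", LongType(), True, metadata={"a": None})
schema = StructType([field])
schema.json()
```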
      
      Author: Jason Lee <cjlee@us.ibm.com>
      
      Closes #8969 from jasoncl/SPARK-10847.
      edd47375