From ee7f365bd0f8822b213a3f434bc958d9eba8db3c Mon Sep 17 00:00:00 2001 From: Reynold Xin <rxin@databricks.com> Date: Wed, 3 Jun 2015 00:23:34 -0700 Subject: [PATCH] [SPARK-8060] Improve DataFrame Python test coverage and documentation. Author: Reynold Xin <rxin@databricks.com> Closes #6601 from rxin/python-read-write-test-and-doc and squashes the following commits: baa8ad5 [Reynold Xin] Code review feedback. f081d47 [Reynold Xin] More documentation updates. c9902fa [Reynold Xin] [SPARK-8060] Improve DataFrame Python reader/writer interface doc and testing. (cherry picked from commit ce320cb2dbf28825f80795ce569735888f98d6e8) Signed-off-by: Reynold Xin <rxin@databricks.com> --- .rat-excludes | 1 + python/pyspark/sql/__init__.py | 13 +- python/pyspark/sql/context.py | 89 +++---- python/pyspark/sql/dataframe.py | 82 +++---- python/pyspark/sql/readwriter.py | 217 ++++++++---------- python/pyspark/sql/tests.py | 2 + .../sql/parquet_partitioned/_SUCCESS | 0 .../sql/parquet_partitioned/_common_metadata | Bin 0 -> 210 bytes .../sql/parquet_partitioned/_metadata | Bin 0 -> 743 bytes .../day=1/.part-r-00008.gz.parquet.crc | Bin 0 -> 12 bytes .../month=9/day=1/part-r-00008.gz.parquet | Bin 0 -> 322 bytes .../day=25/.part-r-00002.gz.parquet.crc | Bin 0 -> 12 bytes .../day=25/.part-r-00004.gz.parquet.crc | Bin 0 -> 12 bytes .../month=10/day=25/part-r-00002.gz.parquet | Bin 0 -> 343 bytes .../month=10/day=25/part-r-00004.gz.parquet | Bin 0 -> 343 bytes .../day=26/.part-r-00005.gz.parquet.crc | Bin 0 -> 12 bytes .../month=10/day=26/part-r-00005.gz.parquet | Bin 0 -> 333 bytes .../day=1/.part-r-00007.gz.parquet.crc | Bin 0 -> 12 bytes .../month=9/day=1/part-r-00007.gz.parquet | Bin 0 -> 343 bytes python/test_support/sql/people.json | 3 + 20 files changed, 180 insertions(+), 227 deletions(-) create mode 100644 python/test_support/sql/parquet_partitioned/_SUCCESS create mode 100644 python/test_support/sql/parquet_partitioned/_common_metadata create mode 100644 python/test_support/sql/parquet_partitioned/_metadata create mode 100644 python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/.part-r-00008.gz.parquet.crc create mode 100644 python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/part-r-00008.gz.parquet create mode 100644 python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/.part-r-00002.gz.parquet.crc create mode 100644 python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/.part-r-00004.gz.parquet.crc create mode 100644 python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-00002.gz.parquet create mode 100644 python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-00004.gz.parquet create mode 100644 python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/.part-r-00005.gz.parquet.crc create mode 100644 python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/part-r-00005.gz.parquet create mode 100644 python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/.part-r-00007.gz.parquet.crc create mode 100644 python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/part-r-00007.gz.parquet create mode 100644 python/test_support/sql/people.json diff --git a/.rat-excludes b/.rat-excludes index c0f81b57fe..8f2722cbd0 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -82,3 +82,4 @@ local-1426633911242/* local-1430917381534/* DESCRIPTION NAMESPACE +test_support/* diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py index 
726d288d97..ad9c891ba1 100644 --- a/python/pyspark/sql/__init__.py +++ b/python/pyspark/sql/__init__.py @@ -45,11 +45,20 @@ from __future__ import absolute_import def since(version): + """ + A decorator that annotates a function to append the version of Spark the function was added. + """ + import re + indent_p = re.compile(r'\n( +)') + def deco(f): - f.__doc__ = f.__doc__.rstrip() + "\n\n.. versionadded:: %s" % version + indents = indent_p.findall(f.__doc__) + indent = ' ' * (min(len(m) for m in indents) if indents else 0) + f.__doc__ = f.__doc__.rstrip() + "\n\n%s.. versionadded:: %s" % (indent, version) return f return deco + from pyspark.sql.types import Row from pyspark.sql.context import SQLContext, HiveContext from pyspark.sql.column import Column @@ -58,7 +67,9 @@ from pyspark.sql.group import GroupedData from pyspark.sql.readwriter import DataFrameReader, DataFrameWriter from pyspark.sql.window import Window, WindowSpec + __all__ = [ 'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row', 'DataFrameNaFunctions', 'DataFrameStatFunctions', 'Window', 'WindowSpec', + 'DataFrameReader', 'DataFrameWriter' ] diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 22f6257dfe..9fdf43c3e6 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -124,7 +124,10 @@ class SQLContext(object): @property @since("1.3.1") def udf(self): - """Returns a :class:`UDFRegistration` for UDF registration.""" + """Returns a :class:`UDFRegistration` for UDF registration. + + :return: :class:`UDFRegistration` + """ return UDFRegistration(self) @since(1.4) @@ -138,7 +141,7 @@ class SQLContext(object): :param end: the end value (exclusive) :param step: the incremental step (default: 1) :param numPartitions: the number of partitions of the DataFrame - :return: A new DataFrame + :return: :class:`DataFrame` >>> sqlContext.range(1, 7, 2).collect() [Row(id=1), Row(id=3), Row(id=5)] @@ -195,8 +198,8 @@ class SQLContext(object): raise ValueError("The first row in RDD is empty, " "can not infer schema") if type(first) is dict: - warnings.warn("Using RDD of dict to inferSchema is deprecated," - "please use pyspark.sql.Row instead") + warnings.warn("Using RDD of dict to inferSchema is deprecated. " + "Use pyspark.sql.Row instead") if samplingRatio is None: schema = _infer_schema(first) @@ -219,7 +222,7 @@ class SQLContext(object): """ .. note:: Deprecated in 1.3, use :func:`createDataFrame` instead. """ - warnings.warn("inferSchema is deprecated, please use createDataFrame instead") + warnings.warn("inferSchema is deprecated, please use createDataFrame instead.") if isinstance(rdd, DataFrame): raise TypeError("Cannot apply schema to DataFrame") @@ -262,6 +265,7 @@ class SQLContext(object): :class:`list`, or :class:`pandas.DataFrame`. :param schema: a :class:`StructType` or list of column names. default None. :param samplingRatio: the sample ratio of rows used for inferring + :return: :class:`DataFrame` >>> l = [('Alice', 1)] >>> sqlContext.createDataFrame(l).collect() @@ -359,18 +363,15 @@ class SQLContext(object): else: raise ValueError("Can only register DataFrame as table") - @since(1.0) def parquetFile(self, *paths): """Loads a Parquet file, returning the result as a :class:`DataFrame`. - >>> import tempfile, shutil - >>> parquetFile = tempfile.mkdtemp() - >>> shutil.rmtree(parquetFile) - >>> df.saveAsParquetFile(parquetFile) - >>> df2 = sqlContext.parquetFile(parquetFile) - >>> sorted(df.collect()) == sorted(df2.collect()) - True + .. 
note:: Deprecated in 1.4, use :func:`DataFrameReader.parquet` instead. + + >>> sqlContext.parquetFile('python/test_support/sql/parquet_partitioned').dtypes + [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')] """ + warnings.warn("parquetFile is deprecated. Use read.parquet() instead.") gateway = self._sc._gateway jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths)) for i in range(0, len(paths)): @@ -378,39 +379,15 @@ class SQLContext(object): jdf = self._ssql_ctx.parquetFile(jpaths) return DataFrame(jdf, self) - @since(1.0) def jsonFile(self, path, schema=None, samplingRatio=1.0): """Loads a text file storing one JSON object per line as a :class:`DataFrame`. - If the schema is provided, applies the given schema to this JSON dataset. - Otherwise, it samples the dataset with ratio ``samplingRatio`` to determine the schema. - - >>> import tempfile, shutil - >>> jsonFile = tempfile.mkdtemp() - >>> shutil.rmtree(jsonFile) - >>> with open(jsonFile, 'w') as f: - ... f.writelines(jsonStrings) - >>> df1 = sqlContext.jsonFile(jsonFile) - >>> df1.printSchema() - root - |-- field1: long (nullable = true) - |-- field2: string (nullable = true) - |-- field3: struct (nullable = true) - | |-- field4: long (nullable = true) + .. note:: Deprecated in 1.4, use :func:`DataFrameReader.json` instead. - >>> from pyspark.sql.types import * - >>> schema = StructType([ - ... StructField("field2", StringType()), - ... StructField("field3", - ... StructType([StructField("field5", ArrayType(IntegerType()))]))]) - >>> df2 = sqlContext.jsonFile(jsonFile, schema) - >>> df2.printSchema() - root - |-- field2: string (nullable = true) - |-- field3: struct (nullable = true) - | |-- field5: array (nullable = true) - | | |-- element: integer (containsNull = true) + >>> sqlContext.jsonFile('python/test_support/sql/people.json').dtypes + [('age', 'bigint'), ('name', 'string')] """ + warnings.warn("jsonFile is deprecated. Use read.json() instead.") if schema is None: df = self._ssql_ctx.jsonFile(path, samplingRatio) else: @@ -462,21 +439,16 @@ class SQLContext(object): df = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype) return DataFrame(df, self) - @since(1.3) def load(self, path=None, source=None, schema=None, **options): """Returns the dataset in a data source as a :class:`DataFrame`. - The data source is specified by the ``source`` and a set of ``options``. - If ``source`` is not specified, the default data source configured by - ``spark.sql.sources.default`` will be used. - - Optionally, a schema can be provided as the schema of the returned DataFrame. + .. note:: Deprecated in 1.4, use :func:`DataFrameReader.load` instead. """ + warnings.warn("load is deprecated. Use read.load() instead.") return self.read.load(path, source, schema, **options) @since(1.3) - def createExternalTable(self, tableName, path=None, source=None, - schema=None, **options): + def createExternalTable(self, tableName, path=None, source=None, schema=None, **options): """Creates an external table based on the dataset in a data source. It returns the DataFrame associated with the external table. @@ -487,6 +459,8 @@ class SQLContext(object): Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and created external table. + + :return: :class:`DataFrame` """ if path is not None: options["path"] = path @@ -508,6 +482,8 @@ class SQLContext(object): def sql(self, sqlQuery): """Returns a :class:`DataFrame` representing the result of the given query. 
+ :return: :class:`DataFrame` + >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> df2 = sqlContext.sql("SELECT field1 AS f1, field2 as f2 from table1") >>> df2.collect() @@ -519,6 +495,8 @@ class SQLContext(object): def table(self, tableName): """Returns the specified table as a :class:`DataFrame`. + :return: :class:`DataFrame` + >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> df2 = sqlContext.table("table1") >>> sorted(df.collect()) == sorted(df2.collect()) @@ -536,6 +514,9 @@ class SQLContext(object): The returned DataFrame has two columns: ``tableName`` and ``isTemporary`` (a column with :class:`BooleanType` indicating if a table is a temporary one or not). + :param dbName: string, name of the database to use. + :return: :class:`DataFrame` + >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> df2 = sqlContext.tables() >>> df2.filter("tableName = 'table1'").first() @@ -550,7 +531,8 @@ class SQLContext(object): def tableNames(self, dbName=None): """Returns a list of names of tables in the database ``dbName``. - If ``dbName`` is not specified, the current database will be used. + :param dbName: string, name of the database to use. Default to the current database. + :return: list of table names, in string >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> "table1" in sqlContext.tableNames() @@ -585,8 +567,7 @@ class SQLContext(object): Returns a :class:`DataFrameReader` that can be used to read data in as a :class:`DataFrame`. - >>> sqlContext.read - <pyspark.sql.readwriter.DataFrameReader object at ...> + :return: :class:`DataFrameReader` """ return DataFrameReader(self) @@ -644,10 +625,14 @@ class UDFRegistration(object): def _test(): + import os import doctest from pyspark.context import SparkContext from pyspark.sql import Row, SQLContext import pyspark.sql.context + + os.chdir(os.environ["SPARK_HOME"]) + globs = pyspark.sql.context.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') globs['sc'] = sc diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index a82b6b87c4..7673153abe 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -44,7 +44,7 @@ class DataFrame(object): A :class:`DataFrame` is equivalent to a relational table in Spark SQL, and can be created using various functions in :class:`SQLContext`:: - people = sqlContext.parquetFile("...") + people = sqlContext.read.parquet("...") Once created, it can be manipulated using the various domain-specific-language (DSL) functions defined in: :class:`DataFrame`, :class:`Column`. @@ -56,8 +56,8 @@ class DataFrame(object): A more concrete example:: # To create DataFrame using SQLContext - people = sqlContext.parquetFile("...") - department = sqlContext.parquetFile("...") + people = sqlContext.read.parquet("...") + department = sqlContext.read.parquet("...") people.filter(people.age > 30).join(department, people.deptId == department.id)) \ .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"}) @@ -120,21 +120,12 @@ class DataFrame(object): rdd = self._jdf.toJSON() return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode)) - @since(1.3) def saveAsParquetFile(self, path): """Saves the contents as a Parquet file, preserving the schema. - Files that are written out using this method can be read back in as - a :class:`DataFrame` using :func:`SQLContext.parquetFile`. 
- - >>> import tempfile, shutil - >>> parquetFile = tempfile.mkdtemp() - >>> shutil.rmtree(parquetFile) - >>> df.saveAsParquetFile(parquetFile) - >>> df2 = sqlContext.parquetFile(parquetFile) - >>> sorted(df2.collect()) == sorted(df.collect()) - True + .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.parquet` instead. """ + warnings.warn("saveAsParquetFile is deprecated. Use write.parquet() instead.") self._jdf.saveAsParquetFile(path) @since(1.3) @@ -151,69 +142,45 @@ class DataFrame(object): """ self._jdf.registerTempTable(name) - @since(1.3) def registerAsTable(self, name): - """DEPRECATED: use :func:`registerTempTable` instead""" - warnings.warn("Use registerTempTable instead of registerAsTable.", DeprecationWarning) + """ + .. note:: Deprecated in 1.4, use :func:`registerTempTable` instead. + """ + warnings.warn("Use registerTempTable instead of registerAsTable.") self.registerTempTable(name) - @since(1.3) def insertInto(self, tableName, overwrite=False): """Inserts the contents of this :class:`DataFrame` into the specified table. - Optionally overwriting any existing data. + .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.insertInto` instead. """ + warnings.warn("insertInto is deprecated. Use write.insertInto() instead.") self.write.insertInto(tableName, overwrite) - @since(1.3) def saveAsTable(self, tableName, source=None, mode="error", **options): """Saves the contents of this :class:`DataFrame` to a data source as a table. - The data source is specified by the ``source`` and a set of ``options``. - If ``source`` is not specified, the default data source configured by - ``spark.sql.sources.default`` will be used. - - Additionally, mode is used to specify the behavior of the saveAsTable operation when - table already exists in the data source. There are four modes: - - * `append`: Append contents of this :class:`DataFrame` to existing data. - * `overwrite`: Overwrite existing data. - * `error`: Throw an exception if data already exists. - * `ignore`: Silently ignore this operation if data already exists. + .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.saveAsTable` instead. """ + warnings.warn("saveAsTable is deprecated. Use write.saveAsTable() instead.") self.write.saveAsTable(tableName, source, mode, **options) @since(1.3) def save(self, path=None, source=None, mode="error", **options): """Saves the contents of the :class:`DataFrame` to a data source. - The data source is specified by the ``source`` and a set of ``options``. - If ``source`` is not specified, the default data source configured by - ``spark.sql.sources.default`` will be used. - - Additionally, mode is used to specify the behavior of the save operation when - data already exists in the data source. There are four modes: - - * `append`: Append contents of this :class:`DataFrame` to existing data. - * `overwrite`: Overwrite existing data. - * `error`: Throw an exception if data already exists. - * `ignore`: Silently ignore this operation if data already exists. + .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.save` instead. """ + warnings.warn("save is deprecated. Use write.save() instead.") return self.write.save(path, source, mode, **options) @property @since(1.4) def write(self): """ - Interface for saving the content of the :class:`DataFrame` out - into external storage. - - :return :class:`DataFrameWriter` + Interface for saving the content of the :class:`DataFrame` out into external storage. - .. 
note:: Experimental - - >>> df.write - <pyspark.sql.readwriter.DataFrameWriter object at ...> + :return: :class:`DataFrameWriter` """ return DataFrameWriter(self) @@ -636,6 +603,9 @@ class DataFrame(object): This include count, mean, stddev, min, and max. If no columns are given, this function computes statistics for all numerical columns. + .. note:: This function is meant for exploratory data analysis, as we make no \ + guarantee about the backward compatibility of the schema of the resulting DataFrame. + >>> df.describe().show() +-------+---+ |summary|age| @@ -653,9 +623,11 @@ class DataFrame(object): @ignore_unicode_prefix @since(1.3) def head(self, n=None): - """ - Returns the first ``n`` rows as a list of :class:`Row`, - or the first :class:`Row` if ``n`` is ``None.`` + """Returns the first ``n`` rows. + + :param n: int, default 1. Number of rows to return. + :return: If n is greater than 1, return a list of :class:`Row`. + If n is 1, return a single Row. >>> df.head() Row(age=2, name=u'Alice') @@ -1170,8 +1142,8 @@ class DataFrame(object): "http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou". :func:`DataFrame.freqItems` and :func:`DataFrameStatFunctions.freqItems` are aliases. - This function is meant for exploratory data analysis, as we make no guarantee about the - backward compatibility of the schema of the resulting DataFrame. + .. note:: This function is meant for exploratory data analysis, as we make no \ + guarantee about the backward compatibility of the schema of the resulting DataFrame. :param cols: Names of the columns to calculate frequent items for as a list or tuple of strings. diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index d17d87419f..f036644acc 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -45,18 +45,24 @@ class DataFrameReader(object): @since(1.4) def format(self, source): - """ - Specifies the input data source format. + """Specifies the input data source format. + + :param source: string, name of the data source, e.g. 'json', 'parquet'. + + >>> df = sqlContext.read.format('json').load('python/test_support/sql/people.json') + >>> df.dtypes + [('age', 'bigint'), ('name', 'string')] + """ self._jreader = self._jreader.format(source) return self @since(1.4) def schema(self, schema): - """ - Specifies the input schema. Some data sources (e.g. JSON) can - infer the input schema automatically from data. By specifying - the schema here, the underlying data source can skip the schema + """Specifies the input schema. + + Some data sources (e.g. JSON) can infer the input schema automatically from data. + By specifying the schema here, the underlying data source can skip the schema inference step, and thus speed up data loading. :param schema: a StructType object @@ -69,8 +75,7 @@ class DataFrameReader(object): @since(1.4) def options(self, **options): - """ - Adds input options for the underlying data source. + """Adds input options for the underlying data source. """ for k in options: self._jreader = self._jreader.option(k, options[k]) @@ -84,6 +89,10 @@ class DataFrameReader(object): :param format: optional string for format of the data source. Default to 'parquet'. :param schema: optional :class:`StructType` for the input schema. 
:param options: all other string options + + >>> df = sqlContext.read.load('python/test_support/sql/parquet_partitioned') + >>> df.dtypes + [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')] """ if format is not None: self.format(format) @@ -107,31 +116,10 @@ class DataFrameReader(object): :param path: string, path to the JSON dataset. :param schema: an optional :class:`StructType` for the input schema. - >>> import tempfile, shutil - >>> jsonFile = tempfile.mkdtemp() - >>> shutil.rmtree(jsonFile) - >>> with open(jsonFile, 'w') as f: - ... f.writelines(jsonStrings) - >>> df1 = sqlContext.read.json(jsonFile) - >>> df1.printSchema() - root - |-- field1: long (nullable = true) - |-- field2: string (nullable = true) - |-- field3: struct (nullable = true) - | |-- field4: long (nullable = true) - - >>> from pyspark.sql.types import * - >>> schema = StructType([ - ... StructField("field2", StringType()), - ... StructField("field3", - ... StructType([StructField("field5", ArrayType(IntegerType()))]))]) - >>> df2 = sqlContext.read.json(jsonFile, schema) - >>> df2.printSchema() - root - |-- field2: string (nullable = true) - |-- field3: struct (nullable = true) - | |-- field5: array (nullable = true) - | | |-- element: integer (containsNull = true) + >>> df = sqlContext.read.json('python/test_support/sql/people.json') + >>> df.dtypes + [('age', 'bigint'), ('name', 'string')] + """ if schema is not None: self.schema(schema) @@ -141,10 +129,12 @@ class DataFrameReader(object): def table(self, tableName): """Returns the specified table as a :class:`DataFrame`. - >>> sqlContext.registerDataFrameAsTable(df, "table1") - >>> df2 = sqlContext.read.table("table1") - >>> sorted(df.collect()) == sorted(df2.collect()) - True + :param tableName: string, name of the table. + + >>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned') + >>> df.registerTempTable('tmpTable') + >>> sqlContext.read.table('tmpTable').dtypes + [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')] """ return self._df(self._jreader.table(tableName)) @@ -152,13 +142,9 @@ class DataFrameReader(object): def parquet(self, *path): """Loads a Parquet file, returning the result as a :class:`DataFrame`. - >>> import tempfile, shutil - >>> parquetFile = tempfile.mkdtemp() - >>> shutil.rmtree(parquetFile) - >>> df.saveAsParquetFile(parquetFile) - >>> df2 = sqlContext.read.parquet(parquetFile) - >>> sorted(df.collect()) == sorted(df2.collect()) - True + >>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned') + >>> df.dtypes + [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')] """ return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path))) @@ -221,30 +207,34 @@ class DataFrameWriter(object): @since(1.4) def mode(self, saveMode): - """ - Specifies the behavior when data or table already exists. Options include: + """Specifies the behavior when data or table already exists. + + Options include: * `append`: Append contents of this :class:`DataFrame` to existing data. * `overwrite`: Overwrite existing data. * `error`: Throw an exception if data already exists. * `ignore`: Silently ignore this operation if data already exists. + + >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ self._jwrite = self._jwrite.mode(saveMode) return self @since(1.4) def format(self, source): - """ - Specifies the underlying output data source. Built-in options include - "parquet", "json", etc. 
+ """Specifies the underlying output data source. + + :param source: string, name of the data source, e.g. 'json', 'parquet'. + + >>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data')) """ self._jwrite = self._jwrite.format(source) return self @since(1.4) def options(self, **options): - """ - Adds output options for the underlying data source. + """Adds output options for the underlying data source. """ for k in options: self._jwrite = self._jwrite.option(k, options[k]) @@ -252,12 +242,14 @@ class DataFrameWriter(object): @since(1.4) def partitionBy(self, *cols): - """ - Partitions the output by the given columns on the file system. + """Partitions the output by the given columns on the file system. + If specified, the output is laid out on the file system similar to Hive's partitioning scheme. :param cols: name of columns + + >>> df.write.partitionBy('year', 'month').parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ if len(cols) == 1 and isinstance(cols[0], (list, tuple)): cols = cols[0] @@ -266,25 +258,23 @@ class DataFrameWriter(object): @since(1.4) def save(self, path=None, format=None, mode="error", **options): - """ - Saves the contents of the :class:`DataFrame` to a data source. + """Saves the contents of the :class:`DataFrame` to a data source. The data source is specified by the ``format`` and a set of ``options``. If ``format`` is not specified, the default data source configured by ``spark.sql.sources.default`` will be used. - Additionally, mode is used to specify the behavior of the save operation when - data already exists in the data source. There are four modes: - - * `append`: Append contents of this :class:`DataFrame` to existing data. - * `overwrite`: Overwrite existing data. - * `error`: Throw an exception if data already exists. - * `ignore`: Silently ignore this operation if data already exists. - :param path: the path in a Hadoop supported file system :param format: the format used to save - :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error) + :param mode: specifies the behavior of the save operation when data already exists. + + * ``append``: Append contents of this :class:`DataFrame` to existing data. + * ``overwrite``: Overwrite existing data. + * ``ignore``: Silently ignore this operation if data already exists. + * ``error`` (default case): Throw an exception if data already exists. :param options: all other string options + + >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ self.mode(mode).options(**options) if format is not None: @@ -296,8 +286,8 @@ class DataFrameWriter(object): @since(1.4) def insertInto(self, tableName, overwrite=False): - """ - Inserts the content of the :class:`DataFrame` to the specified table. + """Inserts the content of the :class:`DataFrame` to the specified table. + It requires that the schema of the class:`DataFrame` is the same as the schema of the table. @@ -307,8 +297,7 @@ class DataFrameWriter(object): @since(1.4) def saveAsTable(self, name, format=None, mode="error", **options): - """ - Saves the content of the :class:`DataFrame` as the specified table. + """Saves the content of the :class:`DataFrame` as the specified table. In the case the table already exists, behavior of this function depends on the save mode, specified by the `mode` function (default to throwing an exception). 
@@ -328,67 +317,58 @@ self.mode(mode).options(**options) if format is not None: self.format(format) - return self._jwrite.saveAsTable(name) + self._jwrite.saveAsTable(name) @since(1.4) def json(self, path, mode="error"): - """ - Saves the content of the :class:`DataFrame` in JSON format at the - specified path. + """Saves the content of the :class:`DataFrame` in JSON format at the specified path. - Additionally, mode is used to specify the behavior of the save operation when - data already exists in the data source. There are four modes: + :param path: the path in any Hadoop supported file system + :param mode: specifies the behavior of the save operation when data already exists. - * `append`: Append contents of this :class:`DataFrame` to existing data. - * `overwrite`: Overwrite existing data. - * `error`: Throw an exception if data already exists. - * `ignore`: Silently ignore this operation if data already exists. + * ``append``: Append contents of this :class:`DataFrame` to existing data. + * ``overwrite``: Overwrite existing data. + * ``ignore``: Silently ignore this operation if data already exists. + * ``error`` (default case): Throw an exception if data already exists. - :param path: the path in any Hadoop supported file system - :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error) + >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) """ - return self._jwrite.mode(mode).json(path) + self._jwrite.mode(mode).json(path) @since(1.4) def parquet(self, path, mode="error"): - """ - Saves the content of the :class:`DataFrame` in Parquet format at the - specified path. + """Saves the content of the :class:`DataFrame` in Parquet format at the specified path. - Additionally, mode is used to specify the behavior of the save operation when - data already exists in the data source. There are four modes: + :param path: the path in any Hadoop supported file system + :param mode: specifies the behavior of the save operation when data already exists. - * `append`: Append contents of this :class:`DataFrame` to existing data. - * `overwrite`: Overwrite existing data. - * `error`: Throw an exception if data already exists. - * `ignore`: Silently ignore this operation if data already exists. + * ``append``: Append contents of this :class:`DataFrame` to existing data. + * ``overwrite``: Overwrite existing data. + * ``ignore``: Silently ignore this operation if data already exists. + * ``error`` (default case): Throw an exception if data already exists. - :param path: the path in any Hadoop supported file system - :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error) + >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ - return self._jwrite.mode(mode).parquet(path) + self._jwrite.mode(mode).parquet(path) @since(1.4) def jdbc(self, url, table, mode="error", properties={}): - """ - Saves the content of the :class:`DataFrame` to a external database table - via JDBC. - - In the case the table already exists in the external database, - behavior of this function depends on the save mode, specified by the `mode` - function (default to throwing an exception). There are four modes: + """Saves the content of the :class:`DataFrame` to an external database table via JDBC. - * `append`: Append contents of this :class:`DataFrame` to existing data. - * `overwrite`: Overwrite existing data. - * `error`: Throw an exception if data already exists. - * `ignore`: Silently ignore this operation if data already exists. + .. 
note:: Don't create too many partitions in parallel on a large cluster;\ + otherwise Spark might crash your external database systems. - :param url: a JDBC URL of the form `jdbc:subprotocol:subname` + :param url: a JDBC URL of the form ``jdbc:subprotocol:subname`` :param table: Name of the table in the external database. - :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error) + :param mode: specifies the behavior of the save operation when data already exists. + + * ``append``: Append contents of this :class:`DataFrame` to existing data. + * ``overwrite``: Overwrite existing data. + * ``ignore``: Silently ignore this operation if data already exists. + * ``error`` (default case): Throw an exception if data already exists. :param properties: JDBC database connection arguments, a list of - arbitrary string tag/value. Normally at least a - "user" and "password" property should be included. + arbitrary string tag/value. Normally at least a + "user" and "password" property should be included. """ jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)() for k in properties: @@ -398,24 +378,23 @@ class DataFrameWriter(object): def _test(): import doctest + import os + import tempfile from pyspark.context import SparkContext from pyspark.sql import Row, SQLContext import pyspark.sql.readwriter + + os.chdir(os.environ["SPARK_HOME"]) + globs = pyspark.sql.readwriter.__dict__.copy() sc = SparkContext('local[4]', 'PythonTest') + + globs['tempfile'] = tempfile + globs['os'] = os globs['sc'] = sc globs['sqlContext'] = SQLContext(sc) - globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \ - .toDF(StructType([StructField('age', IntegerType()), - StructField('name', StringType())])) - jsonStrings = [ - '{"field1": 1, "field2": "row1", "field3":{"field4":11}}', - '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},' - '"field6":[{"field7": "row2"}]}', - '{"field1" : null, "field2": "row3", ' - '"field3":{"field4":33, "field5": []}}' - ] - globs['jsonStrings'] = jsonStrings + globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned') + (failure_count, test_count) = doctest.testmod( pyspark.sql.readwriter, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 76384d31f1..6e498f0af0 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -753,8 +753,10 @@ class HiveContextSQLTests(ReusedPySparkTestCase): try: cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf() except py4j.protocol.Py4JError: + cls.tearDownClass() raise unittest.SkipTest("Hive is not available") except TypeError: + cls.tearDownClass() raise unittest.SkipTest("Hive is not available") os.unlink(cls.tempdir.name) _scala_HiveContext =\ diff --git a/python/test_support/sql/parquet_partitioned/_SUCCESS b/python/test_support/sql/parquet_partitioned/_SUCCESS new file mode 100644 index 0000000000..e69de29bb2 diff --git a/python/test_support/sql/parquet_partitioned/_common_metadata b/python/test_support/sql/parquet_partitioned/_common_metadata new file mode 100644 index 0000000000000000000000000000000000000000..7ef2320651dee5f2a4d588ecf8d0b281a2bc3018 GIT binary patch literal 210 zcmYk1!3u&v5QYcw=+#i_AOju(TauuIw{9J!W6@#L&7^f#ch@4s*Xz03qM*|Z%=dr8 zpKo@l?}W+LRZ<$?0pE+Az!kJ%F~9^uFPsH)sVYKST3i^>Emc>dJ5KD<^~?|@@1$Xd zmekN-KcIQE3^UY5^@YI%&o$$v#_TZQTWe3Bk^F(Rs4OUY&gqF;!bVwwKPhIzI37m` brr(!~MnyNKbS*`ck~LYXVg*kC$ZeY!e^o%_ 
literal 0 HcmV?d00001 diff --git a/python/test_support/sql/parquet_partitioned/_metadata b/python/test_support/sql/parquet_partitioned/_metadata new file mode 100644 index 0000000000000000000000000000000000000000..78a1ca7d38279147e00a826e627f7514d988bd8e GIT binary patch literal 743 zcmb7?&riZI6vyjCA%^RgE^;7EFfke3h76U+ftwc+!pZpKP`3&TWgA`5Oyuq#;D6^o zW7!Xai3#`)eXs57`{ea~hy9VQD!Or7;$bLM1*p}A0!smz(FOq8iT<e9pqWs@do9au zo3k(wlis!Ik)&sv5#gfAo0haIJbuS=KVMLxRcdNgcaA|t&q}E!P0{YOkF&}RJnS<+ zT{Iv~o+>~h>;mEB2-`{-EoU3j+6jrYuY)zEJn-EKp==XmwCF#y_WraHO@felu$%|` z(K_3`IXh{d_L=r}G$4ZdFmoBn%lg_3s`$k}26efUv-!gz5!`pDu$%|Kx;hW}7?X&& z6N+Ow_$iL(tWW^v;TxV&K|CS|yk8=bL=<&VEcn6|$UrYXWnPTB4@<Pxn!HM#v6bit zW0@E%7$eV2X2_@1Kt)m1U9MJ7D*#V((KTnh{z`f5he6%O9c*#;0(>g~45h?>03XD` A@&Et; literal 0 HcmV?d00001 diff --git a/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/.part-r-00008.gz.parquet.crc b/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/.part-r-00008.gz.parquet.crc new file mode 100644 index 0000000000000000000000000000000000000000..e93f42ed6f35018df1ae76c9746b26bd299c0339 GIT binary patch literal 12 TcmYc;N@ieSU}9KgSR(=e5vKy4 literal 0 HcmV?d00001 diff --git a/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/part-r-00008.gz.parquet b/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/part-r-00008.gz.parquet new file mode 100644 index 0000000000000000000000000000000000000000..461c382937ecdf13db1003bd2bf8c9a79c510d3e GIT binary patch literal 322 zcmWG=3^EjD5S0?O(-CC?GT1~pWF(j!b27n%7y}T<cXNO!AV^M1NJvOaVVl%vIg?@i z`I+jxKtWL^NgWTCqWt_4Q6>ga9#tj@mb}E=R8<BBF(yeFNf|W`8BrcdQ#J+;Nr;Rn zix`twgjkIlhYgxdKv5<JNf`;v{GxQd#Dc`+j8whig2bY1z2d?gJs_j7G_^#pD8F1U zH?<@&C9xz?BC1-cq_QAY$x5lXq^LBxL`g>}Ei*MIrC7--x>^b35TF#8(m_&~nU@Y! 
zm{*#UlbDnPQ~}hQs-pxmRLQEkwl=nwK|&g8rEYGKLRo52ab|v=f}x(7o<UKvF~bj_ J-vfZ52LMLQQ}O@+ literal 0 HcmV?d00001 diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/.part-r-00002.gz.parquet.crc b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/.part-r-00002.gz.parquet.crc new file mode 100644 index 0000000000000000000000000000000000000000..b63c4d6d1e1dc71969b22cdd24ccd4f0673b22c0 GIT binary patch literal 12 TcmYc;N@ieSU}6Z*|K$w;5@-WP literal 0 HcmV?d00001 diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/.part-r-00004.gz.parquet.crc b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/.part-r-00004.gz.parquet.crc new file mode 100644 index 0000000000000000000000000000000000000000..5bc0ebd7135638686a8e064ac02fa673b06228d4 GIT binary patch literal 12 TcmYc;N@ieSU}D(K7dIOK5$yu~ literal 0 HcmV?d00001 diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-00002.gz.parquet b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-00002.gz.parquet new file mode 100644 index 0000000000000000000000000000000000000000..62a63915beac2840b31ae7f1e8dec1c26b268b89 GIT binary patch literal 343 zcmWG=3^EjD5LFU&(-CC?GT1~pWF**qGm|qCQ*+=9F$N%z@8$qeK#-i2kdTm;!aix= z%#)IxD=isX?px~$0wqM5By~Jkit_VIM41>wc~qGsSn?8cQ&kxl#F!*yBxTe%WJGx+ zP1zVYBq1`QEMiPz1!7Ye)i`Y6w!s--Yk|^C43aVun)yZPdWi*z$r-77#RZ8)*?Pr= zIeI`wVQFfKUQvFzUT$hhVoG93qC`}+Qb}b&s*;sbaY<2Wa*2|TQd(wePD-(oRdlry z$VEUYFr|Z}Ff%V5s4%ZICnqr}2dDz5HC0CmW~h=?b!}~IErWzK)JomlB89TlqT<Z_ XJOx8NGd+W%WMc+KMh1oeV2}a;7wTKD literal 0 HcmV?d00001 diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-00004.gz.parquet b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-00004.gz.parquet new file mode 100644 index 0000000000000000000000000000000000000000..67665a7b55da6413d2a534d6f46061e380f70b2e GIT binary patch literal 343 zcmYjN!AiqG6x=MK&7qebHe@kiS!kgU7mFwu^w5iVEIoKoglv;-4Q{gOW}}3VFYqS% z5q^(f<IQc=;9O>4=Dm5m#e6wK01<Q*AZSDnIlk9l@;a9y>4Xam4qu!d8N{n7iq=X0 zb^h^Qf1OXk&E@BCXbc2#aBV9oHG%*Q#?Z5KmhmwFF2p|eCytK>__PNc{No_og>K=# zSrg}?YwN_m*4PkW-<wLcp{!u>1E*!d)FUmof*P@{xTZ=z(~N7DFwMN%hUmKBBqXI) zRjf%s)+rZBNy58^>@G6ao`QeDG~bwDUJ1cg!X(Tn56ItA5;kpn-vaO8xAG`cqbIJ) UROX`@J)_4eJ^_{mz{0%r8w^idvH$=8 literal 0 HcmV?d00001 diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/.part-r-00005.gz.parquet.crc b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/.part-r-00005.gz.parquet.crc new file mode 100644 index 0000000000000000000000000000000000000000..ae94a15d08c816a6fb60117395462aa640da5026 GIT binary patch literal 12 TcmYc;N@ieSU}AV;?~?@p63+t^ literal 0 HcmV?d00001 diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/part-r-00005.gz.parquet b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/part-r-00005.gz.parquet new file mode 100644 index 0000000000000000000000000000000000000000..6cb8538aa890475f9128c8025e2e9b0f72b1b785 GIT binary patch literal 333 zcmZut%SyvQ6di}O1}V6jFoVICffR%|SVR}Wjjr4X#ib%-noMhO^5|ruNXTAs=gv>? 
zC;T(PTdm;2vpI)*&V6vFr<W6g$Syex2?o(8u1|Xx(yDAt9s&$s+!X<mSscgnwD<b) z9K64wt!i_)4P}B^c<z|8v;-qE2rzXDnX3RH4>O<nW7j7GcGO2M`*DclVK4QbHpcGB zKMTRai1~D{Gz%dsncON-SJK>xdZ`77WuvSx<%7tTm8rCnUbWmlR*FZwwx&re5BWS( zI<0wh-SX8nV0}~gCzurr2o{aja;6~xtt#ZdLwVG8-A#w+&U)p3ZbtXY)LB`KCgNBe NnB)+B!ULx8$S(}ES498- literal 0 HcmV?d00001 diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/.part-r-00007.gz.parquet.crc b/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/.part-r-00007.gz.parquet.crc new file mode 100644 index 0000000000000000000000000000000000000000..58d9bb5fc5883f0b65b637a8deea2672b9fee8a2 GIT binary patch literal 12 TcmYc;N@ieSU}C7hE>;2n5}^Yd literal 0 HcmV?d00001 diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/part-r-00007.gz.parquet b/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/part-r-00007.gz.parquet new file mode 100644 index 0000000000000000000000000000000000000000..9b00805481e7b311763d48ff965966b4d09daa99 GIT binary patch literal 343 zcmYjN%SyvQ6ulWjn?*Mw6EYaE45TQ;!6Lc{g1Asz2A7JEX)>*$d37>TB;*6cow)In z`~?5Ok8oNwcsA$2IrpB+4bQKq7%;_`K1Ny$u;n_#kSm$S%U;-^vHN1JNh6*`Q8Z76 zug0@?@&54%+h==UTiU>g_*bSZON9~Ok%t_!;JNSsY(!k*PAnIX$ngLy^5bCBMs{Vt z858TYZ|lXTR@(@O>+F|u!Fa{vd%^08%O$H<8Pj6b2*qUi$a0~0!WDOJTB@EZK?7PV z*~E(abe@VVscCTA()C5!+K~S*m=+5iESfCivrH%SsPO6EQW~^fch`Zl^ILh4%khJd Vby^nVDLY|@GCl&s00{L<zX6fLTR{K- literal 0 HcmV?d00001 diff --git a/python/test_support/sql/people.json b/python/test_support/sql/people.json new file mode 100644 index 0000000000..50a859cbd7 --- /dev/null +++ b/python/test_support/sql/people.json @@ -0,0 +1,3 @@ +{"name":"Michael"} +{"name":"Andy", "age":30} +{"name":"Justin", "age":19} -- GitLab
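For convenience, here is a minimal, self-contained sketch (not part of the patch itself) of the reader/writer interface that the doctests above exercise. It assumes Spark 1.4+, that the working directory is SPARK_HOME (mirroring the _test() setup added in this patch), and that the test_support data files introduced here are present; the application name 'ReadWriteExample' and the temporary output path are arbitrary choices for illustration.

# Sketch of the DataFrameReader/DataFrameWriter usage documented in this patch.
# Assumptions: Spark 1.4+, working directory is SPARK_HOME, test_support data present.
import os
import tempfile

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext('local[4]', 'ReadWriteExample')  # app name is arbitrary
sqlContext = SQLContext(sc)

# Read the partitioned Parquet test data; partition columns appear as regular columns.
df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
print(df.dtypes)  # [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]

# Read the JSON test data with schema inference.
people = sqlContext.read.json('python/test_support/sql/people.json')
print(people.dtypes)  # [('age', 'bigint'), ('name', 'string')]

# Write the data back out under a temporary directory, partitioned by year and month.
out = os.path.join(tempfile.mkdtemp(), 'data')
df.write.mode('overwrite').partitionBy('year', 'month').parquet(out)

sc.stop()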