diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py
index 634c575ecd80e1bc94f169cea85d4594b5e26561..66b0bff2908b7be5f0b738a626dd578b6ef2a78a 100644
--- a/python/pyspark/sql/__init__.py
+++ b/python/pyspark/sql/__init__.py
@@ -41,6 +41,13 @@ Important classes of Spark SQL and DataFrames:
 """
 from __future__ import absolute_import
+
+def since(version):
+    def deco(f):
+        f.__doc__ = f.__doc__.rstrip() + "\n\n.. versionadded:: %s" % version
+        return f
+    return deco
+
 
 # fix the module name conflict for Python 3+
 import sys
 from . import _types as types
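As an aside (illustration only, not part of the patch): the new decorator simply appends a Sphinx directive to a function's `__doc__`, assuming `pyspark` is importable:

    from pyspark.sql import since

    @since(1.3)
    def demo():
        """Does nothing useful."""

    # `since` strips trailing whitespace from __doc__, then appends the directive:
    print(demo.__doc__)
    # Does nothing useful.
    #
    # .. versionadded:: 1.3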
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index fc7ad674daa5bf793ed915381f4068f59ff9c843..d03bb6d33dd03389c4294a9462c69b412de825c1 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -23,6 +23,7 @@ if sys.version >= '3':
 
 from pyspark.context import SparkContext
 from pyspark.rdd import ignore_unicode_prefix
+from pyspark.sql import since
 from pyspark.sql.types import *
 
 __all__ = ["DataFrame", "Column", "SchemaRDD", "DataFrameNaFunctions",
@@ -114,6 +115,8 @@ class Column(object):
     # 2. Create from an expression
     df.colName + 1
     1 / df.colName
+
+    .. versionadded:: 1.3
     """
 
     def __init__(self, jc):
@@ -159,6 +162,7 @@ class Column(object):
     bitwiseAND = _bin_op("bitwiseAND")
     bitwiseXOR = _bin_op("bitwiseXOR")
 
+    @since(1.3)
     def getItem(self, key):
         """An expression that gets an item at position `ordinal` out of a list,
         or gets an item by key out of a dict.
@@ -179,6 +183,7 @@ class Column(object):
         """
         return self[key]
 
+    @since(1.3)
     def getField(self, name):
         """An expression that gets a field by name in a StructField.
 
@@ -211,6 +216,7 @@ class Column(object):
     endswith = _bin_op("endsWith")
 
     @ignore_unicode_prefix
+    @since(1.3)
     def substr(self, startPos, length):
         """
         Return a :class:`Column` which is a substring of the column
@@ -234,6 +240,7 @@ class Column(object):
     __getslice__ = substr
 
     @ignore_unicode_prefix
+    @since(1.3)
     def inSet(self, *cols):
         """ A boolean expression that is evaluated to true if the value of this
         expression is contained by the evaluated values of the arguments.
@@ -259,6 +266,7 @@ class Column(object):
     isNull = _unary_op("isNull", "True if the current expression is null.")
     isNotNull = _unary_op("isNotNull", "True if the current expression is not null.")
 
+    @since(1.3)
     def alias(self, *alias):
         """Returns this column aliased with a new name or names (in the case of expressions that
         return more than one column, such as explode).
@@ -274,6 +282,7 @@ class Column(object):
         return Column(getattr(self._jc, "as")(_to_seq(sc, list(alias))))
 
     @ignore_unicode_prefix
+    @since(1.3)
     def cast(self, dataType):
         """ Convert the column into type `dataType`
 
@@ -294,6 +303,7 @@ class Column(object):
         return Column(jc)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def between(self, lowerBound, upperBound):
         """ A boolean expression that is evaluated to true if the value of this
         expression is between the given columns.
@@ -301,6 +311,7 @@ class Column(object):
         """
         return (self >= lowerBound) & (self <= upperBound)
 
     @ignore_unicode_prefix
+    @since(1.4)
     def when(self, condition, value):
         """Evaluates a list of conditions and returns one of multiple possible result expressions.
         If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
@@ -319,6 +330,7 @@ class Column(object):
         return Column(jc)
 
     @ignore_unicode_prefix
+    @since(1.4)
     def otherwise(self, value):
         """Evaluates a list of conditions and returns one of multiple possible result expressions.
         If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
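The column.py hunks only annotate existing methods; behavior is unchanged. A hedged usage sketch, assuming the two-row `df` with `age`/`name` columns used in the surrounding doctests:

    df.select(df.name.substr(1, 3).alias("col")).collect()  # expected: [Row(col=u'Ali'), Row(col=u'Bob')]
    df.select(df.age.between(2, 4)).collect()               # one boolean Row per input row
    df[df.name.inSet("Bob", "Mike")].collect()              # expected: [Row(age=5, name=u'Bob')]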
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 7543475014bd2e6951cdf336d712a0eaa037ab2a..51f12c5bb41982a56499ab5fa0c964ba20387ef1 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -28,6 +28,7 @@ from py4j.protocol import Py4JError
 
 from pyspark.rdd import RDD, _prepare_for_python_RDD, ignore_unicode_prefix
 from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
+from pyspark.sql import since
 from pyspark.sql.types import Row, StringType, StructType, _verify_type, \
     _infer_schema, _has_nulltype, _merge_type, _create_converter, _python_to_sql_converter
 from pyspark.sql.dataframe import DataFrame
@@ -106,11 +107,13 @@ class SQLContext(object):
             self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc())
         return self._scala_SQLContext
 
+    @since(1.3)
     def setConf(self, key, value):
         """Sets the given Spark SQL configuration property.
         """
         self._ssql_ctx.setConf(key, value)
 
+    @since(1.3)
     def getConf(self, key, defaultValue):
         """Returns the value of Spark SQL configuration property for the given key.
 
@@ -119,10 +122,12 @@ class SQLContext(object):
         return self._ssql_ctx.getConf(key, defaultValue)
 
     @property
+    @since("1.3.1")
     def udf(self):
         """Returns a :class:`UDFRegistration` for UDF registration."""
         return UDFRegistration(self)
 
+    @since(1.4)
     def range(self, start, end, step=1, numPartitions=None):
         """
         Create a :class:`DataFrame` with single LongType column named `id`,
@@ -144,6 +149,7 @@ class SQLContext(object):
         return DataFrame(jdf, self)
 
     @ignore_unicode_prefix
+    @since(1.2)
     def registerFunction(self, name, f, returnType=StringType()):
         """Registers a lambda function as a UDF so it can be used in SQL statements.
 
@@ -210,7 +216,8 @@ class SQLContext(object):
 
     @ignore_unicode_prefix
     def inferSchema(self, rdd, samplingRatio=None):
-        """::note: Deprecated in 1.3, use :func:`createDataFrame` instead.
+        """
+        .. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
         """
         warnings.warn("inferSchema is deprecated, please use createDataFrame instead")
 
@@ -221,7 +228,8 @@ class SQLContext(object):
 
     @ignore_unicode_prefix
     def applySchema(self, rdd, schema):
-        """::note: Deprecated in 1.3, use :func:`createDataFrame` instead.
+        """
+        .. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
         """
         warnings.warn("applySchema is deprecated, please use createDataFrame instead")
 
@@ -233,6 +241,7 @@ class SQLContext(object):
 
         return self.createDataFrame(rdd, schema)
 
+    @since(1.3)
     @ignore_unicode_prefix
     def createDataFrame(self, data, schema=None, samplingRatio=None):
         """
@@ -337,6 +346,7 @@ class SQLContext(object):
         df = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
         return DataFrame(df, self)
 
+    @since(1.3)
     def registerDataFrameAsTable(self, df, tableName):
         """Registers the given :class:`DataFrame` as a temporary table in the catalog.
 
@@ -349,6 +359,7 @@ class SQLContext(object):
         else:
             raise ValueError("Can only register DataFrame as table")
 
+    @since(1.0)
     def parquetFile(self, *paths):
         """Loads a Parquet file, returning the result as a :class:`DataFrame`.
 
@@ -367,6 +378,7 @@ class SQLContext(object):
         jdf = self._ssql_ctx.parquetFile(jpaths)
         return DataFrame(jdf, self)
 
+    @since(1.0)
     def jsonFile(self, path, schema=None, samplingRatio=1.0):
         """Loads a text file storing one JSON object per line as a :class:`DataFrame`.
 
@@ -407,6 +419,7 @@ class SQLContext(object):
         return DataFrame(df, self)
 
     @ignore_unicode_prefix
+    @since(1.0)
     def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
         """Loads an RDD storing one JSON object per string as a :class:`DataFrame`.
 
@@ -449,6 +462,7 @@ class SQLContext(object):
             df = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype)
         return DataFrame(df, self)
 
+    @since(1.3)
     def load(self, path=None, source=None, schema=None, **options):
         """Returns the dataset in a data source as a :class:`DataFrame`.
 
@@ -460,6 +474,7 @@ class SQLContext(object):
         """
         return self.read.load(path, source, schema, **options)
 
+    @since(1.3)
     def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
         """Creates an external table based on the dataset in a data source.
 
@@ -489,6 +504,7 @@ class SQLContext(object):
         return DataFrame(df, self)
 
     @ignore_unicode_prefix
+    @since(1.0)
     def sql(self, sqlQuery):
         """Returns a :class:`DataFrame` representing the result of the given query.
 
@@ -499,6 +515,7 @@ class SQLContext(object):
         """
         return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
 
+    @since(1.0)
     def table(self, tableName):
         """Returns the specified table as a :class:`DataFrame`.
 
@@ -510,6 +527,7 @@ class SQLContext(object):
         return DataFrame(self._ssql_ctx.table(tableName), self)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def tables(self, dbName=None):
         """Returns a :class:`DataFrame` containing names of tables in the given database.
 
@@ -528,6 +546,7 @@ class SQLContext(object):
         else:
             return DataFrame(self._ssql_ctx.tables(dbName), self)
 
+    @since(1.3)
     def tableNames(self, dbName=None):
         """Returns a list of names of tables in the database ``dbName``.
 
@@ -544,25 +563,29 @@ class SQLContext(object):
         else:
             return [name for name in self._ssql_ctx.tableNames(dbName)]
 
+    @since(1.0)
     def cacheTable(self, tableName):
         """Caches the specified table in-memory."""
         self._ssql_ctx.cacheTable(tableName)
 
+    @since(1.0)
     def uncacheTable(self, tableName):
         """Removes the specified table from the in-memory cache."""
         self._ssql_ctx.uncacheTable(tableName)
 
+    @since(1.3)
     def clearCache(self):
         """Removes all cached tables from the in-memory cache.
         """
         self._ssql_ctx.clearCache()
 
     @property
+    @since(1.4)
     def read(self):
         """
         Returns a :class:`DataFrameReader` that can be used to read data
         in as a :class:`DataFrame`.
 
-        ::note: Experimental
+        .. note:: Experimental
 
         >>> sqlContext.read
         <pyspark.sql.readwriter.DataFrameReader object at ...>
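A hedged sketch of a few of the SQLContext methods tagged above, assuming the doctest fixtures `sqlContext` and `df`:

    sqlContext.range(1, 7, 2).collect()        # expected: [Row(id=1), Row(id=3), Row(id=5)]
    sqlContext.registerDataFrameAsTable(df, "people")
    "people" in sqlContext.tableNames()        # True
    sqlContext.sql("SELECT count(*) AS n FROM people").collect()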
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index f2280b5100e53172ee082d6af4d769393741adba..3fc7d0048edf6924202294132aa2c8d3804ba915 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -29,6 +29,7 @@ from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
 from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.traceback_utils import SCCallSiteSync
+from pyspark.sql import since
 from pyspark.sql.types import _create_cls, _parse_datatype_json_string
 from pyspark.sql.column import Column, _to_seq, _to_java_column
 from pyspark.sql.readwriter import DataFrameWriter
@@ -60,6 +61,8 @@ class DataFrame(object):
 
         people.filter(people.age > 30).join(department, people.deptId == department.id)) \
           .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
+
+    .. versionadded:: 1.3
     """
 
     def __init__(self, jdf, sql_ctx):
@@ -71,6 +74,7 @@ class DataFrame(object):
         self._lazy_rdd = None
 
     @property
+    @since(1.3)
     def rdd(self):
         """Returns the content as an :class:`pyspark.RDD` of :class:`Row`.
         """
@@ -88,18 +92,21 @@ class DataFrame(object):
         return self._lazy_rdd
 
     @property
+    @since("1.3.1")
     def na(self):
         """Returns a :class:`DataFrameNaFunctions` for handling missing values.
         """
         return DataFrameNaFunctions(self)
 
     @property
+    @since(1.4)
     def stat(self):
         """Returns a :class:`DataFrameStatFunctions` for statistic functions.
         """
         return DataFrameStatFunctions(self)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def toJSON(self, use_unicode=True):
         """Converts a :class:`DataFrame` into a :class:`RDD` of string.
 
@@ -111,6 +118,7 @@ class DataFrame(object):
         rdd = self._jdf.toJSON()
         return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
 
+    @since(1.3)
     def saveAsParquetFile(self, path):
         """Saves the contents as a Parquet file, preserving the schema.
 
@@ -127,6 +135,7 @@ class DataFrame(object):
         """
         self._jdf.saveAsParquetFile(path)
 
+    @since(1.3)
     def registerTempTable(self, name):
         """Registers this RDD as a temporary table using the given name.
 
@@ -140,11 +149,13 @@ class DataFrame(object):
         """
         self._jdf.registerTempTable(name)
 
+    @since(1.3)
     def registerAsTable(self, name):
         """DEPRECATED: use :func:`registerTempTable` instead"""
         warnings.warn("Use registerTempTable instead of registerAsTable.", DeprecationWarning)
         self.registerTempTable(name)
 
+    @since(1.3)
     def insertInto(self, tableName, overwrite=False):
         """Inserts the contents of this :class:`DataFrame` into the specified table.
 
@@ -152,6 +163,7 @@ class DataFrame(object):
         """
         self._jdf.insertInto(tableName, overwrite)
 
+    @since(1.3)
     def saveAsTable(self, tableName, source=None, mode="error", **options):
         """Saves the contents of this :class:`DataFrame` to a data source as a table.
 
@@ -169,6 +181,7 @@ class DataFrame(object):
         """
         self.write.saveAsTable(tableName, source, mode, **options)
 
+    @since(1.3)
     def save(self, path=None, source=None, mode="error", **options):
         """Saves the contents of the :class:`DataFrame` to a data source.
 
@@ -187,6 +200,7 @@ class DataFrame(object):
         return self.write.save(path, source, mode, **options)
 
     @property
+    @since(1.4)
     def write(self):
         """
         Interface for saving the content of the :class:`DataFrame` out
@@ -194,7 +208,7 @@ class DataFrame(object):
 
         :return :class:`DataFrameWriter`
 
-        ::note: Experimental
+        .. note:: Experimental
 
         >>> df.write
         <pyspark.sql.readwriter.DataFrameWriter object at ...>
@@ -202,6 +216,7 @@ class DataFrame(object):
         return DataFrameWriter(self)
 
     @property
+    @since(1.3)
     def schema(self):
         """Returns the schema of this :class:`DataFrame` as a :class:`types.StructType`.
 
@@ -212,6 +227,7 @@ class DataFrame(object):
             self._schema = _parse_datatype_json_string(self._jdf.schema().json())
         return self._schema
 
+    @since(1.3)
     def printSchema(self):
         """Prints out the schema in the tree format.
 
@@ -223,6 +239,7 @@ class DataFrame(object):
         """
         print(self._jdf.schema().treeString())
 
+    @since(1.3)
     def explain(self, extended=False):
         """Prints the (logical and physical) plans to the console for debugging purpose.
 
@@ -248,12 +265,14 @@ class DataFrame(object):
         else:
             print(self._jdf.queryExecution().executedPlan().toString())
 
+    @since(1.3)
     def isLocal(self):
         """Returns ``True`` if the :func:`collect` and :func:`take` methods can be run locally
         (without any Spark executors).
         """
         return self._jdf.isLocal()
 
+    @since(1.3)
     def show(self, n=20):
         """Prints the first ``n`` rows to the console.
 
@@ -272,6 +291,7 @@ class DataFrame(object):
     def __repr__(self):
         return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
 
+    @since(1.3)
     def count(self):
         """Returns the number of rows in this :class:`DataFrame`.
 
@@ -281,6 +301,7 @@ class DataFrame(object):
         return int(self._jdf.count())
 
     @ignore_unicode_prefix
+    @since(1.3)
     def collect(self):
         """Returns all the records as a list of :class:`Row`.
 
@@ -294,6 +315,7 @@ class DataFrame(object):
         return [cls(r) for r in rs]
 
     @ignore_unicode_prefix
+    @since(1.3)
     def limit(self, num):
         """Limits the result count to the number specified.
 
@@ -306,6 +328,7 @@ class DataFrame(object):
         return DataFrame(jdf, self.sql_ctx)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def take(self, num):
         """Returns the first ``num`` rows as a :class:`list` of :class:`Row`.
 
@@ -315,6 +338,7 @@ class DataFrame(object):
         return self.limit(num).collect()
 
     @ignore_unicode_prefix
+    @since(1.3)
     def map(self, f):
         """ Returns a new :class:`RDD` by applying the ``f`` function to each :class:`Row`.
 
@@ -326,6 +350,7 @@ class DataFrame(object):
         return self.rdd.map(f)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def flatMap(self, f):
         """ Returns a new :class:`RDD` by first applying the ``f`` function to each :class:`Row`,
         and then flattening the results.
@@ -337,6 +362,7 @@ class DataFrame(object):
         """
         return self.rdd.flatMap(f)
 
+    @since(1.3)
     def mapPartitions(self, f, preservesPartitioning=False):
         """Returns a new :class:`RDD` by applying the ``f`` function to each partition.
 
@@ -349,6 +375,7 @@ class DataFrame(object):
         """
         return self.rdd.mapPartitions(f, preservesPartitioning)
 
+    @since(1.3)
     def foreach(self, f):
         """Applies the ``f`` function to all :class:`Row` of this :class:`DataFrame`.
 
@@ -360,6 +387,7 @@ class DataFrame(object):
         """
         return self.rdd.foreach(f)
 
+    @since(1.3)
     def foreachPartition(self, f):
         """Applies the ``f`` function to each partition of this :class:`DataFrame`.
 
@@ -372,6 +400,7 @@ class DataFrame(object):
         """
         return self.rdd.foreachPartition(f)
 
+    @since(1.3)
     def cache(self):
         """ Persists with the default storage level (C{MEMORY_ONLY_SER}).
         """
@@ -379,6 +408,7 @@ class DataFrame(object):
         self._jdf.cache()
         return self
 
+    @since(1.3)
     def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER):
         """Sets the storage level to persist its values across operations
         after the first time it is computed. This can only be used to assign
@@ -390,6 +420,7 @@ class DataFrame(object):
         self._jdf.persist(javaStorageLevel)
         return self
 
+    @since(1.3)
     def unpersist(self, blocking=True):
         """Marks the :class:`DataFrame` as non-persistent, and removes all blocks for it from
         memory and disk.
@@ -398,6 +429,7 @@ class DataFrame(object):
         self._jdf.unpersist(blocking)
         return self
 
+    @since(1.4)
     def coalesce(self, numPartitions):
         """
         Returns a new :class:`DataFrame` that has exactly `numPartitions` partitions.
@@ -412,6 +444,7 @@ class DataFrame(object):
         """
         return DataFrame(self._jdf.coalesce(numPartitions), self.sql_ctx)
 
+    @since(1.3)
     def repartition(self, numPartitions):
         """Returns a new :class:`DataFrame` that has exactly ``numPartitions`` partitions.
 
@@ -420,6 +453,7 @@ class DataFrame(object):
         """
         return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx)
 
+    @since(1.3)
     def distinct(self):
         """Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`.
 
@@ -428,6 +462,7 @@ class DataFrame(object):
         """
         return DataFrame(self._jdf.distinct(), self.sql_ctx)
 
+    @since(1.3)
     def sample(self, withReplacement, fraction, seed=None):
         """Returns a sampled subset of this :class:`DataFrame`.
 
@@ -439,6 +474,7 @@ class DataFrame(object):
         rdd = self._jdf.sample(withReplacement, fraction, long(seed))
         return DataFrame(rdd, self.sql_ctx)
 
+    @since(1.4)
     def randomSplit(self, weights, seed=None):
         """Randomly splits this :class:`DataFrame` with the provided weights.
 
@@ -461,6 +497,7 @@ class DataFrame(object):
         return [DataFrame(rdd, self.sql_ctx) for rdd in rdd_array]
 
     @property
+    @since(1.3)
     def dtypes(self):
         """Returns all column names and their data types as a list.
 
@@ -471,6 +508,7 @@ class DataFrame(object):
 
     @property
     @ignore_unicode_prefix
+    @since(1.3)
     def columns(self):
         """Returns all column names as a list.
 
@@ -480,6 +518,7 @@ class DataFrame(object):
         return [f.name for f in self.schema.fields]
 
     @ignore_unicode_prefix
+    @since(1.3)
     def alias(self, alias):
         """Returns a new :class:`DataFrame` with an alias set.
 
@@ -494,6 +533,7 @@ class DataFrame(object):
         return DataFrame(getattr(self._jdf, "as")(alias), self.sql_ctx)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def join(self, other, joinExprs=None, joinType=None):
         """Joins with another :class:`DataFrame`, using the given join expression.
 
@@ -527,6 +567,7 @@ class DataFrame(object):
         return DataFrame(jdf, self.sql_ctx)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def sort(self, *cols, **kwargs):
         """Returns a new :class:`DataFrame` sorted by the specified column(s).
 
@@ -586,6 +627,7 @@ class DataFrame(object):
             cols = cols[0]
         return self._jseq(cols, _to_java_column)
 
+    @since("1.3.1")
     def describe(self, *cols):
         """Computes statistics for numeric columns.
 
@@ -607,6 +649,7 @@ class DataFrame(object):
         return DataFrame(jdf, self.sql_ctx)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def head(self, n=None):
         """
         Returns the first ``n`` rows as a list of :class:`Row`,
@@ -623,6 +666,7 @@ class DataFrame(object):
         return self.take(n)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def first(self):
         """Returns the first row as a :class:`Row`.
 
@@ -632,6 +676,7 @@ class DataFrame(object):
         return self.head()
 
     @ignore_unicode_prefix
+    @since(1.3)
     def __getitem__(self, item):
         """Returns the column as a :class:`Column`.
 
@@ -659,6 +704,7 @@ class DataFrame(object):
         else:
             raise TypeError("unexpected item type: %s" % type(item))
 
+    @since(1.3)
     def __getattr__(self, name):
         """Returns the :class:`Column` denoted by ``name``.
 
@@ -672,6 +718,7 @@ class DataFrame(object):
         return Column(jc)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def select(self, *cols):
         """Projects a set of expressions and returns a new :class:`DataFrame`.
 
@@ -689,6 +736,7 @@ class DataFrame(object):
         jdf = self._jdf.select(self._jcols(*cols))
         return DataFrame(jdf, self.sql_ctx)
 
+    @since(1.3)
     def selectExpr(self, *expr):
         """Projects a set of SQL expressions and returns a new :class:`DataFrame`.
 
@@ -703,6 +751,7 @@ class DataFrame(object):
         return DataFrame(jdf, self.sql_ctx)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def filter(self, condition):
         """Filters rows using the given condition.
 
@@ -732,6 +781,7 @@ class DataFrame(object):
     where = filter
 
     @ignore_unicode_prefix
+    @since(1.3)
     def groupBy(self, *cols):
         """Groups the :class:`DataFrame` using the specified columns,
         so we can run aggregation on them. See :class:`GroupedData`
@@ -755,6 +805,7 @@ class DataFrame(object):
         from pyspark.sql.group import GroupedData
         return GroupedData(jdf, self.sql_ctx)
 
+    @since(1.3)
     def agg(self, *exprs):
         """ Aggregate on the entire :class:`DataFrame` without groups
         (shorthand for ``df.groupBy.agg()``).
 
@@ -767,6 +818,7 @@ class DataFrame(object):
         """
         return self.groupBy().agg(*exprs)
 
+    @since(1.3)
     def unionAll(self, other):
         """ Return a new :class:`DataFrame` containing union of rows in this
         frame and another frame.
 
@@ -775,6 +827,7 @@ class DataFrame(object):
         """
         return DataFrame(self._jdf.unionAll(other._jdf), self.sql_ctx)
 
+    @since(1.3)
     def intersect(self, other):
         """ Return a new :class:`DataFrame` containing rows only in
         both this frame and another frame.
 
@@ -783,6 +836,7 @@ class DataFrame(object):
         """
         return DataFrame(self._jdf.intersect(other._jdf), self.sql_ctx)
 
+    @since(1.3)
     def subtract(self, other):
         """ Return a new :class:`DataFrame` containing rows in this frame
         but not in another frame.
 
@@ -791,6 +845,7 @@ class DataFrame(object):
         """
         return DataFrame(getattr(self._jdf, "except")(other._jdf), self.sql_ctx)
 
+    @since(1.4)
     def dropDuplicates(self, subset=None):
         """Return a new :class:`DataFrame` with duplicate rows removed,
         optionally only considering certain columns.
 
@@ -821,6 +876,7 @@ class DataFrame(object):
             jdf = self._jdf.dropDuplicates(self._jseq(subset))
         return DataFrame(jdf, self.sql_ctx)
 
+    @since("1.3.1")
     def dropna(self, how='any', thresh=None, subset=None):
         """Returns a new :class:`DataFrame` omitting rows with null values.
 
@@ -863,6 +919,7 @@ class DataFrame(object):
 
         return DataFrame(self._jdf.na().drop(thresh, self._jseq(subset)), self.sql_ctx)
 
+    @since("1.3.1")
     def fillna(self, value, subset=None):
         """Replace null values, alias for ``na.fill()``.
 
@@ -924,6 +981,7 @@ class DataFrame(object):
 
         return DataFrame(self._jdf.na().fill(value, self._jseq(subset)), self.sql_ctx)
 
+    @since(1.4)
     def replace(self, to_replace, value, subset=None):
         """Returns a new :class:`DataFrame` replacing a value with another value.
 
@@ -999,6 +1057,7 @@ class DataFrame(object):
         return DataFrame(
             self._jdf.na().replace(self._jseq(subset), self._jmap(rep_dict)), self.sql_ctx)
 
+    @since(1.4)
     def corr(self, col1, col2, method=None):
         """
         Calculates the correlation of two columns of a DataFrame as a double value. Currently only
@@ -1020,6 +1079,7 @@ class DataFrame(object):
                             "coefficient is supported.")
         return self._jdf.stat().corr(col1, col2, method)
 
+    @since(1.4)
     def cov(self, col1, col2):
         """
         Calculate the sample covariance for the given columns, specified by their names, as a
@@ -1034,6 +1094,7 @@ class DataFrame(object):
             raise ValueError("col2 should be a string.")
         return self._jdf.stat().cov(col1, col2)
 
+    @since(1.4)
     def crosstab(self, col1, col2):
         """
         Computes a pair-wise frequency table of the given columns. Also known as a contingency
@@ -1055,6 +1116,7 @@ class DataFrame(object):
             raise ValueError("col2 should be a string.")
         return DataFrame(self._jdf.stat().crosstab(col1, col2), self.sql_ctx)
 
+    @since(1.4)
     def freqItems(self, cols, support=None):
         """
         Finding frequent items for columns, possibly with false positives. Using the
@@ -1076,6 +1138,7 @@ class DataFrame(object):
         return DataFrame(self._jdf.stat().freqItems(_to_seq(self._sc, cols), support), self.sql_ctx)
 
     @ignore_unicode_prefix
+    @since(1.3)
     def withColumn(self, colName, col):
         """Returns a new :class:`DataFrame` by adding a column.
@@ -1088,6 +1151,7 @@ class DataFrame(object):
         return self.select('*', col.alias(colName))
 
     @ignore_unicode_prefix
+    @since(1.3)
     def withColumnRenamed(self, existing, new):
         """Returns a new :class:`DataFrame` by renaming an existing column.
 
@@ -1102,6 +1166,7 @@ class DataFrame(object):
                 for c in self.columns]
         return self.select(*cols)
 
+    @since(1.4)
     @ignore_unicode_prefix
     def drop(self, colName):
         """Returns a new :class:`DataFrame` that drops the specified column.
 
@@ -1114,6 +1179,7 @@ class DataFrame(object):
         jdf = self._jdf.drop(colName)
         return DataFrame(jdf, self.sql_ctx)
 
+    @since(1.3)
     def toPandas(self):
         """Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``.
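The dataframe.py hunks likewise only gain annotations. An end-to-end sketch of a few of the methods touched (hypothetical data; assumes the doctest `sqlContext`):

    df = sqlContext.createDataFrame([(2, 'Alice'), (5, 'Bob')], ['age', 'name'])
    df.filter(df.age > 3).select('name').collect()  # expected: [Row(name=u'Bob')]
    df.na.fill({'age': 0}).count()                  # the na/fillna path, added in 1.3.1
    df.describe('age').show()                       # summary statistics, added in 1.3.1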
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index fbe9bf5b526af233ae7b3a8bf8a3300dc65ac3bd..9b0d7f3e6656eae89c324529dba79c0e1e428cdc 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -26,6 +26,7 @@ if sys.version < "3":
 from pyspark import SparkContext
 from pyspark.rdd import _prepare_for_python_RDD, ignore_unicode_prefix
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.sql import since
 from pyspark.sql.types import StringType
 from pyspark.sql.column import Column, _to_java_column, _to_seq
 
@@ -78,6 +79,18 @@ _functions = {
     'sqrt': 'Computes the square root of the specified float value.',
     'abs': 'Computes the absolute value.',
 
+    'max': 'Aggregate function: returns the maximum value of the expression in a group.',
+    'min': 'Aggregate function: returns the minimum value of the expression in a group.',
+    'first': 'Aggregate function: returns the first value in a group.',
+    'last': 'Aggregate function: returns the last value in a group.',
+    'count': 'Aggregate function: returns the number of items in a group.',
+    'sum': 'Aggregate function: returns the sum of all values in the expression.',
+    'avg': 'Aggregate function: returns the average of the values in a group.',
+    'mean': 'Aggregate function: returns the average of the values in a group.',
+    'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
+}
+
+_functions_1_4 = {
     # unary math functions
     'acos': 'Computes the cosine inverse of the given value; the returned angle is in the range' +
             '0.0 through pi.',
@@ -102,21 +115,11 @@ _functions = {
     'tan': 'Computes the tangent of the given value.',
     'tanh': 'Computes the hyperbolic tangent of the given value.',
     'toDegrees': 'Converts an angle measured in radians to an approximately equivalent angle ' +
-                 'measured in degrees.',
+                'measured in degrees.',
     'toRadians': 'Converts an angle measured in degrees to an approximately equivalent angle ' +
-                 'measured in radians.',
+                'measured in radians.',
     'bitwiseNOT': 'Computes bitwise not.',
-
-    'max': 'Aggregate function: returns the maximum value of the expression in a group.',
-    'min': 'Aggregate function: returns the minimum value of the expression in a group.',
-    'first': 'Aggregate function: returns the first value in a group.',
-    'last': 'Aggregate function: returns the last value in a group.',
-    'count': 'Aggregate function: returns the number of items in a group.',
-    'sum': 'Aggregate function: returns the sum of all values in the expression.',
-    'avg': 'Aggregate function: returns the average of the values in a group.',
-    'mean': 'Aggregate function: returns the average of the values in a group.',
-    'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
 }
 
 # math functions that take two arguments as input
@@ -128,15 +131,18 @@ _binary_mathfunctions = {
 }
 
 for _name, _doc in _functions.items():
-    globals()[_name] = _create_function(_name, _doc)
+    globals()[_name] = since(1.3)(_create_function(_name, _doc))
+for _name, _doc in _functions_1_4.items():
+    globals()[_name] = since(1.4)(_create_function(_name, _doc))
 for _name, _doc in _binary_mathfunctions.items():
-    globals()[_name] = _create_binary_mathfunction(_name, _doc)
+    globals()[_name] = since(1.4)(_create_binary_mathfunction(_name, _doc))
 del _name, _doc
 __all__ += _functions.keys()
 __all__ += _binary_mathfunctions.keys()
 __all__.sort()
 
 
+@since(1.4)
 def array(*cols):
     """Creates a new array column.
 
@@ -155,6 +161,7 @@ def array(*cols):
     return Column(jc)
 
 
+@since(1.3)
 def approxCountDistinct(col, rsd=None):
     """Returns a new :class:`Column` for approximate distinct count of ``col``.
 
@@ -169,6 +176,7 @@ def approxCountDistinct(col, rsd=None):
     return Column(jc)
 
 
+@since(1.4)
 def explode(col):
     """Returns a new row for each element in the given array or map.
 
@@ -189,6 +197,7 @@ def explode(col):
     return Column(jc)
 
 
+@since(1.4)
 def coalesce(*cols):
     """Returns the first column that is not null.
 
@@ -225,6 +234,7 @@ def coalesce(*cols):
     return Column(jc)
 
 
+@since(1.3)
 def countDistinct(col, *cols):
     """Returns a new :class:`Column` for distinct count of ``col`` or ``cols``.
 
@@ -239,6 +249,7 @@ def countDistinct(col, *cols):
     return Column(jc)
 
 
+@since(1.4)
 def monotonicallyIncreasingId():
     """A column that generates monotonically increasing 64-bit integers.
 
@@ -259,6 +270,7 @@ def monotonicallyIncreasingId():
     return Column(sc._jvm.functions.monotonicallyIncreasingId())
 
 
+@since(1.4)
 def rand(seed=None):
     """Generates a random column with i.i.d. samples from U[0.0, 1.0].
     """
@@ -270,6 +282,7 @@ def rand(seed=None):
     return Column(jc)
 
 
+@since(1.4)
 def randn(seed=None):
     """Generates a column with i.i.d. samples from the standard normal distribution.
     """
@@ -281,6 +294,7 @@ def randn(seed=None):
     return Column(jc)
 
 
+@since(1.4)
 def sparkPartitionId():
     """A column for partition ID of the Spark task.
 
@@ -294,6 +308,7 @@ def sparkPartitionId():
 
 
 @ignore_unicode_prefix
+@since(1.4)
 def struct(*cols):
     """Creates a new struct column.
 
@@ -312,6 +327,7 @@ def struct(*cols):
     return Column(jc)
 
 
+@since(1.4)
 def when(condition, value):
     """Evaluates a list of conditions and returns one of multiple possible result expressions.
     If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
 
@@ -336,6 +352,8 @@ def when(condition, value):
 
 class UserDefinedFunction(object):
     """
     User defined function in Python
+
+    .. versionadded:: 1.3
     """
     def __init__(self, func, returnType):
         self.func = func
@@ -369,6 +387,7 @@ class UserDefinedFunction(object):
         return Column(jc)
 
 
+@since(1.3)
 def udf(f, returnType=StringType()):
     """Creates a :class:`Column` expression representing a user defined function (UDF).
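One observable effect of the loop change above (sketch; it relies only on `_create_function` setting `__doc__` from the dict, which the `since` wrapper requires anyway):

    from pyspark.sql import functions as F
    F.max.__doc__.endswith(".. versionadded:: 1.3")  # True: 'max' stays in _functions
    F.tan.__doc__.endswith(".. versionadded:: 1.4")  # True: 'tan' now lives in _functions_1_4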
diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index 9f7c743c051d332f2e4c2a2b2567bc73576dbdbe..4da472a577eaeb46fa84e15d5b12ce7a62bd6a4a 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -16,6 +16,7 @@
 #
 
 from pyspark.rdd import ignore_unicode_prefix
+from pyspark.sql import since
 from pyspark.sql.column import Column, _to_seq
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql.types import *
@@ -47,6 +48,8 @@ class GroupedData(object):
     """
     A set of methods for aggregations on a :class:`DataFrame`,
     created by :func:`DataFrame.groupBy`.
+
+    .. versionadded:: 1.3
     """
 
     def __init__(self, jdf, sql_ctx):
@@ -54,6 +57,7 @@ class GroupedData(object):
         self.sql_ctx = sql_ctx
 
     @ignore_unicode_prefix
+    @since(1.3)
     def agg(self, *exprs):
         """Computes aggregates and returns the result as a :class:`DataFrame`.
 
@@ -86,6 +90,7 @@ class GroupedData(object):
         return DataFrame(jdf, self.sql_ctx)
 
     @dfapi
+    @since(1.3)
     def count(self):
         """Counts the number of records for each group.
 
@@ -94,6 +99,7 @@ class GroupedData(object):
         """
 
     @df_varargs_api
+    @since(1.3)
     def mean(self, *cols):
         """Computes average values for each numeric column for each group.
 
@@ -108,6 +114,7 @@ class GroupedData(object):
         """
 
     @df_varargs_api
+    @since(1.3)
     def avg(self, *cols):
         """Computes average values for each numeric column for each group.
 
@@ -122,6 +129,7 @@ class GroupedData(object):
         """
 
     @df_varargs_api
+    @since(1.3)
     def max(self, *cols):
         """Computes the max value for each numeric column for each group.
 
@@ -132,6 +140,7 @@ class GroupedData(object):
         """
 
     @df_varargs_api
+    @since(1.3)
     def min(self, *cols):
         """Computes the min value for each numeric column for each group.
 
@@ -144,6 +153,7 @@ class GroupedData(object):
         """
 
     @df_varargs_api
+    @since(1.3)
     def sum(self, *cols):
         """Computes the sum for each numeric column for each group.
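A short sketch of the GroupedData methods annotated above, assuming the doctest `df`:

    df.groupBy('name').count().collect()  # one count Row per distinct name
    df.groupBy().avg('age').collect()     # grand average over the whole frame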
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index e2b27fb587e7384b3c9b1fc8c832f432345224be..02b3aab2b12e4f1bf38700c36ba6bc284d345018 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -17,6 +17,7 @@
 
 from py4j.java_gateway import JavaClass
 
+from pyspark.sql import since
 from pyspark.sql.column import _to_seq
 from pyspark.sql.types import *
 
@@ -30,6 +31,8 @@ class DataFrameReader(object):
     to access this.
 
     ::Note: Experimental
+
+    .. versionadded:: 1.4
     """
 
     def __init__(self, sqlContext):
@@ -40,6 +43,7 @@ class DataFrameReader(object):
         from pyspark.sql.dataframe import DataFrame
         return DataFrame(jdf, self._sqlContext)
 
+    @since(1.4)
     def load(self, path=None, format=None, schema=None, **options):
         """Loads data from a data source and returns it as a :class:`DataFrame`.
 
@@ -63,6 +67,7 @@ class DataFrameReader(object):
         else:
             return self._df(jreader.load())
 
+    @since(1.4)
     def json(self, path, schema=None):
         """
         Loads a JSON file (one object per line) and returns the result as
@@ -107,6 +112,7 @@ class DataFrameReader(object):
             jdf = self._jreader.schema(jschema).json(path)
         return self._df(jdf)
 
+    @since(1.4)
     def table(self, tableName):
         """Returns the specified table as a :class:`DataFrame`.
 
@@ -117,6 +123,7 @@ class DataFrameReader(object):
         """
         return self._df(self._jreader.table(tableName))
 
+    @since(1.4)
     def parquet(self, *path):
         """Loads a Parquet file, returning the result as a :class:`DataFrame`.
 
@@ -130,6 +137,7 @@ class DataFrameReader(object):
         """
         return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
 
+    @since(1.4)
     def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
              predicates=None, properties={}):
         """
@@ -178,12 +186,15 @@ class DataFrameWriter(object):
     to access this.
 
     ::Note: Experimental
+
+    .. versionadded:: 1.4
     """
     def __init__(self, df):
         self._df = df
         self._sqlContext = df.sql_ctx
         self._jwrite = df._jdf.write()
 
+    @since(1.4)
     def save(self, path=None, format=None, mode="error", **options):
         """
         Saves the contents of the :class:`DataFrame` to a data source.
@@ -215,6 +226,7 @@ class DataFrameWriter(object):
         else:
             jwrite.save(path)
 
+    @since(1.4)
     def saveAsTable(self, name, format=None, mode="error", **options):
         """
         Saves the contents of this :class:`DataFrame` to a data source as a table.
@@ -243,6 +255,7 @@ class DataFrameWriter(object):
             jwrite = jwrite.option(k, options[k])
         return jwrite.saveAsTable(name)
 
+    @since(1.4)
     def json(self, path, mode="error"):
         """
         Saves the content of the :class:`DataFrame` in JSON format at the
@@ -261,6 +274,7 @@ class DataFrameWriter(object):
         """
         return self._jwrite.mode(mode).json(path)
 
+    @since(1.4)
     def parquet(self, path, mode="error"):
         """
         Saves the content of the :class:`DataFrame` in Parquet format at the
@@ -279,6 +293,7 @@ class DataFrameWriter(object):
         """
         return self._jwrite.mode(mode).parquet(path)
 
+    @since(1.4)
     def jdbc(self, url, table, mode="error", properties={}):
         """
         Saves the content of the :class:`DataFrame` to an external database table
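Finally, a hedged round-trip through the reader/writer pair documented above (hypothetical path; save modes per the `mode="error"` defaults in the hunks):

    df.write.parquet("/tmp/people.parquet", mode="overwrite")
    sqlContext.read.parquet("/tmp/people.parquet").dtypes  # schema preserved by Parquet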