Commit 8891495a authored by Reynold Xin

Python docstring update for sql.py.


Mostly related to the following two rules in PEP8 and PEP257:
- Line length < 72 chars.
- First line should be a concise description of the function/class.

Author: Reynold Xin <rxin@apache.org>

Closes #869 from rxin/docstring-schemardd and squashes the following commits:

7cf0cbc [Reynold Xin] Updated sql.py for pep8 docstring.
0a4aef9 [Reynold Xin] Merge branch 'master' into docstring-schemardd
6678937 [Reynold Xin] Python docstring update for sql.py.

(cherry picked from commit 14f0358b)
Signed-off-by: Reynold Xin <rxin@apache.org>
parent 33683974
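For reference, a minimal before/after sketch of the two docstring rules the commit message cites. The function below is purely illustrative and is not taken from sql.py:

    # Hypothetical example (not from sql.py) of the style this commit applies.

    # Before: summary pushed onto its own line, long unwrapped text.
    def cache_table(table_name):
        """
        Caches the specified table in-memory. Cached tables stay in memory until they are explicitly uncached.
        """

    # After: concise summary on the first docstring line, detail wrapped
    # so every line stays under 72 characters.
    def cache_table(table_name):
        """Caches the specified table in-memory.

        Cached tables stay in memory until they are explicitly
        uncached.
        """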
@@ -23,14 +23,14 @@ __all__ = ["SQLContext", "HiveContext", "LocalHiveContext", "TestHiveContext", "
 class SQLContext:
-    """
-    Main entry point for SparkSQL functionality. A SQLContext can be used create L{SchemaRDD}s,
-    register L{SchemaRDD}s as tables, execute sql over tables, cache tables, and read parquet files.
+    """Main entry point for SparkSQL functionality.
+
+    A SQLContext can be used create L{SchemaRDD}s, register L{SchemaRDD}s as
+    tables, execute SQL over tables, cache tables, and read parquet files.
     """
 
     def __init__(self, sparkContext, sqlContext = None):
-        """
-        Create a new SQLContext.
+        """Create a new SQLContext.
 
         @param sparkContext: The SparkContext to wrap.
@@ -63,18 +63,20 @@ class SQLContext:
     @property
     def _ssql_ctx(self):
-        """
-        Accessor for the JVM SparkSQL context.  Subclasses can override this property to provide
-        their own JVM Contexts.
+        """Accessor for the JVM SparkSQL context.
+
+        Subclasses can override this property to provide their own
+        JVM Contexts.
         """
         if not hasattr(self, '_scala_SQLContext'):
             self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc())
         return self._scala_SQLContext
 
     def inferSchema(self, rdd):
-        """
-        Infer and apply a schema to an RDD of L{dict}s. We peek at the first row of the RDD to
-        determine the fields names and types, and then use that to extract all the dictionaries.
+        """Infer and apply a schema to an RDD of L{dict}s.
+
+        We peek at the first row of the RDD to determine the fields names
+        and types, and then use that to extract all the dictionaries.
 
         >>> srdd = sqlCtx.inferSchema(rdd)
         >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"},
@@ -92,9 +94,10 @@ class SQLContext:
         return SchemaRDD(srdd, self)
 
     def registerRDDAsTable(self, rdd, tableName):
-        """
-        Registers the given RDD as a temporary table in the catalog.  Temporary tables exist only
-        during the lifetime of this instance of SQLContext.
+        """Registers the given RDD as a temporary table in the catalog.
+
+        Temporary tables exist only during the lifetime of this instance of
+        SQLContext.
 
         >>> srdd = sqlCtx.inferSchema(rdd)
         >>> sqlCtx.registerRDDAsTable(srdd, "table1")
@@ -106,8 +109,7 @@ class SQLContext:
             raise ValueError("Can only register SchemaRDD as table")
 
     def parquetFile(self, path):
-        """
-        Loads a Parquet file, returning the result as a L{SchemaRDD}.
+        """Loads a Parquet file, returning the result as a L{SchemaRDD}.
 
         >>> import tempfile, shutil
         >>> parquetFile = tempfile.mkdtemp()
@@ -122,8 +124,7 @@ class SQLContext:
         return SchemaRDD(jschema_rdd, self)
 
     def sql(self, sqlQuery):
-        """
-        Executes a SQL query using Spark, returning the result as a L{SchemaRDD}.
+        """Return a L{SchemaRDD} representing the result of the given query.
 
         >>> srdd = sqlCtx.inferSchema(rdd)
         >>> sqlCtx.registerRDDAsTable(srdd, "table1")
@@ -135,8 +136,7 @@ class SQLContext:
         return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self)
 
     def table(self, tableName):
-        """
-        Returns the specified table as a L{SchemaRDD}.
+        """Returns the specified table as a L{SchemaRDD}.
 
         >>> srdd = sqlCtx.inferSchema(rdd)
         >>> sqlCtx.registerRDDAsTable(srdd, "table1")
@@ -147,23 +147,19 @@ class SQLContext:
         return SchemaRDD(self._ssql_ctx.table(tableName), self)
 
     def cacheTable(self, tableName):
-        """
-        Caches the specified table in-memory.
-        """
+        """Caches the specified table in-memory."""
         self._ssql_ctx.cacheTable(tableName)
 
     def uncacheTable(self, tableName):
-        """
-        Removes the specified table from the in-memory cache.
-        """
+        """Removes the specified table from the in-memory cache."""
         self._ssql_ctx.uncacheTable(tableName)
 
 
 class HiveContext(SQLContext):
-    """
-    An instance of the Spark SQL execution engine that integrates with data stored in Hive.
-    Configuration for Hive is read from hive-site.xml on the classpath.  It supports running both SQL
-    and HiveQL commands.
+    """A variant of Spark SQL that integrates with data stored in Hive.
+
+    Configuration for Hive is read from hive-site.xml on the classpath.
+    It supports running both SQL and HiveQL commands.
     """
 
     @property
@@ -193,9 +189,10 @@ class HiveContext(SQLContext):
 
 class LocalHiveContext(HiveContext):
-    """
-    Starts up an instance of hive where metadata is stored locally. An in-process metadata data is
-    created with data stored in ./metadata.  Warehouse data is stored in in ./warehouse.
+    """Starts up an instance of hive where metadata is stored locally.
+
+    An in-process metadata data is created with data stored in ./metadata.
+    Warehouse data is stored in in ./warehouse.
 
     >>> import os
     >>> hiveCtx = LocalHiveContext(sc)
@@ -228,8 +225,10 @@ class TestHiveContext(HiveContext):
 # TODO: Investigate if it is more efficient to use a namedtuple. One problem is that named tuples
 # are custom classes that must be generated per Schema.
 class Row(dict):
-    """
-    An extended L{dict} that takes a L{dict} in its constructor, and exposes those items as fields.
+    """A row in L{SchemaRDD}.
+
+    An extended L{dict} that takes a L{dict} in its constructor, and
+    exposes those items as fields.
 
     >>> r = Row({"hello" : "world", "foo" : "bar"})
     >>> r.hello
@@ -245,13 +244,16 @@ class Row(dict):
 
 class SchemaRDD(RDD):
-    """
-    An RDD of L{Row} objects that has an associated schema. The underlying JVM object is a SchemaRDD,
-    not a PythonRDD, so we can utilize the relational query api exposed by SparkSQL.
-
-    For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the L{SchemaRDD} is not operated on
-    directly, as it's underlying implementation is a RDD composed of Java objects. Instead it is
-    converted to a PythonRDD in the JVM, on which Python operations can be done.
+    """An RDD of L{Row} objects that has an associated schema.
+
+    The underlying JVM object is a SchemaRDD, not a PythonRDD, so we can
+    utilize the relational query api exposed by SparkSQL.
+
+    For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the
+    L{SchemaRDD} is not operated on directly, as it's underlying
+    implementation is a RDD composed of Java objects. Instead it is
+    converted to a PythonRDD in the JVM, on which Python operations can
+    be done.
     """
 
     def __init__(self, jschema_rdd, sql_ctx):
@@ -266,8 +268,9 @@ class SchemaRDD(RDD):
     @property
     def _jrdd(self):
-        """
-        Lazy evaluation of PythonRDD object. Only done when a user calls methods defined by the
-        L{pyspark.rdd.RDD} super class (map, filter, etc.).
+        """Lazy evaluation of PythonRDD object.
+
+        Only done when a user calls methods defined by the
+        L{pyspark.rdd.RDD} super class (map, filter, etc.).
         """
         if not hasattr(self, '_lazy_jrdd'):
@@ -279,10 +282,10 @@ class SchemaRDD(RDD):
         return self._jrdd.id()
 
     def saveAsParquetFile(self, path):
-        """
-        Saves the contents of this L{SchemaRDD} as a parquet file, preserving the schema. Files
-        that are written out using this method can be read back in as a SchemaRDD using the
-        L{SQLContext.parquetFile} method.
+        """Save the contents as a Parquet file, preserving the schema.
+
+        Files that are written out using this method can be read back in as
+        a SchemaRDD using the L{SQLContext.parquetFile} method.
 
         >>> import tempfile, shutil
         >>> parquetFile = tempfile.mkdtemp()
@@ -296,9 +299,10 @@ class SchemaRDD(RDD):
         self._jschema_rdd.saveAsParquetFile(path)
 
     def registerAsTable(self, name):
-        """
-        Registers this RDD as a temporary table using the given name. The lifetime of this temporary
-        table is tied to the L{SQLContext} that was used to create this SchemaRDD.
+        """Registers this RDD as a temporary table using the given name.
+
+        The lifetime of this temporary table is tied to the L{SQLContext}
+        that was used to create this SchemaRDD.
 
         >>> srdd = sqlCtx.inferSchema(rdd)
         >>> srdd.registerAsTable("test")
@@ -309,24 +313,22 @@ class SchemaRDD(RDD):
         self._jschema_rdd.registerAsTable(name)
 
     def insertInto(self, tableName, overwrite = False):
-        """
-        Inserts the contents of this SchemaRDD into the specified table,
-        optionally overwriting any existing data.
+        """Inserts the contents of this SchemaRDD into the specified table.
+
+        Optionally overwriting any existing data.
         """
         self._jschema_rdd.insertInto(tableName, overwrite)
 
     def saveAsTable(self, tableName):
-        """
-        Creates a new table with the contents of this SchemaRDD.
-        """
+        """Creates a new table with the contents of this SchemaRDD."""
         self._jschema_rdd.saveAsTable(tableName)
 
     def count(self):
-        """
-        Return the number of elements in this RDD. Unlike the base RDD
-        implementation of count, this implementation leverages the query
-        optimizer to compute the count on the SchemaRDD, which supports
-        features such as filter pushdown.
+        """Return the number of elements in this RDD.
+
+        Unlike the base RDD implementation of count, this implementation
+        leverages the query optimizer to compute the count on the SchemaRDD,
+        which supports features such as filter pushdown.
 
         >>> srdd = sqlCtx.inferSchema(rdd)
         >>> srdd.count()
...
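Read together, the updated docstrings describe the following workflow. This is a minimal sketch, assuming a running SparkContext bound to sc (as in the doctests above) and the PySpark API of this Spark release; the sample data, table name, and query are illustrative:

    from pyspark.sql import SQLContext

    sqlCtx = SQLContext(sc)  # wrap the existing SparkContext

    # inferSchema peeks at the first row of an RDD of dicts to derive
    # the field names and types.
    rdd = sc.parallelize([{"field1": 1, "field2": "row1"},
                          {"field1": 2, "field2": "row2"}])
    srdd = sqlCtx.inferSchema(rdd)

    # Register the SchemaRDD as a temporary table and query it with SQL.
    sqlCtx.registerRDDAsTable(srdd, "table1")
    result = sqlCtx.sql("SELECT field1, field2 FROM table1")
    print(result.collect())

    # count() goes through the query optimizer rather than the base RDD
    # implementation, which enables features such as filter pushdown.
    print(srdd.count())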