Commit 14f0358b authored by Reynold Xin

Python docstring update for sql.py.

Mostly related to the following two rules in PEP8 and PEP257:
- Line length < 72 chars.
- First line should be a concise description of the function/class.
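
For illustration, a sketch of a docstring in that style, modeled on one of the updated methods in sql.py (body omitted): the first line is a standalone summary, followed by a blank line and a description wrapped so no line exceeds 72 characters.

def registerRDDAsTable(self, rdd, tableName):
    """Registers the given RDD as a temporary table in the catalog.

    Temporary tables exist only during the lifetime of this instance of
    SQLContext.
    """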

Author: Reynold Xin <rxin@apache.org>

Closes #869 from rxin/docstring-schemardd and squashes the following commits:

7cf0cbc [Reynold Xin] Updated sql.py for pep8 docstring.
0a4aef9 [Reynold Xin] Merge branch 'master' into docstring-schemardd
6678937 [Reynold Xin] Python docstring update for sql.py.
parent d79c2b28
@@ -23,14 +23,14 @@ __all__ = ["SQLContext", "HiveContext", "LocalHiveContext", "TestHiveContext", "
 class SQLContext:
-    """
-    Main entry point for SparkSQL functionality. A SQLContext can be used create L{SchemaRDD}s,
-    register L{SchemaRDD}s as tables, execute sql over tables, cache tables, and read parquet files.
+    """Main entry point for SparkSQL functionality.
+
+    A SQLContext can be used create L{SchemaRDD}s, register L{SchemaRDD}s as
+    tables, execute SQL over tables, cache tables, and read parquet files.
     """
 
     def __init__(self, sparkContext, sqlContext = None):
-        """
-        Create a new SQLContext.
+        """Create a new SQLContext.
 
         @param sparkContext: The SparkContext to wrap.
@@ -63,18 +63,20 @@ class SQLContext:
     @property
     def _ssql_ctx(self):
-        """
-        Accessor for the JVM SparkSQL context. Subclasses can override this property to provide
-        their own JVM Contexts.
+        """Accessor for the JVM SparkSQL context.
+
+        Subclasses can override this property to provide their own
+        JVM Contexts.
         """
         if not hasattr(self, '_scala_SQLContext'):
             self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc())
         return self._scala_SQLContext
 
     def inferSchema(self, rdd):
-        """
-        Infer and apply a schema to an RDD of L{dict}s. We peek at the first row of the RDD to
-        determine the fields names and types, and then use that to extract all the dictionaries.
+        """Infer and apply a schema to an RDD of L{dict}s.
+
+        We peek at the first row of the RDD to determine the fields names
+        and types, and then use that to extract all the dictionaries.
 
         >>> srdd = sqlCtx.inferSchema(rdd)
         >>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"},
@@ -92,9 +94,10 @@ class SQLContext:
         return SchemaRDD(srdd, self)
 
     def registerRDDAsTable(self, rdd, tableName):
-        """
-        Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
-        during the lifetime of this instance of SQLContext.
+        """Registers the given RDD as a temporary table in the catalog.
+
+        Temporary tables exist only during the lifetime of this instance of
+        SQLContext.
 
         >>> srdd = sqlCtx.inferSchema(rdd)
         >>> sqlCtx.registerRDDAsTable(srdd, "table1")
@@ -106,8 +109,7 @@
             raise ValueError("Can only register SchemaRDD as table")
 
     def parquetFile(self, path):
-        """
-        Loads a Parquet file, returning the result as a L{SchemaRDD}.
+        """Loads a Parquet file, returning the result as a L{SchemaRDD}.
 
         >>> import tempfile, shutil
         >>> parquetFile = tempfile.mkdtemp()
@@ -122,8 +124,7 @@
         return SchemaRDD(jschema_rdd, self)
 
     def sql(self, sqlQuery):
-        """
-        Executes a SQL query using Spark, returning the result as a L{SchemaRDD}.
+        """Return a L{SchemaRDD} representing the result of the given query.
 
         >>> srdd = sqlCtx.inferSchema(rdd)
         >>> sqlCtx.registerRDDAsTable(srdd, "table1")
@@ -135,8 +136,7 @@
         return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self)
 
     def table(self, tableName):
-        """
-        Returns the specified table as a L{SchemaRDD}.
+        """Returns the specified table as a L{SchemaRDD}.
 
         >>> srdd = sqlCtx.inferSchema(rdd)
         >>> sqlCtx.registerRDDAsTable(srdd, "table1")
@@ -147,23 +147,19 @@
         return SchemaRDD(self._ssql_ctx.table(tableName), self)
 
     def cacheTable(self, tableName):
-        """
-        Caches the specified table in-memory.
-        """
+        """Caches the specified table in-memory."""
         self._ssql_ctx.cacheTable(tableName)
 
     def uncacheTable(self, tableName):
-        """
-        Removes the specified table from the in-memory cache.
-        """
+        """Removes the specified table from the in-memory cache."""
         self._ssql_ctx.uncacheTable(tableName)
 
 
 class HiveContext(SQLContext):
-    """
-    An instance of the Spark SQL execution engine that integrates with data stored in Hive.
-    Configuration for Hive is read from hive-site.xml on the classpath. It supports running both SQL
-    and HiveQL commands.
+    """A variant of Spark SQL that integrates with data stored in Hive.
+
+    Configuration for Hive is read from hive-site.xml on the classpath.
+    It supports running both SQL and HiveQL commands.
     """
 
     @property
@@ -193,9 +189,10 @@ class HiveContext(SQLContext):
 class LocalHiveContext(HiveContext):
-    """
-    Starts up an instance of hive where metadata is stored locally. An in-process metadata data is
-    created with data stored in ./metadata. Warehouse data is stored in in ./warehouse.
+    """Starts up an instance of hive where metadata is stored locally.
+
+    An in-process metadata data is created with data stored in ./metadata.
+    Warehouse data is stored in in ./warehouse.
 
     >>> import os
     >>> hiveCtx = LocalHiveContext(sc)
@@ -228,8 +225,10 @@ class TestHiveContext(HiveContext):
 # TODO: Investigate if it is more efficient to use a namedtuple. One problem is that named tuples
 # are custom classes that must be generated per Schema.
 class Row(dict):
-    """
-    An extended L{dict} that takes a L{dict} in its constructor, and exposes those items as fields.
+    """A row in L{SchemaRDD}.
+
+    An extended L{dict} that takes a L{dict} in its constructor, and
+    exposes those items as fields.
 
     >>> r = Row({"hello" : "world", "foo" : "bar"})
     >>> r.hello
@@ -245,13 +244,16 @@ class Row(dict):
 class SchemaRDD(RDD):
-    """
-    An RDD of L{Row} objects that has an associated schema. The underlying JVM object is a SchemaRDD,
-    not a PythonRDD, so we can utilize the relational query api exposed by SparkSQL.
+    """An RDD of L{Row} objects that has an associated schema.
-    For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the L{SchemaRDD} is not operated on
-    directly, as it's underlying implementation is a RDD composed of Java objects. Instead it is
-    converted to a PythonRDD in the JVM, on which Python operations can be done.
+
+    The underlying JVM object is a SchemaRDD, not a PythonRDD, so we can
+    utilize the relational query api exposed by SparkSQL.
+
+    For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the
+    L{SchemaRDD} is not operated on directly, as it's underlying
+    implementation is a RDD composed of Java objects. Instead it is
+    converted to a PythonRDD in the JVM, on which Python operations can
+    be done.
     """
 
     def __init__(self, jschema_rdd, sql_ctx):
@@ -266,8 +268,9 @@ class SchemaRDD(RDD):
     @property
     def _jrdd(self):
-        """
-        Lazy evaluation of PythonRDD object. Only done when a user calls methods defined by the
+        """Lazy evaluation of PythonRDD object.
+
+        Only done when a user calls methods defined by the
         L{pyspark.rdd.RDD} super class (map, filter, etc.).
         """
         if not hasattr(self, '_lazy_jrdd'):
@@ -279,10 +282,10 @@
         return self._jrdd.id()
 
     def saveAsParquetFile(self, path):
-        """
-        Saves the contents of this L{SchemaRDD} as a parquet file, preserving the schema. Files
-        that are written out using this method can be read back in as a SchemaRDD using the
-        L{SQLContext.parquetFile} method.
+        """Save the contents as a Parquet file, preserving the schema.
+
+        Files that are written out using this method can be read back in as
+        a SchemaRDD using the L{SQLContext.parquetFile} method.
 
         >>> import tempfile, shutil
         >>> parquetFile = tempfile.mkdtemp()
@@ -296,9 +299,10 @@ class SchemaRDD(RDD):
         self._jschema_rdd.saveAsParquetFile(path)
 
     def registerAsTable(self, name):
-        """
-        Registers this RDD as a temporary table using the given name. The lifetime of this temporary
-        table is tied to the L{SQLContext} that was used to create this SchemaRDD.
+        """Registers this RDD as a temporary table using the given name.
+
+        The lifetime of this temporary table is tied to the L{SQLContext}
+        that was used to create this SchemaRDD.
 
         >>> srdd = sqlCtx.inferSchema(rdd)
         >>> srdd.registerAsTable("test")
@@ -309,24 +313,22 @@
         self._jschema_rdd.registerAsTable(name)
 
     def insertInto(self, tableName, overwrite = False):
-        """
-        Inserts the contents of this SchemaRDD into the specified table,
-        optionally overwriting any existing data.
+        """Inserts the contents of this SchemaRDD into the specified table.
+
+        Optionally overwriting any existing data.
         """
         self._jschema_rdd.insertInto(tableName, overwrite)
 
     def saveAsTable(self, tableName):
-        """
-        Creates a new table with the contents of this SchemaRDD.
-        """
+        """Creates a new table with the contents of this SchemaRDD."""
         self._jschema_rdd.saveAsTable(tableName)
 
     def count(self):
-        """
-        Return the number of elements in this RDD. Unlike the base RDD
-        implementation of count, this implementation leverages the query
-        optimizer to compute the count on the SchemaRDD, which supports
-        features such as filter pushdown.
+        """Return the number of elements in this RDD.
+
+        Unlike the base RDD implementation of count, this implementation
+        leverages the query optimizer to compute the count on the SchemaRDD,
+        which supports features such as filter pushdown.
 
         >>> srdd = sqlCtx.inferSchema(rdd)
         >>> srdd.count()
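
For context, a minimal end-to-end usage sketch of the API whose docstrings are being updated, pieced together from the doctests in sql.py; it assumes an already-running SparkContext named sc, just as those doctests do.

from pyspark.sql import SQLContext

# Wrap the existing SparkContext in a SQLContext.
sqlCtx = SQLContext(sc)

# An RDD of dicts; inferSchema peeks at the first row to determine
# the field names and types, then applies that schema to every row.
rdd = sc.parallelize([{"field1": 1, "field2": "row1"},
                      {"field1": 2, "field2": "row2"}])
srdd = sqlCtx.inferSchema(rdd)

# Register the SchemaRDD as a temporary table and query it with SQL;
# the result of sql() is another SchemaRDD.
sqlCtx.registerRDDAsTable(srdd, "table1")
srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 AS f2 FROM table1")
srdd2.collect()  # [{"f1": 1, "f2": "row1"}, {"f1": 2, "f2": "row2"}]

Since Row extends dict, the collected rows compare equal to plain dictionaries, which is how the doctests above check their results.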