Skip to content
Snippets Groups Projects
Commit d37c7f7f authored by hyukjinkwon's avatar hyukjinkwon Committed by Reynold Xin
Browse files

[SPARK-15050][SQL] Put CSV and JSON options as Python csv and json function parameters

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-15050

This PR adds function parameters for Python API for reading and writing `csv()`.

## How was this patch tested?

This was tested by `./dev/run_tests`.

Author: hyukjinkwon <gurwls223@gmail.com>
Author: Hyukjin Kwon <gurwls223@gmail.com>

Closes #12834 from HyukjinKwon/SPARK-15050.
parent 35d9c8aa
No related branches found
No related tags found
No related merge requests found
......@@ -166,7 +166,10 @@ class DataFrameReader(object):
return self._df(self._jreader.stream())
@since(1.4)
def json(self, path, schema=None):
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None):
"""
Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects
(one object per record) and returns the result as a :class`DataFrame`.
......@@ -177,31 +180,36 @@ class DataFrameReader(object):
:param path: string represents path to the JSON dataset,
or RDD of Strings storing JSON objects.
:param schema: an optional :class:`StructType` for the input schema.
:param primitivesAsString: infers all primitive values as a string type. If None is set,
it uses the default value, ``false``.
:param prefersDecimal: infers all floating-point values as a decimal type. If the values
do not fit in decimal, then it infers them as doubles. If None is
set, it uses the default value, ``false``.
:param allowComments: ignores Java/C++ style comment in JSON records. If None is set,
it uses the default value, ``false``.
:param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
it uses the default value, ``false``.
:param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
set, it uses the default value, ``true``.
:param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
set, it uses the default value, ``false``.
:param allowBackslashEscapingAnyCharacter: allows accepting quoting of all character
using backslash quoting mechanism. If None is
set, it uses the default value, ``false``.
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
You can set the following JSON-specific options to deal with non-standard JSON files:
* ``primitivesAsString`` (default ``false``): infers all primitive values as a string \
type
* `prefersDecimal` (default `false`): infers all floating-point values as a decimal \
type. If the values do not fit in decimal, then it infers them as doubles.
* ``allowComments`` (default ``false``): ignores Java/C++ style comment in JSON records
* ``allowUnquotedFieldNames`` (default ``false``): allows unquoted JSON field names
* ``allowSingleQuotes`` (default ``true``): allows single quotes in addition to double \
quotes
* ``allowNumericLeadingZeros`` (default ``false``): allows leading zeros in numbers \
(e.g. 00012)
* ``allowBackslashEscapingAnyCharacter`` (default ``false``): allows accepting quoting \
of all character using backslash quoting mechanism
* ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \
during parsing.
* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
record and puts the malformed string into a new field configured by \
``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
``null`` for extra fields.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
* ``columnNameOfCorruptRecord`` (default ``_corrupt_record``): allows renaming the \
new field having malformed string created by ``PERMISSIVE`` mode. \
This overrides ``spark.sql.columnNameOfCorruptRecord``.
:param columnNameOfCorruptRecord: allows renaming the new field having malformed string
created by ``PERMISSIVE`` mode. This overrides
``spark.sql.columnNameOfCorruptRecord``. If None is set,
it uses the default value ``_corrupt_record``.
>>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
......@@ -214,6 +222,24 @@ class DataFrameReader(object):
"""
if schema is not None:
self.schema(schema)
if primitivesAsString is not None:
self.option("primitivesAsString", primitivesAsString)
if prefersDecimal is not None:
self.option("prefersDecimal", prefersDecimal)
if allowComments is not None:
self.option("allowComments", allowComments)
if allowUnquotedFieldNames is not None:
self.option("allowUnquotedFieldNames", allowUnquotedFieldNames)
if allowSingleQuotes is not None:
self.option("allowSingleQuotes", allowSingleQuotes)
if allowNumericLeadingZero is not None:
self.option("allowNumericLeadingZero", allowNumericLeadingZero)
if allowBackslashEscapingAnyCharacter is not None:
self.option("allowBackslashEscapingAnyCharacter", allowBackslashEscapingAnyCharacter)
if mode is not None:
self.option("mode", mode)
if columnNameOfCorruptRecord is not None:
self.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
elif type(path) == list:
......@@ -270,53 +296,62 @@ class DataFrameReader(object):
[Row(value=u'hello'), Row(value=u'this')]
"""
if isinstance(paths, basestring):
paths = [paths]
return self._df(self._jreader.text(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
path = [paths]
return self._df(self._jreader.text(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
@since(2.0)
def csv(self, paths):
def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None,
maxColumns=None, maxCharsPerColumn=None, mode=None):
"""Loads a CSV file and returns the result as a [[DataFrame]].
This function goes through the input once to determine the input schema. To avoid going
through the entire data once, specify the schema explicitly using [[schema]].
:param paths: string, or list of strings, for input path(s).
You can set the following CSV-specific options to deal with CSV files:
* ``sep`` (default ``,``): sets the single character as a separator \
for each field and value.
* ``charset`` (default ``UTF-8``): decodes the CSV files by the given \
encoding type.
* ``quote`` (default ``"``): sets the single character used for escaping \
quoted values where the separator can be part of the value.
* ``escape`` (default ``\``): sets the single character used for escaping quotes \
inside an already quoted value.
* ``comment`` (default empty string): sets the single character used for skipping \
lines beginning with this character. By default, it is disabled.
* ``header`` (default ``false``): uses the first line as names of columns.
* ``ignoreLeadingWhiteSpace`` (default ``false``): defines whether or not leading \
whitespaces from values being read should be skipped.
* ``ignoreTrailingWhiteSpace`` (default ``false``): defines whether or not trailing \
whitespaces from values being read should be skipped.
* ``nullValue`` (default empty string): sets the string representation of a null value.
* ``nanValue`` (default ``NaN``): sets the string representation of a non-number \
value.
* ``positiveInf`` (default ``Inf``): sets the string representation of a positive \
infinity value.
* ``negativeInf`` (default ``-Inf``): sets the string representation of a negative \
infinity value.
* ``dateFormat`` (default ``None``): sets the string that indicates a date format. \
Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This \
applies to both date type and timestamp type. By default, it is None which means \
trying to parse times and date by ``java.sql.Timestamp.valueOf()`` and \
``java.sql.Date.valueOf()``.
* ``maxColumns`` (default ``20480``): defines a hard limit of how many columns \
a record can have.
* ``maxCharsPerColumn`` (default ``1000000``): defines the maximum number of \
characters allowed for any given value being read.
* ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \
during parsing.
* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. \
:param path: string, or list of strings, for input path(s).
:param schema: an optional :class:`StructType` for the input schema.
:param sep: sets the single character as a separator for each field and value.
If None is set, it uses the default value, ``,``.
:param encoding: decodes the CSV files by the given encoding type. If None is set,
it uses the default value, ``UTF-8``.
:param quote: sets the single character used for escaping quoted values where the
separator can be part of the value. If None is set, it uses the default
value, ``"``.
:param escape: sets the single character used for escaping quotes inside an already
quoted value. If None is set, it uses the default value, ``\``.
:param comment: sets the single character used for skipping lines beginning with this
character. By default (None), it is disabled.
:param header: uses the first line as names of columns. If None is set, it uses the
default value, ``false``.
:param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
being read should be skipped. If None is set, it uses
the default value, ``false``.
:param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
being read should be skipped. If None is set, it uses
the default value, ``false``.
:param nullValue: sets the string representation of a null value. If None is set, it uses
the default value, empty string.
:param nanValue: sets the string representation of a non-number value. If None is set, it
uses the default value, ``NaN``.
:param positiveInf: sets the string representation of a positive infinity value. If None
is set, it uses the default value, ``Inf``.
:param negativeInf: sets the string representation of a negative infinity value. If None
is set, it uses the default value, ``Inf``.
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to both date type and timestamp type. By default, it is None
which means trying to parse times and date by
``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
value being read. If None is set, it uses the default value,
``1000000``.
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
When a schema is set by user, it sets ``null`` for extra fields.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
......@@ -325,9 +360,43 @@ class DataFrameReader(object):
>>> df.dtypes
[('C0', 'string'), ('C1', 'string')]
"""
if isinstance(paths, basestring):
paths = [paths]
return self._df(self._jreader.csv(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
if schema is not None:
self.schema(schema)
if sep is not None:
self.option("sep", sep)
if encoding is not None:
self.option("encoding", encoding)
if quote is not None:
self.option("quote", quote)
if escape is not None:
self.option("escape", escape)
if comment is not None:
self.option("comment", comment)
if header is not None:
self.option("header", header)
if ignoreLeadingWhiteSpace is not None:
self.option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace)
if ignoreTrailingWhiteSpace is not None:
self.option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace)
if nullValue is not None:
self.option("nullValue", nullValue)
if nanValue is not None:
self.option("nanValue", nanValue)
if positiveInf is not None:
self.option("positiveInf", positiveInf)
if negativeInf is not None:
self.option("negativeInf", negativeInf)
if dateFormat is not None:
self.option("dateFormat", dateFormat)
if maxColumns is not None:
self.option("maxColumns", maxColumns)
if maxCharsPerColumn is not None:
self.option("maxCharsPerColumn", maxCharsPerColumn)
if mode is not None:
self.option("mode", mode)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.csv(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
@since(1.5)
def orc(self, path):
......@@ -687,7 +756,8 @@ class DataFrameWriter(object):
self._jwrite.text(path)
@since(2.0)
def csv(self, path, mode=None, compression=None):
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
header=None, nullValue=None):
"""Saves the content of the [[DataFrame]] in CSV format at the specified path.
:param path: the path in any Hadoop supported file system
......@@ -701,25 +771,33 @@ class DataFrameWriter(object):
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).
You can set the following CSV-specific options to deal with CSV files:
* ``sep`` (default ``,``): sets the single character as a separator \
for each field and value.
* ``quote`` (default ``"``): sets the single character used for escaping \
quoted values where the separator can be part of the value.
* ``escape`` (default ``\``): sets the single character used for escaping quotes \
inside an already quoted value.
* ``header`` (default ``false``): writes the names of columns as the first line.
* ``nullValue`` (default empty string): sets the string representation of a null value.
* ``compression``: compression codec to use when saving to file. This can be one of \
the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and \
deflate).
:param sep: sets the single character as a separator for each field and value. If None is
set, it uses the default value, ``,``.
:param quote: sets the single character used for escaping quoted values where the
separator can be part of the value. If None is set, it uses the default
value, ``"``.
:param escape: sets the single character used for escaping quotes inside an already
quoted value. If None is set, it uses the default value, ``\``
:param header: writes the names of columns as the first line. If None is set, it uses
the default value, ``false``.
:param nullValue: sets the string representation of a null value. If None is set, it uses
the default value, empty string.
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
if compression is not None:
self.option("compression", compression)
if sep is not None:
self.option("sep", sep)
if quote is not None:
self.option("quote", quote)
if escape is not None:
self.option("escape", escape)
if header is not None:
self.option("header", header)
if nullValue is not None:
self.option("nullValue", nullValue)
self._jwrite.csv(path)
@since(1.5)
......@@ -797,7 +875,7 @@ def _test():
globs['sqlContext'] = SQLContext(sc)
globs['hiveContext'] = HiveContext._createForTesting(sc)
globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
globs['sdf'] =\
globs['sdf'] = \
globs['sqlContext'].read.format('text').stream('python/test_support/sql/streaming')
(failure_count, test_count) = doctest.testmod(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment