Commit b7e8d1cb authored by Takeshi YAMAMURO, committed by Reynold Xin

[SPARK-15585][SQL] Fix NULL handling along with a spark-csv behaviour

## What changes were proposed in this pull request?
This PR fixes the behaviour of `format("csv").option("quote", null)` to match that of spark-csv.
It also explicitly sets default values for the CSV options in Python.

## How was this patch tested?
Added tests in CSVSuite.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #13372 from maropu/SPARK-15585.
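The behavioural change is easiest to see with a plain CSV parser. The sketch below uses Python's stdlib `csv` module (not Spark's actual parser) to illustrate what disabling the quote character means: after this patch, `option("quote", null)` turns quote processing off, as spark-csv did, so quote characters are kept verbatim in the parsed values.

```python
import csv
import io

raw = 'year,make,model\n2012,Tesla,"S"\n'

# Default behaviour: '"' delimits quoted fields, so "S" parses as S.
with_quoting = list(csv.reader(io.StringIO(raw)))

# Quote handling disabled -- analogous to option("quote", null) after this
# patch: the quote character gets no special treatment and stays in the value.
without_quoting = list(csv.reader(io.StringIO(raw), quoting=csv.QUOTE_NONE))

print(with_quoting[1])     # ['2012', 'Tesla', 'S']
print(without_quoting[1])  # ['2012', 'Tesla', '"S"']
```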
parent 79268aa4
@@ -303,10 +303,11 @@ class DataFrameReader(object):
         return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(path)))

     @since(2.0)
-    def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
-            comment=None, header=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
-            nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None,
-            maxColumns=None, maxCharsPerColumn=None, mode=None):
+    def csv(self, path, schema=None, sep=u',', encoding=u'UTF-8', quote=u'\"', escape=u'\\',
+            comment=None, header='false', ignoreLeadingWhiteSpace='false',
+            ignoreTrailingWhiteSpace='false', nullValue='', nanValue='NaN', positiveInf='Inf',
+            negativeInf='Inf', dateFormat=None, maxColumns='20480', maxCharsPerColumn='1000000',
+            mode='PERMISSIVE'):
         """Loads a CSV file and returns the result as a [[DataFrame]].

         This function goes through the input once to determine the input schema. To avoid going
@@ -315,44 +316,41 @@ class DataFrameReader(object):
         :param path: string, or list of strings, for input path(s).
         :param schema: an optional :class:`StructType` for the input schema.
         :param sep: sets the single character as a separator for each field and value.
-                    If None is set, it uses the default value, ``,``.
-        :param encoding: decodes the CSV files by the given encoding type. If None is set,
-                         it uses the default value, ``UTF-8``.
+                    The default value is ``,``.
+        :param encoding: decodes the CSV files by the given encoding type.
+                         The default value is ``UTF-8``.
         :param quote: sets the single character used for escaping quoted values where the
-                      separator can be part of the value. If None is set, it uses the default
-                      value, ``"``.
+                      separator can be part of the value. The default value is ``"``.
         :param escape: sets the single character used for escaping quotes inside an already
-                       quoted value. If None is set, it uses the default value, ``\``.
+                       quoted value. The default value is ``\``.
         :param comment: sets the single character used for skipping lines beginning with this
                         character. By default (None), it is disabled.
-        :param header: uses the first line as names of columns. If None is set, it uses the
-                       default value, ``false``.
+        :param header: uses the first line as names of columns. The default value is ``false``.
         :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
-                                        being read should be skipped. If None is set, it uses
-                                        the default value, ``false``.
+                                        being read should be skipped. The default value is
+                                        ``false``.
         :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
-                                         being read should be skipped. If None is set, it uses
-                                         the default value, ``false``.
-        :param nullValue: sets the string representation of a null value. If None is set, it uses
-                          the default value, empty string.
-        :param nanValue: sets the string representation of a non-number value. If None is set, it
-                         uses the default value, ``NaN``.
-        :param positiveInf: sets the string representation of a positive infinity value. If None
-                            is set, it uses the default value, ``Inf``.
-        :param negativeInf: sets the string representation of a negative infinity value. If None
-                            is set, it uses the default value, ``Inf``.
+                                         being read should be skipped. The default value is
+                                         ``false``.
+        :param nullValue: sets the string representation of a null value. The default value is an
+                          empty string.
+        :param nanValue: sets the string representation of a non-number value. The default value
+                         is ``NaN``.
+        :param positiveInf: sets the string representation of a positive infinity value. The
+                            default value is ``Inf``.
+        :param negativeInf: sets the string representation of a negative infinity value. The
+                            default value is ``Inf``.
         :param dateFormat: sets the string that indicates a date format. Custom date formats
                            follow the formats at ``java.text.SimpleDateFormat``. This
                            applies to both date type and timestamp type. By default, it is None
                            which means trying to parse times and date by
                            ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
-        :param maxColumns: defines a hard limit of how many columns a record can have. If None is
-                           set, it uses the default value, ``20480``.
+        :param maxColumns: defines a hard limit of how many columns a record can have. The default
+                           value is ``20480``.
         :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
-                                  value being read. If None is set, it uses the default value,
-                                  ``1000000``.
-        :param mode: allows a mode for dealing with corrupt records during parsing. If None is
-                     set, it uses the default value, ``PERMISSIVE``.
+                                  value being read. The default value is ``1000000``.
+        :param mode: allows a mode for dealing with corrupt records during parsing. The default
+                     value is ``PERMISSIVE``.

                 * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
                     When a schema is set by user, it sets ``null`` for extra fields.
@@ -785,8 +783,8 @@ class DataFrameWriter(object):
         self._jwrite.text(path)

     @since(2.0)
-    def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
-            header=None, nullValue=None, escapeQuotes=None):
+    def csv(self, path, mode='error', compression=None, sep=',', quote=u'\"', escape='\\',
+            header='false', nullValue='', escapeQuotes='true'):
         """Saves the content of the [[DataFrame]] in CSV format at the specified path.

         :param path: the path in any Hadoop supported file system
@@ -800,20 +798,19 @@ class DataFrameWriter(object):
         :param compression: compression codec to use when saving to file. This can be one of the
                             known case-insensitive shorten names (none, bzip2, gzip, lz4,
                             snappy and deflate).
-        :param sep: sets the single character as a separator for each field and value. If None is
-                    set, it uses the default value, ``,``.
+        :param sep: sets the single character as a separator for each field and value. The default
+                    value is ``,``.
         :param quote: sets the single character used for escaping quoted values where the
-                      separator can be part of the value. If None is set, it uses the default
-                      value, ``"``.
+                      separator can be part of the value. The default value is ``"``.
         :param escape: sets the single character used for escaping quotes inside an already
-                       quoted value. If None is set, it uses the default value, ``\``
+                       quoted value. The default value is ``\``.
         :param escapeQuotes: A flag indicating whether values containing quotes should always
                              be enclosed in quotes. If None is set, it uses the default value
                              ``true``, escaping all values containing a quote character.
-        :param header: writes the names of columns as the first line. If None is set, it uses
-                       the default value, ``false``.
-        :param nullValue: sets the string representation of a null value. If None is set, it uses
-                          the default value, empty string.
+        :param header: writes the names of columns as the first line. The default value is
+                       ``false``.
+        :param nullValue: sets the string representation of a null value. The default value is an
+                          empty string.

         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
@@ -831,7 +828,7 @@ class DataFrameWriter(object):
         if nullValue is not None:
             self.option("nullValue", nullValue)
         if escapeQuotes is not None:
-            self.option("escapeQuotes", nullValue)
+            self.option("escapeQuotes", escapeQuotes)
         self._jwrite.csv(path)

     @since(1.5)
@@ -30,8 +30,7 @@ private[sql] class CSVOptions(@transient private val parameters: Map[String, Str
     val paramValue = parameters.get(paramName)
     paramValue match {
       case None => default
-      case Some(null) => default
-      case Some(value) if value.length == 0 => '\u0000'
+      case Some(value) if value == null || value.length == 0 => '\u0000'
       case Some(value) if value.length == 1 => value.charAt(0)
       case _ => throw new RuntimeException(s"$paramName cannot be more than one character")
     }
@@ -52,12 +51,12 @@ private[sql] class CSVOptions(@transient private val parameters: Map[String, Str
   }

   private def getBool(paramName: String, default: Boolean = false): Boolean = {
-    val param = parameters.getOrElse(paramName, default.toString)
-    if (param == null) {
+    val paramValue = parameters.getOrElse(paramName, default.toString)
+    if (paramValue == null) {
       default
-    } else if (param.toLowerCase == "true") {
+    } else if (paramValue.toLowerCase == "true") {
       true
-    } else if (param.toLowerCase == "false") {
+    } else if (paramValue.toLowerCase == "false") {
       false
     } else {
       throw new Exception(s"$paramName flag can be true or false")
@@ -655,4 +655,15 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       assert(msg.contains("CSV data source does not support array<string> data type"))
     }
   }
+
+  test("SPARK-15585 set null at quote") {
+    val cars = spark.read
+      .format("csv")
+      .option("header", "true")
+      .option("quote", null)
+      .load(testFile(carsUnbalancedQuotesFile))
+
+    verifyCars(cars, withHeader = true, checkValues = false)
+  }
 }