diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 759c27507c3972a615e6617c849910f46f0753f3..5e732b4bec8fd08ed395f71672b20fc1d2aa80d3 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -341,12 +341,12 @@ class DataFrameReader(OptionUtils):
                        default value, ``false``.
         :param inferSchema: infers the input schema automatically from data. It requires one extra
                        pass over the data. If None is set, it uses the default value, ``false``.
-        :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
-                                        being read should be skipped. If None is set, it uses
-                                        the default value, ``false``.
-        :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
-                                         being read should be skipped. If None is set, it uses
-                                         the default value, ``false``.
+        :param ignoreLeadingWhiteSpace: A flag indicating whether or not leading whitespaces from
+                                        values being read should be skipped. If None is set, it
+                                        uses the default value, ``false``.
+        :param ignoreTrailingWhiteSpace: A flag indicating whether or not trailing whitespaces from
+                                         values being read should be skipped. If None is set, it
+                                         uses the default value, ``false``.
         :param nullValue: sets the string representation of a null value. If None is set, it uses
                        the default value, empty string. Since 2.0.1, this ``nullValue`` param
                        applies to all supported types including the string type.
@@ -706,7 +706,7 @@ class DataFrameWriter(OptionUtils):
     @since(2.0)
     def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
             header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
-            timestampFormat=None):
+            timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None):
         """Saves the content of the :class:`DataFrame` in CSV format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -728,10 +728,10 @@ class DataFrameWriter(OptionUtils):
                        empty string.
         :param escape: sets the single character used for escaping quotes inside an already
                        quoted value. If None is set, it uses the default value, ``\``
-        :param escapeQuotes: A flag indicating whether values containing quotes should always
+        :param escapeQuotes: a flag indicating whether values containing quotes should always
                              be enclosed in quotes. If None is set, it uses the default value
                              ``true``, escaping all values containing a quote character.
-        :param quoteAll: A flag indicating whether all values should always be enclosed in
+        :param quoteAll: a flag indicating whether all values should always be enclosed in
                          quotes. If None is set, it uses the default value ``false``,
                          only escaping values containing a quote character.
         :param header: writes the names of columns as the first line. If None is set, it uses
@@ -746,13 +746,21 @@ class DataFrameWriter(OptionUtils):
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
                                 default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+        :param ignoreLeadingWhiteSpace: a flag indicating whether or not leading whitespaces from
+                                        values being written should be skipped. If None is set, it
+                                        uses the default value, ``true``.
+        :param ignoreTrailingWhiteSpace: a flag indicating whether or not trailing whitespaces from
+                                         values being written should be skipped. If None is set, it
+                                         uses the default value, ``true``.
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
                        nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
-                       dateFormat=dateFormat, timestampFormat=timestampFormat)
+                       dateFormat=dateFormat, timestampFormat=timestampFormat,
+                       ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
+                       ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace)
         self._jwrite.csv(path)
 
     @since(1.5)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index e227f9ceb57699b8131307c4da1e6663c978030e..80f4340cdf1346607ab2cf14040cb4d372f0cf25 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -597,12 +597,12 @@ class DataStreamReader(OptionUtils):
                        default value, ``false``.
         :param inferSchema: infers the input schema automatically from data. It requires one extra
                        pass over the data. If None is set, it uses the default value, ``false``.
-        :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
-                                        being read should be skipped. If None is set, it uses
-                                        the default value, ``false``.
-        :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
-                                         being read should be skipped. If None is set, it uses
-                                         the default value, ``false``.
+        :param ignoreLeadingWhiteSpace: a flag indicating whether or not leading whitespaces from
+                                        values being read should be skipped. If None is set, it
+                                        uses the default value, ``false``.
+        :param ignoreTrailingWhiteSpace: a flag indicating whether or not trailing whitespaces from
+                                         values being read should be skipped. If None is set, it
+                                         uses the default value, ``false``.
         :param nullValue: sets the string representation of a null value. If None is set, it uses
                        the default value, empty string. Since 2.0.1, this ``nullValue`` param
                        applies to all supported types including the string type.
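For illustration (not part of the patch): a minimal PySpark sketch of the new writer keyword arguments once this change is applied. The DataFrame contents, the temporary path, and the active SparkSession named `spark` are assumptions for the example, not from the patch.

import os
import tempfile

# Assumes an active SparkSession named `spark`, as in the PySpark shell.
df = spark.createDataFrame([(" a", "b ")], ["x", "y"])
path = os.path.join(tempfile.mkdtemp(), "data")

# The write-side defaults stay `true` for backwards compatibility, so leading and
# trailing whitespace is trimmed unless both flags are explicitly disabled:
df.write.csv(path, ignoreLeadingWhiteSpace=False, ignoreTrailingWhiteSpace=False)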
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index f0a9a0400e3927b2274337ec58967b561776b650..29d613bc5fe3c20c3399ad56e6185372c278f423 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -450,6 +450,19 @@ class SQLTests(ReusedPySparkTestCase):
                     Row(_c0=u'Hyukjin', _c1=u'25', _c2=u'I am Hyukjin\n\nI love Spark!')]
         self.assertEqual(ages_newlines.collect(), expected)
 
+    def test_ignorewhitespace_csv(self):
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        self.spark.createDataFrame([[" a", "b ", " c "]]).write.csv(
+            tmpPath,
+            ignoreLeadingWhiteSpace=False,
+            ignoreTrailingWhiteSpace=False)
+
+        expected = [Row(value=u' a,b , c ')]
+        readback = self.spark.read.text(tmpPath)
+        self.assertEqual(readback.collect(), expected)
+        shutil.rmtree(tmpPath)
+
     def test_read_multiple_orc_file(self):
         df = self.spark.read.orc(["python/test_support/sql/orc_partitioned/b=0/c=0",
                                   "python/test_support/sql/orc_partitioned/b=1/c=1"])
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e39b4d91f1f6af749f4edce9d220ca0dcac9bbb7..e6d2b1bc28d95b9649c797a57eb23a4a9972ae4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -489,9 +489,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`header` (default `false`): uses the first line as names of columns.</li>
    * <li>`inferSchema` (default `false`): infers the input schema automatically from data. It
    * requires one extra pass over the data.</li>
-   * <li>`ignoreLeadingWhiteSpace` (default `false`): defines whether or not leading whitespaces
-   * from values being read should be skipped.</li>
-   * <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
+   * <li>`ignoreLeadingWhiteSpace` (default `false`): a flag indicating whether or not leading
+   * whitespaces from values being read should be skipped.</li>
+   * <li>`ignoreTrailingWhiteSpace` (default `false`): a flag indicating whether or not trailing
    * whitespaces from values being read should be skipped.</li>
    * <li>`nullValue` (default empty string): sets the string representation of a null value. Since
    * 2.0.1, this applies to all supported types including the string type.</li>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3e975ef6a3c24de80bf3ecc4622daa2ac3aa805e..e973d0bc6d09b4e7774c29319b7a90438d4f3d8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -573,7 +573,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`escapeQuotes` (default `true`): a flag indicating whether values containing
    * quotes should always be enclosed in quotes. Default is to escape all values containing
    * a quote character.</li>
-   * <li>`quoteAll` (default `false`): A flag indicating whether all values should always be
+   * <li>`quoteAll` (default `false`): a flag indicating whether all values should always be
    * enclosed in quotes. Default is to only escape values containing a quote character.</li>
    * <li>`header` (default `false`): writes the names of columns as the first line.</li>
    * <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
@@ -586,6 +586,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`ignoreLeadingWhiteSpace` (default `true`): a flag indicating whether or not leading
+   * whitespaces from values being written should be skipped.</li>
+   * <li>`ignoreTrailingWhiteSpace` (default `true`): a flag indicating whether or not
+   * trailing whitespaces from values being written should be skipped.</li>
    * </ul>
    *
    * @since 2.0.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 5d2c23ed9618ce856de59e9984137ff49e6f9a97..e7b79e0cbfd17a617a2473a29f9b5614bb20c1ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -93,8 +93,13 @@ class CSVOptions(
   val headerFlag = getBool("header")
   val inferSchemaFlag = getBool("inferSchema")
-  val ignoreLeadingWhiteSpaceFlag = getBool("ignoreLeadingWhiteSpace")
-  val ignoreTrailingWhiteSpaceFlag = getBool("ignoreTrailingWhiteSpace")
+  val ignoreLeadingWhiteSpaceInRead = getBool("ignoreLeadingWhiteSpace", default = false)
+  val ignoreTrailingWhiteSpaceInRead = getBool("ignoreTrailingWhiteSpace", default = false)
+
+  // For write, both options were `true` by default. We leave it as `true` for
+  // backwards compatibility.
+  val ignoreLeadingWhiteSpaceFlagInWrite = getBool("ignoreLeadingWhiteSpace", default = true)
+  val ignoreTrailingWhiteSpaceFlagInWrite = getBool("ignoreTrailingWhiteSpace", default = true)
 
   val columnNameOfCorruptRecord =
     parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
@@ -144,6 +149,8 @@ class CSVOptions(
     format.setQuote(quote)
     format.setQuoteEscape(escape)
     format.setComment(comment)
+    writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
+    writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
     writerSettings.setNullValue(nullValue)
     writerSettings.setEmptyValue(nullValue)
     writerSettings.setSkipEmptyLines(true)
@@ -159,8 +166,8 @@ class CSVOptions(
     format.setQuote(quote)
     format.setQuoteEscape(escape)
     format.setComment(comment)
-    settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlag)
-    settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlag)
+    settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
+    settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)
     settings.setReadInputOnSeparateThread(false)
     settings.setInputBufferSize(inputBufferSize)
     settings.setMaxColumns(maxColumns)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index f6e2fef74b8dbfc98a6cad5ca913ae166d23694a..997ca286597dac12f7fc02e36abf3cf4f0fa141b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -238,9 +238,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`header` (default `false`): uses the first line as names of columns.</li>
    * <li>`inferSchema` (default `false`): infers the input schema automatically from data. It
    * requires one extra pass over the data.</li>
-   * <li>`ignoreLeadingWhiteSpace` (default `false`): defines whether or not leading whitespaces
-   * from values being read should be skipped.</li>
-   * <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
+   * <li>`ignoreLeadingWhiteSpace` (default `false`): a flag indicating whether or not leading
+   * whitespaces from values being read should be skipped.</li>
+   * <li>`ignoreTrailingWhiteSpace` (default `false`): a flag indicating whether or not trailing
    * whitespaces from values being read should be skipped.</li>
    * <li>`nullValue` (default empty string): sets the string representation of a null value. Since
    * 2.0.1, this applies to all supported types including the string type.</li>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 2600894ca303cb0de87ac9462db90bbe2762092e..d70c47f4e2379da6dca3039e98a3ce534935f566 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1117,4 +1117,61 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     assert(df2.schema === schema)
   }
 
+  test("ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace options - read") {
+    val input = " a,b , c "
+
+    // For reading, the defaults of both `ignoreLeadingWhiteSpace` and
+    // `ignoreTrailingWhiteSpace` are `false`, so the (false, false) case is excluded.
+    val combinations = Seq(
+      (true, true),
+      (false, true),
+      (true, false))
+
+    // Check if read rows ignore whitespaces as configured.
+    val expectedRows = Seq(
+      Row("a", "b", "c"),
+      Row(" a", "b", " c"),
+      Row("a", "b ", "c "))
+
+    combinations.zip(expectedRows)
+      .foreach { case ((ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace), expected) =>
+        val df = spark.read
+          .option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace)
+          .option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace)
+          .csv(Seq(input).toDS())
+
+        checkAnswer(df, expected)
+      }
+  }
+
+  test("SPARK-18579: ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace options - write") {
+    val df = Seq((" a", "b ", " c ")).toDF()
+
+    // For writing, the defaults of both `ignoreLeadingWhiteSpace` and
+    // `ignoreTrailingWhiteSpace` are `true`, so the (true, true) case is excluded.
+    val combinations = Seq(
+      (false, false),
+      (false, true),
+      (true, false))
+
+    // Check if written lines ignore each whitespace as configured.
+    val expectedLines = Seq(
+      " a,b , c ",
+      " a,b, c",
+      "a,b ,c ")
+
+    combinations.zip(expectedLines)
+      .foreach { case ((ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace), expected) =>
+        withTempPath { path =>
+          df.write
+            .option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace)
+            .option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace)
+            .csv(path.getAbsolutePath)
+
+          // Read back the written lines.
+          val readBack = spark.read.text(path.getAbsolutePath)
+          checkAnswer(readBack, Row(expected))
+        }
+      }
+  }
 }
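For comparison (not part of the patch): the read-side options keep their `false` defaults and can also be set through the generic option API, mirroring the new tests above. A minimal PySpark sketch, assuming an active SparkSession named `spark` and an illustrative input path:

# Assumes an active SparkSession named `spark`; the path is illustrative.
df = (spark.read
      .option("ignoreLeadingWhiteSpace", True)   # read-side default: false
      .option("ignoreTrailingWhiteSpace", True)  # read-side default: false
      .csv("/tmp/whitespace-data"))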