Commit 80091b8a authored by Davies Liu, committed by Cheng Lian

[SPARK-14031][SQL] speedup CSV writer

## What changes were proposed in this pull request?

Currently, we create a CSVWriter for every row, which is very expensive and memory hungry: it takes about 15 seconds to write out 1 million rows (two columns).

This PR writes the rows in batch mode, creating a CSVWriter for every 1k rows, which can write out 1 million rows in about 1 second (15X faster).
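
The idea is to keep a single univocity CsvWriter writing into an in-memory buffer and only hand the accumulated text to the output every fixed number of rows, instead of constructing a fresh writer per row. A minimal standalone sketch of that pattern (illustrative only; `BatchedCsvWriter`, `batchSize`, and `sink` are names invented for this sketch, not part of the patch):

```scala
import java.io.{ByteArrayOutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets

import com.univocity.parsers.csv.{CsvWriter, CsvWriterSettings}

// Illustrative sketch of the batched-writer idea (not the Spark code itself).
class BatchedCsvWriter(settings: CsvWriterSettings, batchSize: Int = 1024) {
  private var buffer = new ByteArrayOutputStream()
  private var writer = newWriter()
  private var pending = 0L

  private def newWriter(): CsvWriter =
    new CsvWriter(new OutputStreamWriter(buffer, StandardCharsets.UTF_8), settings)

  // Append one row; only materialize output every `batchSize` rows.
  def write(row: Seq[String], sink: String => Unit): Unit = {
    writer.writeRow(row.toArray: _*)
    pending += 1
    if (pending % batchSize == 0) flush(sink)
  }

  // Close the current writer, hand the accumulated text to the sink,
  // and start a fresh buffer/writer pair for the next batch.
  def flush(sink: String => Unit): Unit = {
    writer.close()
    val lines = buffer.toString.stripLineEnd
    if (lines.nonEmpty) sink(lines)
    buffer = new ByteArrayOutputStream()
    writer = newWriter()
  }
}
```

Here `sink` stands in for whatever actually persists a chunk of CSV text (in the patch, Hadoop's record writer); the caller is responsible for one final flush when it finishes, which is why the patch also calls flush() from close().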

## How was this patch tested?

Benchmarked manually.
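
A rough sketch of this kind of manual benchmark (not the exact script behind the numbers above; the output path and column names are arbitrary):

```scala
import org.apache.spark.sql.SparkSession

object CsvWriteBenchmark {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("csv-write-benchmark")
      .getOrCreate()
    import spark.implicits._

    // 1 million rows with two columns, as in the numbers quoted above.
    val df = spark.range(1000000L).select($"id", ($"id" * 2).as("doubled"))

    val start = System.nanoTime()
    df.write.mode("overwrite").csv("/tmp/csv_write_bench")  // arbitrary path
    val elapsedSec = (System.nanoTime() - start) / 1e9
    println(f"Wrote 1M rows in $elapsedSec%.2f s")

    spark.stop()
  }
}
```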

Author: Davies Liu <davies@databricks.com>

Closes #13229 from davies/csv_writer.
parent dafcb05c
@@ -76,17 +76,26 @@ private[sql] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) exten
   writerSettings.setQuoteAllFields(false)
   writerSettings.setHeaders(headers: _*)
 
-  def writeRow(row: Seq[String], includeHeader: Boolean): String = {
-    val buffer = new ByteArrayOutputStream()
-    val outputWriter = new OutputStreamWriter(buffer, StandardCharsets.UTF_8)
-    val writer = new CsvWriter(outputWriter, writerSettings)
+  private var buffer = new ByteArrayOutputStream()
+  private var writer = new CsvWriter(
+    new OutputStreamWriter(buffer, StandardCharsets.UTF_8),
+    writerSettings)
 
+  def writeRow(row: Seq[String], includeHeader: Boolean): Unit = {
     if (includeHeader) {
       writer.writeHeaders()
     }
     writer.writeRow(row.toArray: _*)
+  }
 
+  def flush(): String = {
     writer.close()
-    buffer.toString.stripLineEnd
+    val lines = buffer.toString.stripLineEnd
+    buffer = new ByteArrayOutputStream()
+    writer = new CsvWriter(
+      new OutputStreamWriter(buffer, StandardCharsets.UTF_8),
+      writerSettings)
+    lines
   }
 }

@@ -176,8 +176,8 @@ private[sql] class CsvOutputWriter(
     }.getRecordWriter(context)
   }
 
-  private var firstRow: Boolean = params.headerFlag
-
+  private val FLUSH_BATCH_SIZE = 1024L
+  private var records: Long = 0L
   private val csvWriter = new LineCsvWriter(params, dataSchema.fieldNames.toSeq)
 
   private def rowToString(row: Seq[Any]): Seq[String] = row.map { field =>
@@ -191,16 +191,23 @@ private[sql] class CsvOutputWriter(
   override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
 
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
-    // TODO: Instead of converting and writing every row, we should use the univocity buffer
-    val resultString = csvWriter.writeRow(rowToString(row.toSeq(dataSchema)), firstRow)
-    if (firstRow) {
-      firstRow = false
+    csvWriter.writeRow(rowToString(row.toSeq(dataSchema)), records == 0L && params.headerFlag)
+    records += 1
+    if (records % FLUSH_BATCH_SIZE == 0) {
+      flush()
+    }
+  }
+
+  private def flush(): Unit = {
+    val lines = csvWriter.flush()
+    if (lines.nonEmpty) {
+      text.set(lines)
+      recordWriter.write(NullWritable.get(), text)
     }
-    text.set(resultString)
-    recordWriter.write(NullWritable.get(), text)
   }
 
   override def close(): Unit = {
+    flush()
     recordWriter.close(context)
   }
 }