Commit d556b317 authored by hyukjinkwon, committed by Wenchen Fan

[SPARK-18699][SQL][FOLLOWUP] Add explanation in CSV parser and minor cleanup

## What changes were proposed in this pull request?

This PR suggests adding some comments to the `UnivocityParser` logic to explain what happens. It also proposes what is, IMHO, a slightly cleaner structure (at least one that is easier for me to explain).

## How was this patch tested?

Unit tests in `CSVSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17142 from HyukjinKwon/SPARK-18699.
Parent: 982f3223
@@ -54,39 +54,77 @@ private[csv] class UnivocityParser(
   private val dataSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
 
-  private val valueConverters =
-    dataSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
-
   private val tokenizer = new CsvParser(options.asParserSettings)
 
   private var numMalformedRecords = 0
 
   private val row = new GenericInternalRow(requiredSchema.length)
 
-  // This gets the raw input that is parsed lately.
+  // In `PERMISSIVE` parse mode, we should be able to put the raw malformed row into the field
+  // specified in `columnNameOfCorruptRecord`. The raw input is retrieved by this method.
   private def getCurrentInput(): String = tokenizer.getContext.currentParsedContent().stripLineEnd
 
-  // This parser loads an `indexArr._1`-th position value in input tokens,
-  // then put the value in `row(indexArr._2)`.
-  private val indexArr: Array[(Int, Int)] = {
-    val fields = if (options.dropMalformed) {
-      // If `dropMalformed` is enabled, then it needs to parse all the values
-      // so that we can decide which row is malformed.
-      requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
-    } else {
-      requiredSchema
-    }
-    // TODO: Revisit this; we need to clean up code here for readability.
-    // See an URL below for related discussions:
-    // https://github.com/apache/spark/pull/16928#discussion_r102636720
-    val fieldsWithIndexes = fields.zipWithIndex
-    corruptFieldIndex.map { case corrFieldIndex =>
-      fieldsWithIndexes.filter { case (_, i) => i != corrFieldIndex }
-    }.getOrElse {
-      fieldsWithIndexes
-    }.map { case (f, i) =>
-      (dataSchema.indexOf(f), i)
-    }.toArray
-  }
+  // This parser loads the value at the `tokenIndexArr`-th position in the input tokens,
+  // then puts that value in `row(rowIndexArr)`.
+  //
+  // For example, let's say there is CSV data as below:
+  //
+  //   a,b,c
+  //   1,2,A
+  //
+  // Also, let's say `columnNameOfCorruptRecord` is set to "_unparsed", the user sets
+  // `header` to `true`, and the user selects the "c", "b", "_unparsed" and "a" fields.
+  // In this case, we need to map between the schemas below:
+  //
+  //   required schema - ["c", "b", "_unparsed", "a"]
+  //   CSV data schema - ["a", "b", "c"]
+  //   required CSV data schema - ["c", "b", "a"]
+  //
+  // with the input tokens,
+  //
+  //   input tokens - [1, 2, "A"]
+  //
+  // Each input token is placed at its position in the output row by these mappings.
+  // In this case,
+  //
+  //   output row - ["A", 2, null, 1]
+  //
+  // In more detail:
+  // - `valueConverters`, input tokens -> CSV data schema
+  //   `valueConverters` maps input token indices (by its index) to each value's
+  //   converter (by its value), in the order of the CSV data schema. In this case,
+  //   [string->int, string->int, string->string].
+  //
+  // - `tokenIndexArr`, input tokens -> required CSV data schema
+  //   `tokenIndexArr` maps each position in the required CSV data schema (by its index)
+  //   to the corresponding input token index (by its value). In this case, [2, 1, 0].
+  //
+  // - `rowIndexArr`, input tokens -> required schema
+  //   `rowIndexArr` maps each converted field (by its index) to its position in the
+  //   output row (by its value). In this case, [0, 1, 3].
+  private val valueConverters: Array[ValueConverter] =
+    dataSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
+
+  // Only used to create both `tokenIndexArr` and `rowIndexArr`. This holds the fields
+  // that we should try to convert.
+  private val reorderedFields = if (options.dropMalformed) {
+    // If `dropMalformed` is enabled, then we need to parse all the values
+    // so that we can decide which rows are malformed.
+    requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
+  } else {
+    requiredSchema
+  }
+
+  private val tokenIndexArr: Array[Int] = {
+    reorderedFields
+      .filter(_.name != options.columnNameOfCorruptRecord)
+      .map(f => dataSchema.indexOf(f)).toArray
+  }
+
+  private val rowIndexArr: Array[Int] = if (corruptFieldIndex.isDefined) {
+    val corrFieldIndex = corruptFieldIndex.get
+    reorderedFields.indices.filter(_ != corrFieldIndex).toArray
+  } else {
+    reorderedFields.indices.toArray
+  }
 
   /**
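To make the index bookkeeping above concrete, here is a small, self-contained Scala sketch. Plain strings stand in for Spark's `StructField`, and the hard-coded Int/String conversions stand in for `valueConverters`; the object and value names are illustrative only, not Spark code. It reproduces the documented example end to end:

```scala
// Standalone sketch of the index bookkeeping described in the comment above.
object IndexMappingSketch extends App {
  val columnNameOfCorruptRecord = "_unparsed"

  val dataSchema = Seq("a", "b", "c")                  // CSV data schema
  val requiredSchema = Seq("c", "b", "_unparsed", "a") // user-selected fields

  // No dropMalformed here, so the fields to convert are just the required ones.
  val reorderedFields = requiredSchema

  // Position in the required CSV data schema -> input token index.
  val tokenIndexArr: Array[Int] = reorderedFields
    .filter(_ != columnNameOfCorruptRecord)
    .map(f => dataSchema.indexOf(f))
    .toArray                                           // Array(2, 1, 0)

  // Converted field -> position in the output row (skips the corrupt column).
  val corruptFieldIndex = reorderedFields.indexOf(columnNameOfCorruptRecord)
  val rowIndexArr: Array[Int] =
    reorderedFields.indices.filter(_ != corruptFieldIndex).toArray // Array(0, 1, 3)

  // Fill the output row the same way the `convert` loop in the second hunk does.
  val tokens = Array("1", "2", "A")
  val row = new Array[Any](requiredSchema.length)      // the _unparsed slot stays null
  for (i <- tokenIndexArr.indices) {
    val from = tokenIndexArr(i)
    val to = rowIndexArr(i)
    // "a" and "b" convert to Int; "c" stays a String.
    row(to) = if (dataSchema(from) == "c") tokens(from) else tokens(from).toInt
  }
  println(row.mkString("[", ", ", "]"))                // prints [A, 2, null, 1]
}
```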
@@ -200,14 +238,15 @@ private[csv] class UnivocityParser(
   private def convert(tokens: Array[String]): Option[InternalRow] = {
     convertWithParseMode(tokens) { tokens =>
       var i: Int = 0
-      while (i < indexArr.length) {
-        val (pos, rowIdx) = indexArr(i)
+      while (i < tokenIndexArr.length) {
         // The conversion below must be attempted regardless, because in `DROPMALFORMED`
         // mode, whether this row is malformed is decided only after trying the cast,
         // even if the casted value is not stored in the row.
-        val value = valueConverters(pos).apply(tokens(pos))
+        val from = tokenIndexArr(i)
+        val to = rowIndexArr(i)
+        val value = valueConverters(from).apply(tokens(from))
         if (i < requiredSchema.length) {
-          row(rowIdx) = value
+          row(to) = value
         }
         i += 1
       }
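For context on how these code paths are driven from the user-facing API, below is a hedged usage sketch. The reader options shown (`header`, `mode`, `columnNameOfCorruptRecord`) are standard Spark 2.x CSV options, but the session setup, schema, and file path are hypothetical:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Hypothetical local session and file path; only the reader options are the
// real API surface exercised by UnivocityParser.
val spark = SparkSession.builder().master("local[*]").appName("csv-parse-modes").getOrCreate()

// `_unparsed` must appear in the user-provided schema so that, in PERMISSIVE
// mode, the raw malformed line (see `getCurrentInput`) has a column to land in.
val schema = new StructType()
  .add("a", IntegerType)
  .add("b", IntegerType)
  .add("c", StringType)
  .add("_unparsed", StringType)

// PERMISSIVE: malformed rows are kept and their raw text goes to `_unparsed`.
val permissive = spark.read
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_unparsed")
  .schema(schema)
  .csv("/tmp/example.csv") // hypothetical path

// Selecting fields in a different order than the file triggers exactly the
// required-schema reordering described in the big comment in the first hunk.
permissive.select("c", "b", "_unparsed", "a").show()

// DROPMALFORMED: all columns are parsed (this is why `reorderedFields` appends
// the non-required fields) so that malformed rows can be detected and dropped.
val dropped = spark.read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .schema(new StructType().add("a", IntegerType).add("b", IntegerType).add("c", StringType))
  .csv("/tmp/example.csv")
dropped.show()
```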