diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 804031a5bb5f87615ffa9b43d601e77ae4e18e18..3b3b87e4354d6f4f06411590f16958bd05163c1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -54,39 +54,77 @@ private[csv] class UnivocityParser(
 
   private val dataSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
 
-  private val valueConverters =
-    dataSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
-
   private val tokenizer = new CsvParser(options.asParserSettings)
 
   private var numMalformedRecords = 0
 
   private val row = new GenericInternalRow(requiredSchema.length)
 
-  // This gets the raw input that is parsed lately.
+  // In `PERMISSIVE` parse mode, we should be able to put the raw malformed row into the field
+  // specified in `columnNameOfCorruptRecord`. The raw input is retrieved by this method.
   private def getCurrentInput(): String = tokenizer.getContext.currentParsedContent().stripLineEnd
 
-  // This parser loads an `indexArr._1`-th position value in input tokens,
-  // then put the value in `row(indexArr._2)`.
-  private val indexArr: Array[(Int, Int)] = {
-    val fields = if (options.dropMalformed) {
-      // If `dropMalformed` is enabled, then it needs to parse all the values
-      // so that we can decide which row is malformed.
-      requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
-    } else {
-      requiredSchema
-    }
-    // TODO: Revisit this; we need to clean up code here for readability.
-    // See an URL below for related discussions:
-    // https://github.com/apache/spark/pull/16928#discussion_r102636720
-    val fieldsWithIndexes = fields.zipWithIndex
-    corruptFieldIndex.map { case corrFieldIndex =>
-      fieldsWithIndexes.filter { case (_, i) => i != corrFieldIndex }
-    }.getOrElse {
-      fieldsWithIndexes
-    }.map { case (f, i) =>
-      (dataSchema.indexOf(f), i)
-    }.toArray
+  // This parser loads the value at position `tokenIndexArr(i)` in the input tokens,
+  // then puts it into `row(rowIndexArr(i))`.
+  //
+  // For example, let's say there is CSV data as below:
+  //
+  //   a,b,c
+  //   1,2,A
+  //
+  // Also, let's say `columnNameOfCorruptRecord` is set to "_unparsed", `header` is `true`,
+  // and the user selects the "c", "b", "_unparsed" and "a" fields. In this case, we need
+  // to map the values as below:
+  //
+  //   required schema - ["c", "b", "_unparsed", "a"]
+  //   CSV data schema - ["a", "b", "c"]
+  //   required CSV data schema - ["c", "b", "a"]
+  //
+  // with the input tokens,
+  //
+  //   input tokens - [1, 2, "A"]
+  //
+  // Each input token is placed at its output row position by these mappings. In this case,
+  //
+  //   output row - ["A", 2, null, 1]
+  //
+  // In more detail,
+  // - `valueConverters`, input tokens - CSV data schema
+  //   `valueConverters` maps input token indices (by its index) to each value's
+  //   converter (by its value), in the order of the CSV data schema. In this case,
+  //   [string->int, string->int, string->string].
+  //
+  // - `tokenIndexArr`, input tokens - required CSV data schema
+  //   `tokenIndexArr` maps each field of the required CSV data schema (by its index) to the
+  //   index of its token in the input (by its value). In this case, [2, 1, 0].
+  //
+  // - `rowIndexArr`, input tokens - required schema
+  //   `rowIndexArr` maps each converted field (by its index) to its position in the output
+  //   row, that is, in the required schema (by its value). In this case, [0, 1, 3].
+  private val valueConverters: Array[ValueConverter] =
+    dataSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
+
+  // Only used to create both `tokenIndexArr` and `rowIndexArr`. This variable means
+  // the fields that we should try to convert.
+  private val reorderedFields = if (options.dropMalformed) {
+    // If `dropMalformed` is enabled, then it needs to parse all the values
+    // so that we can decide which row is malformed.
+    requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
+  } else {
+    requiredSchema
+  }
+
+  private val tokenIndexArr: Array[Int] = {
+    reorderedFields
+      .filter(_.name != options.columnNameOfCorruptRecord)
+      .map(f => dataSchema.indexOf(f)).toArray
+  }
+
+  private val rowIndexArr: Array[Int] = if (corruptFieldIndex.isDefined) {
+    val corrFieldIndex = corruptFieldIndex.get
+    reorderedFields.indices.filter(_ != corrFieldIndex).toArray
+  } else {
+    reorderedFields.indices.toArray
   }
 
   /**
@@ -200,14 +238,15 @@ private[csv] class UnivocityParser(
   private def convert(tokens: Array[String]): Option[InternalRow] = {
     convertWithParseMode(tokens) { tokens =>
       var i: Int = 0
-      while (i < indexArr.length) {
-        val (pos, rowIdx) = indexArr(i)
+      while (i < tokenIndexArr.length) {
        // It anyway needs to try to parse since it decides if this row is malformed
         // or not after trying to cast in `DROPMALFORMED` mode even if the casted
         // value is not stored in the row.
-        val value = valueConverters(pos).apply(tokens(pos))
+        val from = tokenIndexArr(i)
+        val to = rowIndexArr(i)
+        val value = valueConverters(from).apply(tokens(from))
         if (i < requiredSchema.length) {
-          row(rowIdx) = value
+          row(to) = value
         }
         i += 1
       }
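To see the mapping described in the new comment block end to end, here is a minimal standalone Scala sketch, independent of the patch: `IndexMappingSketch`, the hard-coded converters standing in for `makeConverter`, and the plain `Array[Any]` standing in for `GenericInternalRow` are illustrative assumptions only. It reproduces the worked example, where the tokens [1, 2, "A"] end up as the output row ["A", 2, null, 1].

object IndexMappingSketch {
  def main(args: Array[String]): Unit = {
    // Schemas from the worked example in the comment above.
    val csvDataSchema = Seq("a", "b", "c")                  // order of the input tokens
    val requiredSchema = Seq("c", "b", "_unparsed", "a")    // order requested by the user
    val columnNameOfCorruptRecord = "_unparsed"

    // Fields we try to convert (no `dropMalformed` here, so just the required fields).
    val reorderedFields = requiredSchema

    // For each field to convert, where its token sits in the input: [2, 1, 0].
    val tokenIndexArr: Seq[Int] = reorderedFields
      .filter(_ != columnNameOfCorruptRecord)
      .map(f => csvDataSchema.indexOf(f))

    // For each converted field, where it lands in the output row: [0, 1, 3].
    val corruptFieldIndex = reorderedFields.indexOf(columnNameOfCorruptRecord)
    val rowIndexArr: Seq[Int] = reorderedFields.indices.filterNot(_ == corruptFieldIndex)

    // One converter per CSV data column, in CSV data schema order:
    // "a" and "b" become ints, "c" stays a string (stand-ins for `makeConverter`).
    val valueConverters: Seq[String => Any] = Seq(_.toInt, _.toInt, (s: String) => s)

    val tokens = Array("1", "2", "A")
    val row = new Array[Any](requiredSchema.length)  // the corrupt-record slot stays null

    tokenIndexArr.indices.foreach { i =>
      val from = tokenIndexArr(i)  // token position to read
      val to = rowIndexArr(i)      // row position to write
      row(to) = valueConverters(from)(tokens(from))
    }

    println(row.mkString("[", ", ", "]"))  // prints: [A, 2, null, 1]
  }
}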