Skip to content
Snippets Groups Projects
Commit 5989c85b authored by Nong Li's avatar Nong Li Committed by Davies Liu
Browse files

[SPARK-14217] [SQL] Fix bug if parquet data has columns that use dictionary...

[SPARK-14217] [SQL] Fix bug if parquet data has columns that use dictionary encoding for some of the data

## What changes were proposed in this pull request?

This PR is based on #12017

Currently, this produces batches where some values are dictionary encoded and some
are not. The non-dictionary-encoded values cause us to remove the dictionary
from the batch, which causes the earlier values to return garbage.

This patch fixes the issue by first decoding the dictionary for the values that are
already dictionary encoded before switching. A similar thing is done for the reverse
case where the initial values are not dictionary encoded.

## How was this patch tested?

This is difficult to test, but the issue was replicated on a test cluster using a large TPC-DS data set.

Author: Nong Li <nong@databricks.com>
Author: Davies Liu <davies@databricks.com>

Closes #12279 from davies/fix_dict.
parent 5cb5edaf
No related branches found
No related tags found
No related merge requests found
...@@ -27,6 +27,7 @@ import org.apache.parquet.column.Encoding; ...@@ -27,6 +27,7 @@ import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.*; import org.apache.parquet.column.page.*;
import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.spark.sql.execution.vectorized.ColumnVector; import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.DataTypes;
...@@ -114,57 +115,6 @@ public class VectorizedColumnReader { ...@@ -114,57 +115,6 @@ public class VectorizedColumnReader {
} }
} }
/**
 * Reads the next boolean value from the data column, resolving it through the
 * dictionary when this column is dictionary encoded.
 *
 * TODO: Hoist the useDictionary branch to decode*Batch and make the batch page aligned.
 */
public boolean nextBoolean() {
  if (useDictionary) {
    return dictionary.decodeToBoolean(dataColumn.readValueDictionaryId());
  }
  return dataColumn.readBoolean();
}
/** Reads the next int, decoding it through the dictionary when one is in use. */
public int nextInt() {
  return useDictionary
      ? dictionary.decodeToInt(dataColumn.readValueDictionaryId())
      : dataColumn.readInteger();
}
/** Reads the next long, decoding it through the dictionary when one is in use. */
public long nextLong() {
  return useDictionary
      ? dictionary.decodeToLong(dataColumn.readValueDictionaryId())
      : dataColumn.readLong();
}
/** Reads the next float, decoding it through the dictionary when one is in use. */
public float nextFloat() {
  return useDictionary
      ? dictionary.decodeToFloat(dataColumn.readValueDictionaryId())
      : dataColumn.readFloat();
}
/** Reads the next double, decoding it through the dictionary when one is in use. */
public double nextDouble() {
  return useDictionary
      ? dictionary.decodeToDouble(dataColumn.readValueDictionaryId())
      : dataColumn.readDouble();
}
/** Reads the next binary value, decoding it through the dictionary when one is in use. */
public Binary nextBinary() {
  if (useDictionary) {
    return dictionary.decodeToBinary(dataColumn.readValueDictionaryId());
  }
  return dataColumn.readBytes();
}
/** /**
* Advances to the next value. Returns true if the value is non-null. * Advances to the next value. Returns true if the value is non-null.
*/ */
...@@ -200,8 +150,26 @@ public class VectorizedColumnReader { ...@@ -200,8 +150,26 @@ public class VectorizedColumnReader {
ColumnVector dictionaryIds = column.reserveDictionaryIds(total); ColumnVector dictionaryIds = column.reserveDictionaryIds(total);
defColumn.readIntegers( defColumn.readIntegers(
num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
decodeDictionaryIds(rowId, num, column, dictionaryIds);
if (column.hasDictionary() || (rowId == 0 &&
(descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.BINARY))) {
// Column vector supports lazy decoding of dictionary values so just set the dictionary.
// We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
// non-dictionary encoded values have already been added).
column.setDictionary(dictionary);
} else {
decodeDictionaryIds(rowId, num, column, dictionaryIds);
}
} else { } else {
if (column.hasDictionary() && rowId != 0) {
// This batch already has dictionary encoded values but this new page is not. The batch
// does not support a mix of dictionary and not so we will decode the dictionary.
decodeDictionaryIds(0, rowId, column, column.getDictionaryIds());
}
column.setDictionary(null); column.setDictionary(null);
switch (descriptor.getType()) { switch (descriptor.getType()) {
case BOOLEAN: case BOOLEAN:
...@@ -246,11 +214,45 @@ public class VectorizedColumnReader { ...@@ -246,11 +214,45 @@ public class VectorizedColumnReader {
ColumnVector dictionaryIds) { ColumnVector dictionaryIds) {
switch (descriptor.getType()) { switch (descriptor.getType()) {
case INT32: case INT32:
if (column.dataType() == DataTypes.IntegerType ||
DecimalType.is32BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
column.putInt(i, dictionary.decodeToInt(dictionaryIds.getInt(i)));
}
} else if (column.dataType() == DataTypes.ByteType) {
for (int i = rowId; i < rowId + num; ++i) {
column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getInt(i)));
}
} else if (column.dataType() == DataTypes.ShortType) {
for (int i = rowId; i < rowId + num; ++i) {
column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getInt(i)));
}
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
}
break;
case INT64: case INT64:
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i)));
}
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
}
break;
case FLOAT: case FLOAT:
for (int i = rowId; i < rowId + num; ++i) {
column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getInt(i)));
}
break;
case DOUBLE: case DOUBLE:
case BINARY: for (int i = rowId; i < rowId + num; ++i) {
column.setDictionary(dictionary); column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getInt(i)));
}
break; break;
case INT96: case INT96:
if (column.dataType() == DataTypes.TimestampType) { if (column.dataType() == DataTypes.TimestampType) {
...@@ -263,6 +265,16 @@ public class VectorizedColumnReader { ...@@ -263,6 +265,16 @@ public class VectorizedColumnReader {
throw new NotImplementedException(); throw new NotImplementedException();
} }
break; break;
case BINARY:
// TODO: this is incredibly inefficient as it blows up the dictionary right here. We
// need to do this better. We should probably add the dictionary data to the ColumnVector
// and reuse it across batches. This should mean adding a ByteArray would just update
// the length and offset.
for (int i = rowId; i < rowId + num; ++i) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
column.putByteArray(i, v.getBytes());
}
break;
case FIXED_LEN_BYTE_ARRAY: case FIXED_LEN_BYTE_ARRAY:
// DecimalType written in the legacy mode // DecimalType written in the legacy mode
if (DecimalType.is32BitDecimalType(column.dataType())) { if (DecimalType.is32BitDecimalType(column.dataType())) {
......
...@@ -912,6 +912,11 @@ public abstract class ColumnVector implements AutoCloseable { ...@@ -912,6 +912,11 @@ public abstract class ColumnVector implements AutoCloseable {
this.dictionary = dictionary; this.dictionary = dictionary;
} }
/**
 * Returns true if a dictionary is currently attached to this column vector.
 */
public boolean hasDictionary() {
  return this.dictionary != null;
}
/** /**
* Reserve a integer column for ids of dictionary. * Reserve a integer column for ids of dictionary.
*/ */
...@@ -926,6 +931,13 @@ public abstract class ColumnVector implements AutoCloseable { ...@@ -926,6 +931,13 @@ public abstract class ColumnVector implements AutoCloseable {
return dictionaryIds; return dictionaryIds;
} }
/**
 * Returns the underlying integer column holding the dictionary ids.
 */
public ColumnVector getDictionaryIds() { return dictionaryIds; }
/** /**
* Sets up the common state and also handles creating the child columns if this is a nested * Sets up the common state and also handles creating the child columns if this is a nested
* type. * type.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment