diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 6c47dc09a863765c42c065534f5b8cb7af44a31a..4ed59b08a467c82dd83df044d6c318a490ab6ab7 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -221,15 +221,15 @@ public class VectorizedColumnReader { if (column.dataType() == DataTypes.IntegerType || DecimalType.is32BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - column.putInt(i, dictionary.decodeToInt(dictionaryIds.getInt(i))); + column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i))); } } else if (column.dataType() == DataTypes.ByteType) { for (int i = rowId; i < rowId + num; ++i) { - column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getInt(i))); + column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i))); } } else if (column.dataType() == DataTypes.ShortType) { for (int i = rowId; i < rowId + num; ++i) { - column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getInt(i))); + column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i))); } } else { throw new UnsupportedOperationException("Unimplemented type: " + column.dataType()); @@ -240,7 +240,7 @@ public class VectorizedColumnReader { if (column.dataType() == DataTypes.LongType || DecimalType.is64BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i))); + column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i))); } } else { throw new UnsupportedOperationException("Unimplemented type: " + column.dataType()); @@ -249,20 +249,20 @@ public class VectorizedColumnReader { case FLOAT: for (int i = rowId; i < rowId + num; ++i) { - column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getInt(i))); + column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i))); } break; case DOUBLE: for (int i = rowId; i < rowId + num; ++i) { - column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getInt(i))); + column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i))); } break; case INT96: if (column.dataType() == DataTypes.TimestampType) { for (int i = rowId; i < rowId + num; ++i) { // TODO: Convert dictionary of Binaries to dictionary of Longs - Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v)); } } else { @@ -275,7 +275,7 @@ public class VectorizedColumnReader { // and reuse it across batches. This should mean adding a ByteArray would just update // the length and offset. for (int i = rowId; i < rowId + num; ++i) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); column.putByteArray(i, v.getBytes()); } break; @@ -283,17 +283,17 @@ public class VectorizedColumnReader { // DecimalType written in the legacy mode if (DecimalType.is32BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v)); } } else if (DecimalType.is64BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v)); } } else if (DecimalType.isByteArrayDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); column.putByteArray(i, v.getBytes()); } } else { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index 59173d253b2985b42e5e54fcc9f868c41acc53d7..a7cb3b11f687a19d9e0dad70cb7b279e32faed23 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -428,6 +428,13 @@ public abstract class ColumnVector implements AutoCloseable { */ public abstract int getInt(int rowId); + /** + * Returns the dictionary Id for rowId. + * This should only be called when the ColumnVector is dictionaryIds. + * We have this separate method for dictionaryIds as per SPARK-16928. + */ + public abstract int getDictId(int rowId); + /** * Sets the value at rowId to `value`. */ @@ -615,7 +622,7 @@ public abstract class ColumnVector implements AutoCloseable { ColumnVector.Array a = getByteArray(rowId); return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length); } else { - Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(rowId)); + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId)); return UTF8String.fromBytes(v.getBytes()); } } @@ -630,7 +637,7 @@ public abstract class ColumnVector implements AutoCloseable { System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length); return bytes; } else { - Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(rowId)); + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId)); return v.getBytes(); } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 913a05a0aa0ec3d66130a889fa196cc52b9e6ee0..12fa109cec8239ccdc734e0d6d44b654ffc298ce 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -161,7 +161,7 @@ public final class OffHeapColumnVector extends ColumnVector { if (dictionary == null) { return Platform.getByte(null, data + rowId); } else { - return (byte) dictionary.decodeToInt(dictionaryIds.getInt(rowId)); + return (byte) dictionary.decodeToInt(dictionaryIds.getDictId(rowId)); } } @@ -193,7 +193,7 @@ public final class OffHeapColumnVector extends ColumnVector { if (dictionary == null) { return Platform.getShort(null, data + 2 * rowId); } else { - return (short) dictionary.decodeToInt(dictionaryIds.getInt(rowId)); + return (short) dictionary.decodeToInt(dictionaryIds.getDictId(rowId)); } } @@ -240,10 +240,21 @@ public final class OffHeapColumnVector extends ColumnVector { if (dictionary == null) { return Platform.getInt(null, data + 4 * rowId); } else { - return dictionary.decodeToInt(dictionaryIds.getInt(rowId)); + return dictionary.decodeToInt(dictionaryIds.getDictId(rowId)); } } + /** + * Returns the dictionary Id for rowId. + * This should only be called when the ColumnVector is dictionaryIds. + * We have this separate method for dictionaryIds as per SPARK-16928. + */ + public int getDictId(int rowId) { + assert(dictionary == null) + : "A ColumnVector dictionary should not have a dictionary for itself."; + return Platform.getInt(null, data + 4 * rowId); + } + // // APIs dealing with Longs // @@ -287,7 +298,7 @@ public final class OffHeapColumnVector extends ColumnVector { if (dictionary == null) { return Platform.getLong(null, data + 8 * rowId); } else { - return dictionary.decodeToLong(dictionaryIds.getInt(rowId)); + return dictionary.decodeToLong(dictionaryIds.getDictId(rowId)); } } @@ -333,7 +344,7 @@ public final class OffHeapColumnVector extends ColumnVector { if (dictionary == null) { return Platform.getFloat(null, data + rowId * 4); } else { - return dictionary.decodeToFloat(dictionaryIds.getInt(rowId)); + return dictionary.decodeToFloat(dictionaryIds.getDictId(rowId)); } } @@ -380,7 +391,7 @@ public final class OffHeapColumnVector extends ColumnVector { if (dictionary == null) { return Platform.getDouble(null, data + rowId * 8); } else { - return dictionary.decodeToDouble(dictionaryIds.getInt(rowId)); + return dictionary.decodeToDouble(dictionaryIds.getDictId(rowId)); } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 85067df4ebf910a0f7896221deb38bbf6f2dd286..9b410bacff5df4959aa5ac512c081b65687a68fe 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -158,7 +158,7 @@ public final class OnHeapColumnVector extends ColumnVector { if (dictionary == null) { return byteData[rowId]; } else { - return (byte) dictionary.decodeToInt(dictionaryIds.getInt(rowId)); + return (byte) dictionary.decodeToInt(dictionaryIds.getDictId(rowId)); } } @@ -188,7 +188,7 @@ public final class OnHeapColumnVector extends ColumnVector { if (dictionary == null) { return shortData[rowId]; } else { - return (short) dictionary.decodeToInt(dictionaryIds.getInt(rowId)); + return (short) dictionary.decodeToInt(dictionaryIds.getDictId(rowId)); } } @@ -230,10 +230,21 @@ public final class OnHeapColumnVector extends ColumnVector { if (dictionary == null) { return intData[rowId]; } else { - return dictionary.decodeToInt(dictionaryIds.getInt(rowId)); + return dictionary.decodeToInt(dictionaryIds.getDictId(rowId)); } } + /** + * Returns the dictionary Id for rowId. + * This should only be called when the ColumnVector is dictionaryIds. + * We have this separate method for dictionaryIds as per SPARK-16928. + */ + public int getDictId(int rowId) { + assert(dictionary == null) + : "A ColumnVector dictionary should not have a dictionary for itself."; + return intData[rowId]; + } + // // APIs dealing with Longs // @@ -271,7 +282,7 @@ public final class OnHeapColumnVector extends ColumnVector { if (dictionary == null) { return longData[rowId]; } else { - return dictionary.decodeToLong(dictionaryIds.getInt(rowId)); + return dictionary.decodeToLong(dictionaryIds.getDictId(rowId)); } } @@ -310,7 +321,7 @@ public final class OnHeapColumnVector extends ColumnVector { if (dictionary == null) { return floatData[rowId]; } else { - return dictionary.decodeToFloat(dictionaryIds.getInt(rowId)); + return dictionary.decodeToFloat(dictionaryIds.getDictId(rowId)); } } @@ -351,7 +362,7 @@ public final class OnHeapColumnVector extends ColumnVector { if (dictionary == null) { return doubleData[rowId]; } else { - return dictionary.decodeToDouble(dictionaryIds.getInt(rowId)); + return dictionary.decodeToDouble(dictionaryIds.getDictId(rowId)); } }