From dec9aa3b37c01454065a4d8899859991f43d4c66 Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Sun, 4 Jun 2017 13:43:51 -0700
Subject: [PATCH] [SPARK-20961][SQL] generalize the dictionary in ColumnVector

## What changes were proposed in this pull request?

As the first step of https://issues.apache.org/jira/browse/SPARK-20960 (making `ColumnVector` public), this PR generalizes `ColumnVector.dictionary` so that it is no longer coupled with Parquet.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18183 from cloud-fan/dictionary.
---
 .../parquet/ParquetDictionary.java             | 53 +++++++++++++++++++
 .../parquet/VectorizedColumnReader.java        |  2 +-
 .../VectorizedParquetRecordReader.java         | 17 +++---
 .../execution/vectorized/ColumnVector.java     | 15 ++----
 .../sql/execution/vectorized/Dictionary.java   | 34 ++++++++++++
 5 files changed, 100 insertions(+), 21 deletions(-)
 create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java
 create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/Dictionary.java

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java
new file mode 100644
index 0000000000..0930edeb35
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.spark.sql.execution.vectorized.Dictionary;
+
+public final class ParquetDictionary implements Dictionary {
+  private org.apache.parquet.column.Dictionary dictionary;
+
+  public ParquetDictionary(org.apache.parquet.column.Dictionary dictionary) {
+    this.dictionary = dictionary;
+  }
+
+  @Override
+  public int decodeToInt(int id) {
+    return dictionary.decodeToInt(id);
+  }
+
+  @Override
+  public long decodeToLong(int id) {
+    return dictionary.decodeToLong(id);
+  }
+
+  @Override
+  public float decodeToFloat(int id) {
+    return dictionary.decodeToFloat(id);
+  }
+
+  @Override
+  public double decodeToDouble(int id) {
+    return dictionary.decodeToDouble(id);
+  }
+
+  @Override
+  public byte[] decodeToBinary(int id) {
+    return dictionary.decodeToBinary(id).getBytes();
+  }
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 9d641b5287..fd8db17272 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -169,7 +169,7 @@ public class VectorizedColumnReader {
       // Column vector supports lazy decoding of dictionary values so just set the dictionary.
       // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
       // non-dictionary encoded values have already been added).
-      column.setDictionary(dictionary);
+      column.setDictionary(new ParquetDictionary(dictionary));
     } else {
       decodeDictionaryIds(rowId, num, column, dictionaryIds);
     }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 51bdf0f0f2..04f8141d66 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -154,12 +154,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     return (float) rowsReturned / totalRowCount;
   }
 
-  /**
-   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
-   * This object is reused. Calling this enables the vectorized reader. This should be called
-   * before any calls to nextKeyValue/nextBatch.
-   */
-
   // Creates a columnar batch that includes the schema from the data files and the additional
   // partition columns appended to the end of the batch.
   // For example, if the data contains two columns, with 2 partition columns:
@@ -204,12 +198,17 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues);
   }
 
+  /**
+   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
+   * This object is reused. Calling this enables the vectorized reader. This should be called
+   * before any calls to nextKeyValue/nextBatch.
+   */
   public ColumnarBatch resultBatch() {
     if (columnarBatch == null) initBatch();
     return columnarBatch;
   }
 
-  /*
+  /**
    * Can be called before any rows are returned to enable returning columnar batches directly.
    */
   public void enableReturningBatches() {
@@ -237,9 +236,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   }
 
   private void initializeInternal() throws IOException, UnsupportedOperationException {
-    /**
-     * Check that the requested schema is supported.
-     */
+    // Check that the requested schema is supported.
     missingColumns = new boolean[requestedSchema.getFieldCount()];
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
       Type t = requestedSchema.getFields().get(i);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index ad267ab0c9..24260a6019 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -20,8 +20,6 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.io.api.Binary;
 
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -313,8 +311,8 @@ public abstract class ColumnVector implements AutoCloseable {
   }
 
   /**
-   * Ensures that there is enough storage to store capcity elements. That is, the put() APIs
-   * must work for all rowIds < capcity.
+   * Ensures that there is enough storage to store capacity elements. That is, the put() APIs
+   * must work for all rowIds < capacity.
    */
   protected abstract void reserveInternal(int capacity);
 
@@ -479,7 +477,6 @@ public abstract class ColumnVector implements AutoCloseable {
 
   /**
    * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   * src should contain `count` doubles written as ieee format.
    */
   public abstract void putFloats(int rowId, int count, float[] src, int srcIndex);
 
@@ -506,7 +503,6 @@ public abstract class ColumnVector implements AutoCloseable {
 
   /**
    * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   * src should contain `count` doubles written as ieee format.
    */
   public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);
 
@@ -628,8 +624,8 @@ public abstract class ColumnVector implements AutoCloseable {
       ColumnVector.Array a = getByteArray(rowId);
       return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
     } else {
-      Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
-      return UTF8String.fromBytes(v.getBytes());
+      byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
+      return UTF8String.fromBytes(bytes);
     }
   }
 
@@ -643,8 +639,7 @@ public abstract class ColumnVector implements AutoCloseable {
       System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
       return bytes;
     } else {
-      Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
-      return v.getBytes();
+      return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
     }
   }
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/Dictionary.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/Dictionary.java
new file mode 100644
index 0000000000..c698168b4c
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/Dictionary.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+/**
+ * The interface for a dictionary in ColumnVector, used to decode dictionary-encoded values.
+ */
+public interface Dictionary {
+
+  int decodeToInt(int id);
+
+  long decodeToLong(int id);
+
+  float decodeToFloat(int id);
+
+  double decodeToDouble(int id);
+
+  byte[] decodeToBinary(int id);
+}
--
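
Since the new `Dictionary` interface carries no Parquet types, any columnar source can now supply lazily decoded dictionary values. Below is a minimal sketch, not part of the patch above, of a non-Parquet implementation backed by plain Java arrays; the class name `ArrayDictionary` and its fields are hypothetical illustrations. A reader for another format could hand such an object to a column via `column.setDictionary(...)`, just as `VectorizedColumnReader` now wraps its Parquet dictionary in `ParquetDictionary`.

```java
package org.apache.spark.sql.execution.vectorized;

// Hypothetical example (not part of this patch): a Dictionary backed by plain
// Java arrays, demonstrating that implementations no longer need Parquet types.
public final class ArrayDictionary implements Dictionary {
  // One decoded value per dictionary id; only the array matching the column's
  // physical type needs to be populated.
  private final long[] integralValues;
  private final double[] floatingValues;
  private final byte[][] binaryValues;

  public ArrayDictionary(long[] integralValues, double[] floatingValues, byte[][] binaryValues) {
    this.integralValues = integralValues;
    this.floatingValues = floatingValues;
    this.binaryValues = binaryValues;
  }

  @Override
  public int decodeToInt(int id) {
    return (int) integralValues[id];
  }

  @Override
  public long decodeToLong(int id) {
    return integralValues[id];
  }

  @Override
  public float decodeToFloat(int id) {
    return (float) floatingValues[id];
  }

  @Override
  public double decodeToDouble(int id) {
    return floatingValues[id];
  }

  @Override
  public byte[] decodeToBinary(int id) {
    return binaryValues[id];
  }
}
```

With the dictionary set, the column keeps only the integer ids in its `dictionaryIds` vector and decodes values on demand through the interface, which is the same lazy-decoding path the `ColumnVector` hunks above rely on.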