Commit 0598a2b8 authored by Nong Li, committed by Davies Liu

[SPARK-13499] [SQL] Performance improvements for parquet reader.

## What changes were proposed in this pull request?

This patch includes these performance fixes:
  - Remove unnecessary putNotNull() calls; the NULL bits are already cleared.
  - Speed up RLE group decoding (illustrated in the first sketch below).
  - Speed up dictionary decoding by decoding NULLs directly into the result (see the second sketch below).
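
The RLE group-decoding fix is visible in `readNextGroup()` in the diff below: the per-group byte count was previously computed with a floating-point `Math.ceil`, which the patch replaces with an integer `ceil8` helper, and a per-group `Preconditions.checkArgument` bounds check is dropped from the hot path. A minimal standalone illustration of the arithmetic (the `Ceil8Demo` wrapper and its `main` are illustrative, not part of the patch; only the `ceil8` body comes from the diff):

```
public class Ceil8Demo {
  // Integer ceiling division by 8, as added to VectorizedRleValuesReader.
  static int ceil8(int value) {
    return (value + 7) / 8;
  }

  public static void main(String[] args) {
    // The old and new formulations agree for any non-negative bit count,
    // but ceil8 avoids an int -> double -> int round trip per RLE group.
    for (int bits = 0; bits <= 64; ++bits) {
      int old = (int) Math.ceil((double) bits / 8.0);
      if (ceil8(bits) != old) throw new AssertionError("mismatch at " + bits);
    }
    System.out.println("ceil8 matches Math.ceil for 0..64 bits");
  }
}
```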

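The other two fixes share one idea: the definition-level decoder already writes each row's nullability while decoding, so callers no longer need to set not-null bits afterwards or copy NULLs between vectors in a second pass. Below is a simplified sketch of the decode-loop pattern, assuming plain `int[]`/`boolean[]` outputs as stand-ins for Spark's ColumnVector; `DefLevelSketch` is hypothetical and covers only run-length runs, while the real reader in the diff also handles bit-packed groups:

```
import java.util.ArrayDeque;
import java.util.function.IntSupplier;

public class DefLevelSketch {
  // Decode `total` rows whose definition levels arrive as <count, level> runs.
  // A run at maxDefLevel pulls real values from the data page; any other run
  // writes NULLs straight into the output -- no second pass, no putNotNull.
  static void readIntegers(ArrayDeque<int[]> runs, int total, int[] values,
      boolean[] nulls, int rowId, int maxDefLevel, IntSupplier data) {
    int left = total;
    int currentCount = 0;
    int currentLevel = -1;
    while (left > 0) {
      if (currentCount == 0) {           // advance to the next definition-level run
        int[] run = runs.pop();
        currentCount = run[0];
        currentLevel = run[1];
      }
      int n = Math.min(left, currentCount);
      if (currentLevel == maxDefLevel) { // defined rows: read n values from the page
        for (int i = 0; i < n; ++i) values[rowId + i] = data.getAsInt();
      } else {                           // undefined rows: mark NULL in the result
        for (int i = 0; i < n; ++i) nulls[rowId + i] = true;
      }
      rowId += n;
      left -= n;
      currentCount -= n;
    }
  }
}
```

For example, a run of `{3, maxDefLevel}` followed by `{2, 0}` decodes three values and then marks two NULLs in rows 0 through 4, in a single pass over the runs.
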
## How was this patch tested?

In addition to the updated benchmarks below, running TPCDS Q55 (sf40) with these changes gives:

```
TPCDS:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)
---------------------------------------------------------------------------------
q55 (Before)                             6398 / 6616         18.0          55.5
q55 (After)                              4983 / 5189         23.1          43.3
```

Author: Nong Li <nong@databricks.com>

Closes #11375 from nongli/spark-13499.
Parent: 6df1e55a
@@ -628,7 +628,8 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBase
         dictionaryIds.reserve(total);
       }
       // Read and decode dictionary ids.
-      readIntBatch(rowId, num, dictionaryIds);
+      defColumn.readIntegers(
+          num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
       decodeDictionaryIds(rowId, num, column);
     } else {
       switch (descriptor.getType()) {
@@ -739,18 +740,6 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBase
       default:
         throw new NotImplementedException("Unsupported type: " + descriptor.getType());
     }
-
-    if (dictionaryIds.numNulls() > 0) {
-      // Copy the NULLs over.
-      // TODO: we can improve this by decoding the NULLs directly into column. This would
-      // mean we decode the int ids into `dictionaryIds` and the NULLs into `column` and then
-      // just do the ID remapping as above.
-      for (int i = 0; i < num; ++i) {
-        if (dictionaryIds.getIsNull(rowId + i)) {
-          column.putNull(rowId + i);
-        }
-      }
-    }
   }

   /**
@@ -769,7 +758,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBase
     // TODO: implement remaining type conversions
     if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType) {
       defColumn.readIntegers(
-          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, 0);
+          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
     } else if (column.dataType() == DataTypes.ByteType) {
       defColumn.readBytes(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution.datasources.parquet;

 import org.apache.commons.lang.NotImplementedException;
-import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.ValuesReader;
@@ -176,11 +175,11 @@ public final class VectorizedRleValuesReader extends ValuesReader
    *    if (this.readInt() == level) {
    *      c[rowId] = data.readInteger();
    *    } else {
-   *      c[rowId] = nullValue;
+   *      c[rowId] = null;
    *    }
    */
   public void readIntegers(int total, ColumnVector c, int rowId, int level,
-      VectorizedValuesReader data, int nullValue) {
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -189,7 +188,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case RLE:
           if (currentValue == level) {
             data.readIntegers(n, c, rowId);
-            c.putNotNulls(rowId, n);
           } else {
             c.putNulls(rowId, n);
           }
@@ -198,9 +196,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               c.putInt(rowId + i, data.readInteger());
-              c.putNotNull(rowId + i);
             } else {
-              c.putInt(rowId + i, nullValue);
               c.putNull(rowId + i);
             }
           }
@@ -223,7 +219,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case RLE:
           if (currentValue == level) {
             data.readBooleans(n, c, rowId);
-            c.putNotNulls(rowId, n);
           } else {
             c.putNulls(rowId, n);
           }
@@ -232,7 +227,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               c.putBoolean(rowId + i, data.readBoolean());
-              c.putNotNull(rowId + i);
             } else {
               c.putNull(rowId + i);
             }
@@ -257,7 +251,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
             for (int i = 0; i < n; i++) {
               c.putLong(rowId + i, data.readInteger());
             }
-            c.putNotNulls(rowId, n);
           } else {
             c.putNulls(rowId, n);
           }
@@ -266,7 +259,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               c.putLong(rowId + i, data.readInteger());
-              c.putNotNull(rowId + i);
             } else {
               c.putNull(rowId + i);
             }
@@ -289,7 +281,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case RLE:
           if (currentValue == level) {
             data.readBytes(n, c, rowId);
-            c.putNotNulls(rowId, n);
           } else {
             c.putNulls(rowId, n);
           }
@@ -298,7 +289,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               c.putByte(rowId + i, data.readByte());
-              c.putNotNull(rowId + i);
             } else {
               c.putNull(rowId + i);
             }
@@ -321,7 +311,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case RLE:
           if (currentValue == level) {
             data.readLongs(n, c, rowId);
-            c.putNotNulls(rowId, n);
           } else {
             c.putNulls(rowId, n);
           }
@@ -330,7 +319,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               c.putLong(rowId + i, data.readLong());
-              c.putNotNull(rowId + i);
             } else {
               c.putNull(rowId + i);
             }
@@ -353,7 +341,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case RLE:
           if (currentValue == level) {
             data.readFloats(n, c, rowId);
-            c.putNotNulls(rowId, n);
           } else {
             c.putNulls(rowId, n);
           }
@@ -362,7 +349,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               c.putFloat(rowId + i, data.readFloat());
-              c.putNotNull(rowId + i);
             } else {
               c.putNull(rowId + i);
             }
@@ -385,7 +371,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case RLE:
           if (currentValue == level) {
             data.readDoubles(n, c, rowId);
-            c.putNotNulls(rowId, n);
           } else {
             c.putNulls(rowId, n);
           }
@@ -394,7 +379,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
               c.putDouble(rowId + i, data.readDouble());
-              c.putNotNull(rowId + i);
             } else {
               c.putNull(rowId + i);
             }
@@ -416,7 +400,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
       switch (mode) {
         case RLE:
           if (currentValue == level) {
-            c.putNotNulls(rowId, n);
             data.readBinary(n, c, rowId);
           } else {
             c.putNulls(rowId, n);
@@ -425,7 +408,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
         case PACKED:
           for (int i = 0; i < n; ++i) {
             if (currentBuffer[currentBufferIdx++] == level) {
-              c.putNotNull(rowId + i);
               data.readBinary(1, c, rowId + i);
             } else {
               c.putNull(rowId + i);
@@ -439,6 +421,40 @@ public final class VectorizedRleValuesReader extends ValuesReader
     }
   }

+  /**
+   * Decoding for dictionary ids. The IDs are populated into `values` and the nullability is
+   * populated into `nulls`.
+   */
+  public void readIntegers(int total, ColumnVector values, ColumnVector nulls, int rowId, int level,
+      VectorizedValuesReader data) {
+    int left = total;
+    while (left > 0) {
+      if (this.currentCount == 0) this.readNextGroup();
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == level) {
+            data.readIntegers(n, values, rowId);
+          } else {
+            nulls.putNulls(rowId, n);
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (currentBuffer[currentBufferIdx++] == level) {
+              values.putInt(rowId + i, data.readInteger());
+            } else {
+              nulls.putNull(rowId + i);
+            }
+          }
+          break;
+      }
+      rowId += n;
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
   // The RLE reader implements the vectorized decoding interface when used to decode dictionary
   // IDs. This is different than the above APIs that decodes definitions levels along with values.
@@ -560,12 +576,14 @@ public final class VectorizedRleValuesReader extends ValuesReader
     throw new RuntimeException("Unreachable");
   }

+  private int ceil8(int value) {
+    return (value + 7) / 8;
+  }
+
   /**
    * Reads the next group.
    */
   private void readNextGroup() {
-    Preconditions.checkArgument(this.offset < this.end,
-        "Reading past RLE/BitPacking stream. offset=" + this.offset + " end=" + this.end);
     int header = readUnsignedVarInt();
     this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
     switch (mode) {
@@ -576,14 +594,12 @@ public final class VectorizedRleValuesReader extends ValuesReader
       case PACKED:
         int numGroups = header >>> 1;
         this.currentCount = numGroups * 8;
+        int bytesToRead = ceil8(this.currentCount * this.bitWidth);
         if (this.currentBuffer.length < this.currentCount) {
           this.currentBuffer = new int[this.currentCount];
         }
         currentBufferIdx = 0;
-        int bytesToRead = (int)Math.ceil((double)(this.currentCount * this.bitWidth) / 8.0D);
-        bytesToRead = Math.min(bytesToRead, this.end - this.offset);
         int valueIndex = 0;
         for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) {
           this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex);
@@ -150,21 +150,21 @@ object ParquetReadBenchmark
     /*
     Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
-    SQL Single Int Column Scan:         Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    SQL Parquet Reader                       1350.56            11.65         1.00 X
-    SQL Parquet MR                           1844.09             8.53         0.73 X
-    SQL Parquet Vectorized                   1062.04            14.81         1.27 X
+    SQL Single Int Column Scan:        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    SQL Parquet Reader                       1042 / 1208         15.1          66.2        1.0X
+    SQL Parquet MR                           1544 / 1607         10.2          98.2        0.7X
+    SQL Parquet Vectorized                    674 /  739         23.3          42.9        1.5X
     */
     sqlBenchmark.run()

     /*
     Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
-    Parquet Reader Single Int Column Scan:  Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    ParquetReader                             610.40            25.77         1.00 X
-    ParquetReader(Batched)                    172.66            91.10         3.54 X
-    ParquetReader(Batch -> Row)               192.28            81.80         3.17 X
+    Parquet Reader Single Int Column   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    ParquetReader                             565 /  609         27.8          35.9        1.0X
+    ParquetReader(Batched)                    165 /  174         95.3          10.5        3.4X
+    ParquetReader(Batch -> Row)               158 /  188         99.3          10.1        3.6X
     */
     parquetReaderBenchmark.run()
   }
@@ -218,12 +218,12 @@ object ParquetReadBenchmark
     /*
     Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
-    Int and String Scan:                Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+    Int and String Scan:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -------------------------------------------------------------------------------
-    SQL Parquet Reader                       1737.94             6.03         1.00 X
-    SQL Parquet MR                           2393.08             4.38         0.73 X
-    SQL Parquet Vectorized                   1442.99             7.27         1.20 X
-    ParquetReader                            1032.11            10.16         1.68 X
+    SQL Parquet Reader                       1381 / 1679          7.6         131.7        1.0X
+    SQL Parquet MR                           2005 / 2177          5.2         191.2        0.7X
+    SQL Parquet Vectorized                    919 / 1044         11.4          87.6        1.5X
+    ParquetReader                            1035 / 1163         10.1          98.7        1.3X
     */
     benchmark.run()
   }