[SPARK-3294][SQL] Eliminates boxing costs from in-memory columnar storage
This is a major refactoring of the in-memory columnar storage implementation that aims to eliminate boxing costs from critical paths (building and accessing column buffers) as much as possible. The basic idea is to refactor all major interfaces into a row-based form and use them together with `SpecificMutableRow`. The difficult part is adapting all compression schemes, especially `RunLengthEncoding` and `DictionaryEncoding`, to this design. Since in-memory compression is disabled by default for now, and this PR should be strictly better than before whether or not in-memory compression is enabled, maybe I'll finish that part in another PR.

**UPDATE** This PR also took the chance to optimize `HiveTableScan` by

1. leveraging `SpecificMutableRow` to avoid boxing costs, and
2. building specific `Writable` unwrapper functions ahead of time to avoid per-row pattern matching and branching costs.

TODO

- [x] Benchmark
- [ ] ~~Eliminate boxing costs in `RunLengthEncoding`~~ (left to future PRs)
- [ ] ~~Eliminate boxing costs in `DictionaryEncoding` (seems not easy to do without specializing `DictionaryEncoding` for every supported column type)~~ (left to future PRs)

## Micro benchmark

The benchmark uses a 10-million-line CSV table consisting of bytes, shorts, integers, longs, floats and doubles. It measures the time to build the in-memory version of this table and the time to scan the whole in-memory table.

Benchmark code can be found [here](https://gist.github.com/liancheng/fe70a148de82e77bd2c8#file-hivetablescanbenchmark-scala). The script used to generate the input table can be found [here](https://gist.github.com/liancheng/fe70a148de82e77bd2c8#file-tablegen-scala).

Speedup:

- Hive table scanning + column buffer building: **18.74%**

  The original benchmark uses 1K as the in-memory batch size; when increased to 10K, it can be 28.32% faster.

- In-memory table scanning: **7.95%**

Before:

Run     | Building | Scanning
------- | -------- | --------
1       | 16472    | 525
2       | 16168    | 530
3       | 16386    | 529
4       | 16184    | 538
5       | 16209    | 521
Average | 16283.8  | 528.6

After:

Run     | Building | Scanning
------- | -------- | --------
1       | 13124    | 458
2       | 13260    | 529
3       | 12981    | 463
4       | 13214    | 483
5       | 13583    | 500
Average | 13232.4  | 486.6

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2327 from liancheng/prevent-boxing/unboxing and squashes the following commits:

- 4419fe4 [Cheng Lian] Addressing comments
- e5d2cf2 [Cheng Lian] Bug fix: should call setNullAt when field value is null to avoid NPE
- 8b8552b [Cheng Lian] Only checks for partition batch pruning flag once
- 489f97b [Cheng Lian] Bug fix: TableReader.fillObject uses wrong ordinals
- 97bbc4e [Cheng Lian] Optimizes hive.TableReader by providing specific Writable unwrappers ahead of time
- 3dc1f94 [Cheng Lian] Minor changes to eliminate row object creation
- 5b39cb9 [Cheng Lian] Lowers log level of compression scheme details
- f2a7890 [Cheng Lian] Use SpecificMutableRow in InMemoryColumnarTableScan to avoid boxing
- 9cf30b0 [Cheng Lian] Added row based ColumnType.append/extract
- 456c366 [Cheng Lian] Made compression decoder row based
- edac3cd [Cheng Lian] Makes ColumnAccessor.extractSingle row based
- 8216936 [Cheng Lian] Removes boxing cost in IntDelta and LongDelta by providing specialized implementations
- b70d519 [Cheng Lian] Made some in-memory columnar storage interfaces row-based
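The two ideas in the description, writing through primitive-typed row setters and choosing a per-column unwrapper once instead of pattern matching on every row, can be illustrated with a minimal sketch. Everything below is a hypothetical, simplified stand-in written for illustration: `PrimitiveRow`, `FieldWriter`, `writerFor`, and `fillRow` are not Spark's classes or the code in this PR, and raw fields are assumed to arrive as strings rather than Hadoop `Writable` objects.

```scala
object RowBasedSketch {
  // Hypothetical column types; the real code supports many more.
  sealed trait FieldType
  case object IntField extends FieldType
  case object LongField extends FieldType
  case object DoubleField extends FieldType

  // Simplified stand-in for a specialized mutable row: values live in
  // primitive arrays, so setters and getters never allocate boxed objects.
  final class PrimitiveRow(size: Int) {
    private val ints = new Array[Int](size)
    private val longs = new Array[Long](size)
    private val doubles = new Array[Double](size)
    private val nulls = new Array[Boolean](size)

    def setInt(i: Int, v: Int): Unit = { nulls(i) = false; ints(i) = v }
    def setLong(i: Int, v: Long): Unit = { nulls(i) = false; longs(i) = v }
    def setDouble(i: Int, v: Double): Unit = { nulls(i) = false; doubles(i) = v }
    def setNullAt(i: Int): Unit = { nulls(i) = true }
    def isNullAt(i: Int): Boolean = nulls(i)
    def getInt(i: Int): Int = ints(i)
    def getLong(i: Int): Long = longs(i)
    def getDouble(i: Int): Double = doubles(i)
  }

  // A dedicated trait (instead of a generic FunctionN) keeps the call
  // signature free of boxed primitive arguments.
  trait FieldWriter {
    def write(raw: String, row: PrimitiveRow): Unit
  }

  // The pattern match happens once per column, when the writer is built.
  def writerFor(fieldType: FieldType, ordinal: Int): FieldWriter = fieldType match {
    case IntField => new FieldWriter {
      def write(raw: String, row: PrimitiveRow): Unit = row.setInt(ordinal, raw.toInt)
    }
    case LongField => new FieldWriter {
      def write(raw: String, row: PrimitiveRow): Unit = row.setLong(ordinal, raw.toLong)
    }
    case DoubleField => new FieldWriter {
      def write(raw: String, row: PrimitiveRow): Unit = row.setDouble(ordinal, raw.toDouble)
    }
  }

  // Per-row work is a null check plus one virtual call per field;
  // null fields go through setNullAt instead of being unwrapped.
  def fillRow(raw: Array[String], writers: Array[FieldWriter], row: PrimitiveRow): Unit = {
    var i = 0
    while (i < raw.length) {
      if (raw(i) == null) row.setNullAt(i) else writers(i).write(raw(i), row)
      i += 1
    }
  }

  def main(args: Array[String]): Unit = {
    val schema = Array[FieldType](IntField, LongField, DoubleField)
    // Writers are built once, ahead of time, and reused for every row.
    val writers = schema.zipWithIndex.map { case (t, i) => writerFor(t, i) }
    val row = new PrimitiveRow(schema.length) // one row object reused across input lines
    fillRow(Array("7", "9000000000", null), writers, row)
    println(s"int=${row.getInt(0)} long=${row.getLong(1)} thirdIsNull=${row.isNullAt(2)}")
  }
}
```

Reusing a single mutable row and a fixed array of writers keeps the per-row path free of allocation and type dispatch, which is the kind of cost the description attributes to boxing and per-row pattern matching.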
Showing
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificRow.scala (1 addition, 1 deletion)
- sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala (5 additions, 3 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala (13 additions, 14 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala (8 additions, 8 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala (147 additions, 31 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala (55 additions, 37 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala (2 additions, 2 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala (6 additions, 2 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala (4 additions, 3 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala (13 additions, 11 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala (11 additions, 5 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala (160 additions, 104 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala (1 addition, 1 deletion)
- sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala (7 additions, 4 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala (1 addition, 1 deletion)
- sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala (3 additions, 1 deletion)
- sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala (3 additions, 1 deletion)
- sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala (3 additions, 3 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala (6 additions, 1 deletion)
- sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala (7 additions, 2 deletions)